Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
# Method of the HTTPCLIENT class
async def get_snapshot(self, currency_symbol, semaphore):
    """Fetch the Binance order-book depth snapshot for one symbol.

    The request is retried indefinitely until it succeeds. Task
    cancellation is never swallowed, and a short pause is inserted
    between attempts so a persistent failure does not spin in a tight loop.

    :param currency_symbol: symbol of the trading pair; it is upper-cased
        before being substituted into the request URL.
    :param semaphore: async context manager held for the duration of the
        HTTP request (callers pass an ``asyncio.Semaphore``).
    :return: the decoded JSON body of the response.
    """
    # Local import: the module's import block is not visible from here.
    import asyncio

    http_route = "https://www.binance.com/api/v1/depth?symbol={currency_symbol}&limit=20"
    retry_delay = 1  # seconds to wait after a failed attempt
    symbol = currency_symbol.upper()
    url = http_route.format(currency_symbol=symbol)
    while True:
        try:
            async with ClientSession() as session:
                # A fresh proxy is requested for every attempt.
                async with semaphore, session.get(url, proxy=proxy.get_proxy()) as response:
                    snapshot = await response.json()
                    rest_log.info('Depth snapshot for {} got. Response {}'.format(symbol, snapshot))
                    return snapshot
        except asyncio.CancelledError:
            # Cancellation must propagate, otherwise this loop can never be stopped.
            raise
        except Exception as exc:
            rest_log.error('Error {type} {value} in get depth snapshot for {cur}'.format(
                type=exc.__class__, value=exc, cur=symbol))
        # The semaphore is already released here, so other requests may proceed.
        await asyncio.sleep(retry_delay)
# Method of the DepthSnapshot class
async def get(self, semaphore):
    """Fetch the depth snapshot for ``self.currency_symbol`` and seed ``depth``.

    A response containing a ``'code'`` key is handled as an unusable
    snapshot (presumably an API error payload -- confirm against the API):
    ``depth`` is initialised with an empty book and an empty list is
    returned for the currency.

    :param semaphore: passed through to ``http_client.get_snapshot``.
    :return: ``{currency_id: response}`` on success,
        ``{currency_id: []}`` when the response contains ``'code'``,
        or ``None`` when an unexpected error was logged.
    """
    # Local import: the module's import block is not visible from here.
    import asyncio

    try:
        currency_id = currency_iter.currency_id(self.currency_symbol)
        response = await http_client.get_snapshot(self.currency_symbol, semaphore)
        if 'code' in response:
            empty_book = {'lastUpdateId': 0, 'bids': {}, 'asks': {}}
            depth.init_with_snapshot({currency_id: empty_book})
            worker_log.info('Empty response from snapshot for currency {}'.
                            format(currency_iter.currency_symbol(currency_id)))
            return {currency_id: []}
        depth.init_with_snapshot({currency_id: response})
        return {currency_id: response}
    except asyncio.CancelledError:
        # Let cancellation reach the caller instead of logging it as an error.
        raise
    except Exception as exc:
        worker_log.error('Error {type} {value} in depth snapshot for {cur}'.format(
            type=exc.__class__, value=exc, cur=self.currency_symbol.upper()))
        # Best effort: the failure is logged and the caller receives None.
        return None
# The worker itself
async def depth_worker(self):
    """Stream depth updates for every currency in ``currency_iter.cur_list``.

    Opens one combined Binance websocket stream, then fetches the depth
    snapshot for every currency (at most 10 snapshot tasks hold the
    semaphore at once) while the socket is already open, and finally feeds
    every received message into ``depth.add`` until the connection fails.

    :raises DepthWorkerException: on any error outside ``depth.add``;
        the original exception is attached as the cause.
    """
    depth_websocket_route = "wss://stream.binance.com:9443/stream?streams="
    try:
        # Combined streams are separated by '/', with no trailing separator.
        stream_names = '/'.join(
            '{currency}@depth'.format(currency=currency) for currency in currency_iter.cur_list)
        async with websockets.connect((depth_websocket_route + stream_names).lower()) as websocket:
            depth_semaphore = asyncio.Semaphore(10)
            depth.set_init_values()
            worker_log.info('Start getting depth snapshot')
            await asyncio.gather(
                *[DepthSnapshot(currency).get(depth_semaphore) for currency in currency_iter.cur_list])
            worker_log.info('Depth snapshot got')
            while True:
                payload = await websocket.recv()
                parsed_payload = json.loads(payload)
                try:
                    depth.add(parsed_payload)
                except Exception:
                    # One bad update must not stop the stream; record it in the
                    # worker log like every other error in this worker.
                    worker_log.error('Error in depth add: {}'.format(traceback.format_exc()))
    except asyncio.CancelledError:
        # Cancellation is not a worker failure; do not wrap it.
        raise
    except Exception as e:
        worker_log.error('Error {type} {value} in depth worker'.format(
            type=e.__class__, value=e))
        raise DepthWorkerException(str(e)) from e
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement