Advertisement
Guest User

Untitled

a guest
Jan 12th, 2020
443
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 7.38 KB | None | 0 0
  1. import hmac
  2. import json
  3. import time
  4. import zlib
  5. from collections import defaultdict, deque
  6. from itertools import zip_longest
  7. from typing import DefaultDict, Deque, List, Dict, Tuple, Optional
  8. from gevent.event import Event
  9.  
  10. from websocket.websocket_manager import WebsocketManager
  11.  
  12.  
  13. class FtxWebsocketClient(WebsocketManager):
  14.     _ENDPOINT = 'wss://ftx.com/ws/'
  15.  
  16.     def __init__(self) -> None:
  17.         super().__init__()
  18.         self._trades: DefaultDict[str, Deque] = defaultdict(lambda: deque([], maxlen=10000))
  19.         self._fills: Deque = deque([], maxlen=10000)
  20.         self._api_key = ''  # TODO: Place your API key here
  21.         self._api_secret = ''  # TODO: Place your API secret here
  22.         self._orderbook_update_events: DefaultDict[str, Event] = defaultdict(Event)
  23.         self._reset_data()
  24.  
  25.     def _on_open(self, ws):
  26.         self._reset_data()
  27.  
  28.     def _reset_data(self) -> None:
  29.         self._subscriptions: List[Dict] = []
  30.         self._orders: DefaultDict[int, Dict] = defaultdict(dict)
  31.         self._tickers: DefaultDict[str, Dict] = defaultdict(dict)
  32.         self._orderbook_timestamps: DefaultDict[str, float] = defaultdict(float)
  33.         self._orderbook_update_events.clear()
  34.         self._orderbooks: DefaultDict[str, Dict[str, DefaultDict[float, float]]] = defaultdict(
  35.             lambda: {side: defaultdict(float) for side in {'bids', 'asks'}})
  36.         self._orderbook_timestamps.clear()
  37.         self._logged_in = False
  38.         self._last_received_orderbook_data_at: float = 0.0
  39.  
  40.     def _reset_orderbook(self, market: str) -> None:
  41.         if market in self._orderbooks:
  42.             del self._orderbooks[market]
  43.         if market in self._orderbook_timestamps:
  44.             del self._orderbook_timestamps[market]
  45.  
  46.     def _get_url(self) -> str:
  47.         return self._ENDPOINT
  48.  
  49.     def _login(self) -> None:
  50.         ts = int(time.time() * 1000)
  51.         self.send_json({'op': 'login', 'args': {
  52.             'key': self._api_key,
  53.             'sign': hmac.new(
  54.                 self._api_secret.encode(), f'{ts}websocket_login'.encode(), 'sha256').hexdigest(),
  55.             'time': ts,
  56.         }})
  57.         self._logged_in = True
  58.  
  59.     def _subscribe(self, subscription: Dict) -> None:
  60.         self.send_json({'op': 'subscribe', **subscription})
  61.         self._subscriptions.append(subscription)
  62.  
  63.     def _unsubscribe(self, subscription: Dict) -> None:
  64.         self.send_json({'op': 'unsubscribe', **subscription})
  65.         while subscription in self._subscriptions:
  66.             self._subscriptions.remove(subscription)
  67.  
  68.     def get_fills(self) -> List[Dict]:
  69.         if not self._logged_in:
  70.             self._login()
  71.         subscription = {'channel': 'fills'}
  72.         if subscription not in self._subscriptions:
  73.             self._subscribe(subscription)
  74.         return list(self._fills.copy())
  75.  
  76.     def get_orders(self) -> Dict[int, Dict]:
  77.         if not self._logged_in:
  78.             self._login()
  79.         subscription = {'channel': 'orders'}
  80.         if subscription not in self._subscriptions:
  81.             self._subscribe(subscription)
  82.         return dict(self._orders.copy())
  83.  
  84.     def get_trades(self, market: str) -> List[Dict]:
  85.         subscription = {'channel': 'trades', 'market': market}
  86.         if subscription not in self._subscriptions:
  87.             self._subscribe(subscription)
  88.         return list(self._trades[market].copy())
  89.  
  90.     def get_orderbook(self, market: str) -> Dict[str, List[Tuple[float, float]]]:
  91.         subscription = {'channel': 'orderbook', 'market': market}
  92.         if subscription not in self._subscriptions:
  93.             self._subscribe(subscription)
  94.         if self._orderbook_timestamps[market] == 0:
  95.             self.wait_for_orderbook_update(market, 5)
  96.         return {
  97.             side: sorted(
  98.                 [(price, quantity) for price, quantity in list(self._orderbooks[market][side].items())
  99.                  if quantity],
  100.                 key=lambda order: order[0] * (-1 if side == 'bids' else 1)
  101.             )
  102.             for side in {'bids', 'asks'}
  103.         }
  104.  
  105.     def get_orderbook_timestamp(self, market: str) -> float:
  106.         return self._orderbook_timestamps[market]
  107.  
  108.     def wait_for_orderbook_update(self, market: str, timeout: Optional[float]) -> None:
  109.         subscription = {'channel': 'orderbook', 'market': market}
  110.         if subscription not in self._subscriptions:
  111.             self._subscribe(subscription)
  112.         self._orderbook_update_events[market].wait(timeout)
  113.  
  114.     def get_ticker(self, market: str) -> Dict:
  115.         subscription = {'channel': 'ticker', 'market': market}
  116.         if subscription not in self._subscriptions:
  117.             self._subscribe(subscription)
  118.         return self._tickers[market]
  119.  
  120.     def _handle_orderbook_message(self, message: Dict) -> None:
  121.         market = message['market']
  122.         data = message['data']
  123.         if data['action'] == 'partial':
  124.             self._reset_orderbook(market)
  125.         for side in {'bids', 'asks'}:
  126.             book = self._orderbooks[market][side]
  127.             for price, size in data[side]:
  128.                 if size:
  129.                     book[price] = size
  130.                 else:
  131.                     del book[price]
  132.             self._orderbook_timestamps[market] = data['time']
  133.         checksum = data['checksum']
  134.         orderbook = self.get_orderbook(market)
  135.         checksum_data = [
  136.             ':'.join([f'{float(order[0])}:{float(order[1])}' for order in (bid, offer) if order])
  137.             for (bid, offer) in zip_longest(orderbook['bids'][:100], orderbook['asks'][:100])
  138.         ]
  139.  
  140.         computed_result = int(zlib.crc32(':'.join(checksum_data).encode()))
  141.         if computed_result != checksum:
  142.             self._last_received_orderbook_data_at = 0
  143.             self._reset_orderbook(market)
  144.             self._unsubscribe({'market': market, 'channel': 'orderbook'})
  145.             self._subscribe({'market': market, 'channel': 'orderbook'})
  146.         else:
  147.             self._orderbook_update_events[market].set()
  148.             self._orderbook_update_events[market].clear()
  149.  
  150.     def _handle_trades_message(self, message: Dict) -> None:
  151.         self._trades[message['market']].append(message['data'])
  152.  
  153.     def _handle_ticker_message(self, message: Dict) -> None:
  154.         self._tickers[message['market']] = message['data']
  155.  
  156.     def _handle_fills_message(self, message: Dict) -> None:
  157.         self._fills.append(message['data'])
  158.  
  159.     def _handle_orders_message(self, message: Dict) -> None:
  160.         data = message['data']
  161.         self._orders.update({data['id']: data})
  162.  
  163.     def _on_message(self, ws, raw_message: str) -> None:
  164.         message = json.loads(raw_message)
  165.         message_type = message['type']
  166.         if message_type in {'subscribed', 'unsubscribed'}:
  167.             return
  168.         elif message_type == 'info':
  169.             if message['code'] == 20001:
  170.                 return self.reconnect()
  171.         elif message_type == 'error':
  172.             raise Exception(message)
  173.         channel = message['channel']
  174.  
  175.         if channel == 'orderbook':
  176.             self._handle_orderbook_message(message)
  177.         elif channel == 'trades':
  178.             self._handle_trades_message(message)
  179.         elif channel == 'ticker':
  180.             self._handle_ticker_message(message)
  181.         elif channel == 'fills':
  182.             self._handle_fills_message(message)
  183.         elif channel == 'orders':
  184.             self._handle_orders_message(message)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement