Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- class BybitWS:
- PRIVATE_TOPIC = ["position", "execution", "order", "stop_order"]
- def __init__(self, ws_url, api_key, api_secret):
- self.api_key = api_key
- self.api_secret = api_secret
- self.auth = False
- self.data = {}
- self.exited = False
- self.__connect(ws_url)
- def exit(self):
- self.exited = True
- self.ws.close()
- def __connect(self, ws_url):
- self.ws = websocket.WebSocketApp(ws_url,
- on_message=self.__on_message,
- on_close=self.__on_close,
- on_open=self.__on_open,
- on_error=self.__on_error,
- keep_running=True)
- self.wst = threading.Thread(target=lambda: self.ws.run_forever())
- self.wst.daemon = True
- self.wst.start()
- retry_times = 5
- while not self.ws.sock or not self.ws.sock.connected and retry_times:
- sleep(1)
- retry_times -= 1
- if retry_times == 0 and not self.ws.sock.connected:
- self.exit()
- if self.api_key and self.api_secret:
- self.__auth()
- def signature(self, expires):
- constructed_params = str("GET/realtime") + str(expires)
- hash = hmac.new(bytes(self.api_secret, "utf-8"), bytes(constructed_params, "utf-8"), hashlib.sha256)
- return hash.hexdigest()
- def __auth(self):
- expires = str(int(round(time.time())+1))+"000"
- signature = self.signature(expires)
- auth = {}
- auth["op"] = "auth"
- auth["args"] = [self.api_key, expires, signature]
- args = json.dumps(auth)
- self.ws.send(args)
- def __on_message(self, message):
- message = json.loads(message)
- if "success" in message and message ["success"]:
- if "request" in message and message["request"]["op"] == "auth":
- self.auth = True
- if "ret_msg" in message and message["ret_msg"] == "pong":
- self.data["pong"].append("PING successful")
- if "topic" in message:
- self.data[message["topic"]].append(message["data"])
- def __on_error(self, error):
- if not self.exited:
- raise websocket.WebSocketException(error)
- def __on_open(self):
- pass
- def __on_close(self):
- pass
- def ping(self):
- message = {"op": "ping"}
- self.ws.send(json.dumps(message))
- def subscribe_kline(self, symbol: str, interval: str):
- self.ws.send(json.dumps({"op": "subscribe", "args": ["klineV2." + interval + "." + symbol]}))
- if "kline." + symbol + "." + interval not in self.data:
- self.data["kline." + symbol + "." + interval] = []
- def subscribe_trade(self):
- self.ws.send(json.dumps({"op": "subscribe", "args": ["trade"]}))
- if "trade.BTCUSD" not in self.data:
- self.data["trade.BTCUSD"] = []
- self.data["trade.ETHUSD"] = []
- self.data["trade.EOSUSD"] = []
- self.data["trade.XRPUSD"] = []
- def subscribe_insurance(self):
- self.ws.send(json.dumps({"op": "subscribe", "args": ["insurance"]}))
- if "insurance.BTC" not in self.data:
- self.data["insurance.BTC"] = []
- self.data["insurance.XRP"] = []
- self.data["insurance.EOS"] = []
- self.data["insurance.ETH"] = []
- def subscribe_orderBookL2_25(self, symbol):
- self.ws.send(json.dumps({"op": "subscribe", "args": ["orderBookL2_25." + symbol]}))
- if "orderBookL2_25." + symbol not in self.data:
- self.data["orderBookL2_25." + symbol] = []
- def subscribe_orderBookL2_200(self, symbol):
- self.ws.send(json.dumps({"op": "subscribe", "args": ["orderBook_200.100ms" + symbol]}))
- if "orderBook_200.100ms" + symbol not in self.data:
- self.data["orderBook_200.100ms" + symbol] = []
- def subscribe_instrument_info(self, symbol):
- self.ws.send(json.dumps({"op": "subscribe", "args": ["instrument_info.100ms." + symbol]}))
- if "instrument_info.100ms." + symbol not in self.data:
- self.data["instrument_info.100ms." + symbol] = []
- def subscribe_position(self):
- self.ws.send(json.dumps({"op": "subscribe", "args": ["position"]}))
- if "position" not in self.data:
- self.data["position"] = []
- def subscribe_execution(self):
- self.ws.send(json.dumps({"op": "subscribe", "args": ["execution"]}))
- if "execution" not in self.data:
- self.data["execution"] = []
- def subscribe_order(self):
- self.ws.send(json.dumps({"op": "subscribe", "args": ["order"]}))
- if "order" not in self.data:
- self.data["order"] = []
- def subscribe_stop_order(self):
- self.ws.send(json.dumps({"op": "subscribe", "args": ["stop_order"]}))
- if "stop_order" not in self.data:
- self.data["stop_order"] = []
- def get_data(self, topic):
- if topic not in self.data:
- return []
- if topic.split(".")[0] in BybitWS.PRIVATE_TOPIC and not self.auth:
- return []
- else:
- if len(self.data[topic]) == 0:
- return []
- return self.data[topic].pop()
- BybitWS = Bybit.BybitWS(ws_base_url, api_key, api_secret)
- BybitWS.subscribe_instrument_info("BTCUSD")
- instrument_response = BybitWS.get_data("instrument_info.100ms.BTCUSD")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement