Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- namespace BinanceWebSockets {
/// <summary>
/// Wrapper around <see cref="ClientWebSocket"/> that connects to <c>url + endpoint</c>,
/// receives complete messages as UTF-8 strings and reconnects (with a capped,
/// doubling delay) when the connection is lost.
/// </summary>
class BinanceWebSocket {
    public ClientWebSocket ws;
    public CancellationTokenSource ct;
    public string url;
    public string endpoint;
    public bool isRunning;
    // Delay before the first reconnect attempt; each failed attempt doubles it up to the cap below.
    private int miliseconds_for_reconnect = 5000;
    private const int max_miliseconds_for_reconnect = 60000;
    private ILogger logger;
    private const int init_buffer_size = 2048;
    private byte[] buffer;
    // Bytes still unused at the end of the buffer (always buffer.Length - offset).
    private int free_space;
    // Number of bytes of the current, still incomplete message already stored in the buffer.
    private int offset;

    /// <summary>Creates an unconnected socket wrapper; call <see cref="connect_async"/> to connect.</summary>
    /// <param name="_url">Base address; concatenated with <paramref name="_endpoint"/> to form the URI.</param>
    /// <param name="_endpoint">Path or stream name appended to <paramref name="_url"/>.</param>
    /// <param name="l">Logger used for connection diagnostics.</param>
    public BinanceWebSocket(string _url,string _endpoint,ILogger l) {
        logger = l;
        url = _url;
        endpoint = _endpoint;
        ws = new ClientWebSocket();
        ct = new CancellationTokenSource();
        buffer = new byte[init_buffer_size];
        free_space = buffer.Length;
        offset = 0;
        isRunning = true;
    }

    /// <summary>Connects <see cref="ws"/> to <c>url + endpoint</c>.</summary>
    /// <returns>0 when the socket is open afterwards, 1 when connecting failed or was cancelled.</returns>
    public async Task<int> connect_async() {
        var uri_to_connect = new Uri(url+endpoint);
        try {
            // Await the connect itself instead of polling ws.State, so a failure is
            // observed here rather than being left as an unobserved task exception.
            await ws.ConnectAsync(uri_to_connect,ct.Token);
        }
        catch (OperationCanceledException) {
            return 1;
        }
        catch (WebSocketException e) {
            logger.log_error($"{url+endpoint} connect failed: {e.Message}");
            return 1;
        }
        return ws.State==WebSocketState.Open ? 0 : 1;
    }

    /// <summary>Not implemented in this class.</summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public void subscribe(string msg) {
        throw new NotImplementedException();
    }

    /// <summary>
    /// Replaces <see cref="ws"/> with a fresh socket and keeps trying to connect until it
    /// succeeds, <see cref="isRunning"/> becomes false, or <see cref="ct"/> is cancelled.
    /// </summary>
    /// <param name="code">Caller-supplied reason code; currently not used.</param>
    /// <param name="reason">Human-readable cause, included in the log message.</param>
    public async Task reconnect_async(int code, string reason) {
        // Every reconnect sequence starts from the base delay again; the field is not mutated.
        var delay = miliseconds_for_reconnect;
        while (isRunning) {
            // Release the previous socket before replacing it, otherwise it is leaked.
            ws?.Dispose();
            ws = new ClientWebSocket();
            logger.log_error($"{url+endpoint} connection problem: {reason} trying to reconnect in {delay/1000} seconds");
            try {
                await Task.Delay(delay,ct.Token);
            } catch (OperationCanceledException) {
                // With a cancelled token every further attempt would fail immediately,
                // so give control back to the caller instead of spinning.
                return;
            }
            if (await connect_async()==0) {
                logger.log_info("connected");
                return;
            }
            // Capped doubling keeps the delay bounded and cannot overflow int.
            delay = Math.Min(delay*2,max_miliseconds_for_reconnect);
        }
    }

    /// <summary>Stops the receive loop, cancels pending operations, closes and disposes the socket.</summary>
    /// <returns>The result of <see cref="disconnect_async"/>.</returns>
    public async Task<int> close_async() {
        isRunning = false;
        ct.Cancel();
        try {
            return await disconnect_async();
        }
        finally {
            // Dispose even when the close handshake throws.
            ws.Dispose();
        }
    }

    /// <summary>Sends a normal-closure close frame if the socket can still send one.</summary>
    /// <returns>
    /// 0 when the socket state is <see cref="WebSocketState.Closed"/> afterwards, otherwise 1.
    /// NOTE(review): after CloseOutputAsync from the Open state the socket usually reports
    /// CloseSent, so 1 does not necessarily indicate an error — confirm what callers expect.
    /// </returns>
    public async Task<int> disconnect_async() {
        if (ws.State==WebSocketState.Open || ws.State==WebSocketState.CloseReceived) {
            try {
                await ws.CloseOutputAsync(WebSocketCloseStatus.NormalClosure,"",CancellationToken.None);
            }
            catch (WebSocketException e) {
                // The connection was already broken; there is nothing left to close gracefully.
                logger.log_error($"{url+endpoint} close failed: {e.Message}");
            }
        }
        return ws.State==WebSocketState.Closed ? 0 : 1;
    }

    /// <summary>Does nothing; the socket itself is disposed by <see cref="close_async"/>.</summary>
    public void Dispose() {
    }

    /// <summary>
    /// Receives the next complete message, growing the buffer as needed and reconnecting
    /// when the server closes the connection or the socket fails.
    /// </summary>
    /// <returns>
    /// The message decoded as UTF-8, or an empty string when the loop was stopped
    /// (<see cref="isRunning"/> false) or <see cref="ct"/> was cancelled.
    /// </returns>
    public async Task<string> recieve_async() {
        while (isRunning) {
            if (ct.IsCancellationRequested) {
                // Also covers a reconnect that was abandoned because of cancellation:
                // the fresh socket is not connected and must not be read from.
                break;
            }
            try {
                var res = await ws.ReceiveAsync(new ArraySegment<byte>(buffer,offset,free_space),ct.Token);
                // A close frame must be handled before the EndOfMessage test below,
                // otherwise it would be returned as an empty message.
                if (res.MessageType==WebSocketMessageType.Close) {
                    reset_buffer();
                    if (isRunning) {
                        await reconnect_async(1,"Server disconnected");
                    }
                    continue;
                }
                free_space -= res.Count;
                offset += res.Count;
                if (res.EndOfMessage) {
                    // Decode exactly the bytes received for this message.
                    var strmsg = System.Text.Encoding.UTF8.GetString(buffer,0,offset);
                    reset_buffer();
                    return strmsg;
                }
                if (res.Count==0) {
                    // An empty, non-final frame is treated as a lost connection.
                    reset_buffer();
                    await reconnect_async(1,"Server disconnected");
                    continue;
                }
                if (free_space==0) {
                    // Message is larger than the buffer: extend it and keep the bytes read so far.
                    Array.Resize(ref buffer, buffer.Length+init_buffer_size);
                    free_space = buffer.Length-offset;
                }
            }
            catch (WebSocketException e) {
                reset_buffer();
                if (isRunning) {
                    await reconnect_async(2,e.Message);
                }
            }
            catch (OperationCanceledException) {
                // Base type of TaskCanceledException; a cancelled receive may throw either.
                reset_buffer();
                return "";
            }
        }
        reset_buffer();
        return "";
    }

    /// <summary>Discards any partially received message so the next receive starts at the buffer start.</summary>
    private void reset_buffer() {
        offset = 0;
        free_space = buffer.Length;
    }

    /// <summary>Entry point for derived streams; not implemented in this class.</summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public virtual Task start()
    {
        throw new NotImplementedException();
    }
}
/// <summary>
/// User data stream: reads messages from the socket, converts order and position
/// updates into <see cref="UserStreamStruct"/> values and hands them to <see cref="callback"/>.
/// Renews the listen key and reconnects when the key expires or a restart is requested.
/// </summary>
class UserStream:BinanceWebSocket {
    public event UserStreamCallback callback;
    private IRequestClient rq;
    // Set by restart_user_stream(); tells the read loop that the cancellation
    // it observes is a restart request rather than a shutdown.
    private bool listen_key_no_found = false;

    /// <summary>Creates the stream for the given address and listen key.</summary>
    /// <param name="u">Base address of the stream.</param>
    /// <param name="_endpoint">Endpoint appended to <paramref name="u"/>; replaced by the new listen key on restart.</param>
    /// <param name="l">Logger passed to the base socket.</param>
    /// <param name="cl">Handler subscribed to <see cref="callback"/>.</param>
    /// <param name="_rq">Request client used to obtain a new listen key and for logging.</param>
    public UserStream(string u, string _endpoint, ILogger l, UserStreamCallback cl, IRequestClient _rq):base(u,_endpoint,l) {
        rq = _rq;
        callback += cl;
    }

    /// <summary>Unsubscribes <see cref="restart_user_stream"/> from the request client's restart event.</summary>
    public new void Dispose() {
        rq.restart_user_stream_event_unsubscribe(restart_user_stream);
        base.Dispose();
    }

    /// <summary>Requests a restart: flags it and cancels the pending receive so the read loop notices.</summary>
    public void restart_user_stream() {
        listen_key_no_found = true;
        ct.Cancel();
    }

    /// <summary>Read loop: receives messages until <c>isRunning</c> becomes false.</summary>
    private async Task run_async() {
        while (isRunning) {
            var strmsg = await recieve_async();
            if (strmsg=="") {
                // Empty result means the receive was stopped or cancelled.
                if (listen_key_no_found) {
                    listen_key_no_found = false;
                    // The previous token source is cancelled; a fresh one is needed to reconnect.
                    ct = new CancellationTokenSource();
                    await restart_user_stream_async();
                }
                continue;
            }

            var updates = new System.Collections.Generic.List<UserStreamStruct>();
            var listen_key_expired = false;
            try {
                var d = JsonSerializer.Deserialize<dynamic>(strmsg);
                if (d.ContainsKey("e")) {
                    if (d["e"]==GeneralEnums.Order_update) {
                        var o = d["o"];
                        var ous = new OrderUpdateStruct(o["s"],o["S"],o["o"],o["f"],decimal.Parse(o["q"],CultureInfo.InvariantCulture),
                            decimal.Parse(o["p"],CultureInfo.InvariantCulture),decimal.Parse(o["sp"],CultureInfo.InvariantCulture),
                            decimal.Parse(o["ap"],CultureInfo.InvariantCulture),decimal.Parse(o["l"],CultureInfo.InvariantCulture),
                            decimal.Parse(o["L"],CultureInfo.InvariantCulture),o["X"],o["c"]);
                        updates.Add(new UserStreamStruct(o["s"],GeneralEnums.Order_update,ous,new PositionChangeStruct()));
                    } else if (d["e"]==GeneralEnums.Position_update) {
                        // Forward every position in the event, not only the first one;
                        // an empty list simply produces no callback.
                        foreach (var p in d["a"]["P"]) {
                            var pcs = new PositionChangeStruct(p["s"],decimal.Parse(p["ep"],CultureInfo.InvariantCulture),
                                decimal.Parse(p["pa"],CultureInfo.InvariantCulture),decimal.Parse(p["up"],CultureInfo.InvariantCulture));
                            updates.Add(new UserStreamStruct(p["s"],GeneralEnums.Position_update,new OrderUpdateStruct(),pcs));
                        }
                    } else if (d["e"]=="listenKeyExpired") {
                        listen_key_expired = true;
                    } else {
                        rq.logger.log_system_info($"User data stream listen key {base.endpoint} message: {strmsg}");
                    }
                } else {
                    rq.logger.log_system_info($"User data stream listen key {base.endpoint} message: {strmsg}");
                }
            }
            catch (Exception e) {
                // One malformed or unexpected message must not terminate the whole stream.
                rq.logger.log_error($"User data stream listen key {base.endpoint} message could not be processed: {e.Message} message: {strmsg}");
                continue;
            }

            // Callbacks and the restart run outside the try block so their own
            // failures are not mistaken for message parsing errors.
            foreach (var uss in updates) {
                callback?.Invoke(uss);
            }
            if (listen_key_expired) {
                await restart_user_stream_async();
            }
        }
    }

    /// <summary>Closes the current connection, fetches a new listen key and reconnects with it.</summary>
    private async Task restart_user_stream_async() {
        rq.logger.log_error("listen key expired reconnect");
        await base.disconnect_async();
        var lt = this.rq.listen_key_get();
        this.rq.listen_key = lt;
        base.endpoint = this.rq.listen_key;
        await reconnect_async(3,"listen key expired");
    }

    /// <summary>Starts the read loop.</summary>
    /// <returns>A task that completes when the read loop ends.</returns>
    public override Task start() {
        return run_async();
    }
}
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement