Advertisement
Guest User

Untitled

a guest
Apr 3rd, 2023
101
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. namespace BinanceWebSockets {
  2.         class BinanceWebSocket {
  3.             public ClientWebSocket ws;
  4.             public CancellationTokenSource ct;
  5.             public string url;
  6.             public string endpoint;
  7.             public bool isRunning;
  8.             private int miliseconds_for_reconnect = 5000;
  9.             private ILogger logger;
  10.             private const int init_buffer_size = 2048;
  11.             private byte[] buffer;
  12.             private int free_space;
  13.             private int offset;
  14.  
  15.             public BinanceWebSocket(string _url,string _endpoint,ILogger l) {
  16.                 logger = l;
  17.                 url = _url;
  18.                 endpoint = _endpoint;
  19.                 ws = new ClientWebSocket();
  20.                 ct = new CancellationTokenSource();
  21.  
  22.                 buffer = new byte[init_buffer_size];
  23.                 free_space = buffer.Length;
  24.                 offset = 0;
  25.  
  26.                 isRunning = true;
  27.  
  28.             }
  29.  
  30.             public async Task<int> connect_async() {
  31.                 var uri_to_connect = new Uri(url+endpoint);
  32.                 int res = 0;
  33.                 var t = ws.ConnectAsync(uri_to_connect,ct.Token);
  34.  
  35.                 while (true) {
  36.                     try {
  37.                         await Task.Delay(1000,ct.Token);
  38.  
  39.                     } catch (TaskCanceledException) {
  40.  
  41.                     }
  42.  
  43.                     if (ws.State==WebSocketState.Open) {
  44.                         break;
  45.                     }
  46.  
  47.                     else if (ws.State==WebSocketState.Aborted | ws.State==WebSocketState.Closed | ws.State==WebSocketState.CloseReceived | ws.State==WebSocketState.CloseSent) {                        
  48.                         res = 1;
  49.                         break;
  50.                     }
  51.                 }
  52.  
  53.                 return res;
  54.                
  55.             }
  56.  
  57.             public void subscribe(string msg) {
  58.                 throw new NotImplementedException();
  59.             }
  60.  
  61.             public async Task reconnect_async(int code, string reason) {
  62.                 while (true) {
  63.                     ws = new ClientWebSocket();
  64.  
  65.                     logger.log_error($"{url+endpoint} connection problem: {reason} trying to reconnect in {miliseconds_for_reconnect/1000} seconds");
  66.  
  67.                     try {
  68.                         await Task.Delay(miliseconds_for_reconnect,ct.Token);
  69.  
  70.                     } catch (TaskCanceledException) {
  71.  
  72.                     }
  73.  
  74.                     var s = await connect_async();
  75.                     if (s==0) {logger.log_info("connected");break;} else {miliseconds_for_reconnect = miliseconds_for_reconnect+miliseconds_for_reconnect;}
  76.  
  77.                 }
  78.                
  79.             }
  80.             public async Task<int> close_async() {
  81.                 ct.Cancel();
  82.  
  83.                 isRunning = false;
  84.                 var r = await disconnect_async();
  85.                 ws.Dispose();
  86.                 return r;
  87.  
  88.             }
  89.             public async Task<int> disconnect_async() {                
  90.                 if (ws.State==WebSocketState.Open) {
  91.                     await ws.CloseOutputAsync(WebSocketCloseStatus.NormalClosure,"",new CancellationToken());
  92.                 }
  93.  
  94.                 if (ws.State==WebSocketState.Closed) {
  95.                     return 0;
  96.                 } else {
  97.                     return 1;
  98.                 }
  99.  
  100.             }
  101.             public void Dispose() {
  102.  
  103.             }
  104.  
  105.             public async Task<string> recieve_async() {
  106.                 while (isRunning) {
  107.                     try {
  108.                         var res = await ws.ReceiveAsync(new ArraySegment<byte>(buffer,offset,free_space),ct.Token);
  109.                         free_space -= res.Count;
  110.                         offset += res.Count;
  111.                    
  112.                         if (res.EndOfMessage) {
  113.                             var strmsg = System.Text.Encoding.UTF8.GetString(buffer).TrimEnd('\0');
  114.  
  115.                             offset = 0;
  116.                             free_space = buffer.Length;
  117.                             Array.Clear(buffer,0,buffer.Length);
  118.  
  119.                             return strmsg;
  120.  
  121.                         }
  122.                         if (free_space==0) {
  123.                             var new_size = buffer.Length+init_buffer_size;
  124.                             Array.Resize(ref buffer, new_size);
  125.                             free_space = buffer.Length-offset;
  126.                         }
  127.  
  128.                         if (res.MessageType==WebSocketMessageType.Close && isRunning) {
  129.                             offset = 0;
  130.                             free_space = buffer.Length;
  131.                             Array.Clear(buffer,0,buffer.Length);
  132.                             await reconnect_async(1,"Server disconnected");
  133.                            
  134.                         }
  135.  
  136.                         if (res.Count==0) {
  137.                             offset = 0;
  138.                             free_space = buffer.Length;
  139.                             Array.Clear(buffer,0,buffer.Length);
  140.                             await reconnect_async(1,"Server disconnected");
  141.  
  142.                         }
  143.  
  144.                     }
  145.                     catch (System.Net.WebSockets.WebSocketException e) {
  146.                         if (isRunning) {
  147.                             offset = 0;
  148.                             free_space = buffer.Length;
  149.                             Array.Clear(buffer,0,buffer.Length);
  150.                             await reconnect_async(2,e.Message);
  151.                         }
  152.  
  153.                     }
  154.                     catch (TaskCanceledException) {
  155.                         offset = 0;
  156.                         free_space = buffer.Length;
  157.                         Array.Clear(buffer,0,buffer.Length);
  158.                        
  159.                         return "";
  160.  
  161.                     }
  162.  
  163.                 }    
  164.                
  165.                 offset = 0;
  166.                 free_space = buffer.Length;
  167.                 Array.Clear(buffer,0,buffer.Length);
  168.  
  169.                 return "";      
  170.  
  171.             }
  172.  
  173.             public virtual Task start()
  174.             {
  175.                 throw new NotImplementedException();
  176.             }
  177.         }
  178.        
  179.     class UserStream:BinanceWebSocket {
  180.         public event UserStreamCallback callback;
  181.         private IRequestClient rq;
  182.         // private string last_eror_id;
  183.         private bool listen_key_no_found = false;
  184.         public UserStream(string u, string _endpoint, ILogger l, UserStreamCallback cl, IRequestClient _rq):base(u,_endpoint,l) {
  185.             rq = _rq;
  186.             callback += cl;
  187.  
  188.             // last_eror_id = "";
  189.            
  190.         }
  191.         public new void Dispose() {
  192.             rq.restart_user_stream_event_unsubscribe(restart_user_stream);
  193.         }
  194.         public void restart_user_stream() {
  195.             listen_key_no_found = true;
  196.             ct.Cancel();
  197.         }
  198.         private async Task run_async() {
  199.             while (isRunning) {
  200.                 var strmsg = await recieve_async();
  201.                 if (strmsg!="") {
  202.                     var d = JsonSerializer.Deserialize<dynamic>(strmsg);
  203.                         if (d.ContainsKey("e")) {
  204.                             if (d["e"]==GeneralEnums.Order_update) {
  205.                                 var ous = new OrderUpdateStruct(d["o"]["s"],d["o"]["S"],d["o"]["o"],d["o"]["f"],decimal.Parse(d["o"]["q"],CultureInfo.InvariantCulture),
  206.                                 decimal.Parse(d["o"]["p"],CultureInfo.InvariantCulture),decimal.Parse(d["o"]["sp"],CultureInfo.InvariantCulture),
  207.                                 decimal.Parse(d["o"]["ap"],CultureInfo.InvariantCulture),decimal.Parse(d["o"]["l"],CultureInfo.InvariantCulture),
  208.                                 decimal.Parse(d["o"]["L"],CultureInfo.InvariantCulture),d["o"]["X"],d["o"]["c"]);
  209.  
  210.                                 var uss = new UserStreamStruct(d["o"]["s"],GeneralEnums.Order_update,ous,new PositionChangeStruct());
  211.  
  212.                                 callback?.Invoke(uss);
  213.  
  214.                             } else if (d["e"]==GeneralEnums.Position_update) {
  215.                                 var pcs = new PositionChangeStruct(d["a"]["P"][0]["s"],decimal.Parse(d["a"]["P"][0]["ep"],CultureInfo.InvariantCulture),
  216.                                 decimal.Parse(d["a"]["P"][0]["pa"],CultureInfo.InvariantCulture),decimal.Parse(d["a"]["P"][0]["up"],CultureInfo.InvariantCulture));
  217.                                
  218.                                 var uss = new UserStreamStruct(d["a"]["P"][0]["s"],GeneralEnums.Position_update,new OrderUpdateStruct(),pcs);
  219.  
  220.                                 callback?.Invoke(uss);
  221.  
  222.                             } else if (d["e"]=="listenKeyExpired") {
  223.                                 // if (d["E"]!=last_eror_id) {                                    
  224.                                     await restart_user_stream_async();
  225.                                 // }
  226.  
  227.                             } else {
  228.                                 rq.logger.log_system_info($"User data stream listen key {base.endpoint} message: {strmsg}");
  229.                                                            
  230.                             }
  231.  
  232.                         } else {
  233.                             rq.logger.log_system_info($"User data stream listen key {base.endpoint} message: {strmsg}");
  234.                         }
  235.  
  236.                 } else {
  237.                     if (listen_key_no_found) {
  238.                         listen_key_no_found = false;
  239.                         ct = new CancellationTokenSource();
  240.                         await restart_user_stream_async();
  241.                     }
  242.                 }
  243.  
  244.             }
  245.  
  246.         }
  247.  
  248.         private async Task restart_user_stream_async() {
  249.             rq.logger.log_error("listen key expired reconnect");
  250.             await base.disconnect_async();
  251.            
  252.             var lt = this.rq.listen_key_get();
  253.  
  254.             this.rq.listen_key = lt;
  255.             base.endpoint = this.rq.listen_key;
  256.            
  257.             await reconnect_async(3,"listen key expired");
  258.            
  259.         }
  260.  
  261.         public override Task start() {
  262.             return run_async();
  263.         }
  264.  
  265.     }
  266. }
  267.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement