Advertisement
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
- using System.Buffers;
- using System.Net.WebSockets;
- using System.Runtime.CompilerServices;
- using System.Runtime.InteropServices;
- using System.Threading.Channels;
- using Microsoft.Extensions.Logging;
- using Microsoft.Extensions.Logging.Abstractions;
- namespace WebSocketClient;
/// <summary>
/// WebSocket client that keeps a connection to a single URL alive on a background task and
/// reconnects after failures or after the remote side closes the connection.
/// Incoming messages are queued on a bounded channel and handed to <see cref="MessageReceived"/>
/// subscribers by consumer workers; outgoing messages are queued on a second bounded channel
/// and written to the socket by a single sender worker.
/// </summary>
public class WebSocketClient : IDisposable
{
    /// <summary>Largest accepted size, in bytes, of one reassembled incoming message.</summary>
    private const int MaxMessageSize = 1_500_000;

    /// <summary>Capacity of both the receive and the send channel.</summary>
    private const int ChannelCapacity = 10;

    /// <summary>Pause before the next connection attempt after a failure.</summary>
    private const int ReconnectDelayMilliseconds = 2000;

    /// <summary>Interval between checks for the first connection in <see cref="StartAsync"/>.</summary>
    private const int StartPollIntervalMilliseconds = 50;

    /// <summary>Maximum number of checks for the first connection in <see cref="StartAsync"/>.</summary>
    private const int StartPollAttempts = 100;

    private readonly int _receiveChunkSize;
    private readonly string _url;
    private readonly ILogger<WebSocketClient> _logger;
    private readonly Channel<Message> _receiveChannel;
    private readonly Channel<Message> _sendChannel;
    private readonly uint _numberOfConsumers;

    // Serializes start, stop and the connect step of the background task.
    private readonly SemaphoreSlim _semaphore = new(1, 1);

    private ClientWebSocket? _clientWebSocket;
    private CancellationTokenSource? _cts;
    private Task _sendTask = Task.CompletedTask;
    private Task _dataTask = Task.CompletedTask;

    // Set by StopAsync so that a close frame received afterwards ends the loop instead of reconnecting.
    private bool _isStopped;

    /// <summary>
    /// Creates a client for <paramref name="url"/>. No connection is made until <see cref="StartAsync"/> is called.
    /// </summary>
    /// <param name="url">WebSocket endpoint to connect to.</param>
    /// <param name="loggerFactory">Factory for the client's logger; a no-op logger is used when omitted.</param>
    /// <param name="receiveChunkSize">Initial size, in bytes, of the pooled receive buffer. Must be positive.</param>
    /// <param name="singleReceiver">Hint that the receive channel has a single reader; only honoured when at most one consumer worker is used.</param>
    /// <param name="singleSender">Hint that only one caller at a time writes to the send channel.</param>
    /// <param name="numberOfConsumers">Number of workers that dispatch received messages; 0 runs a single worker.</param>
    /// <exception cref="ArgumentNullException"><paramref name="url"/> is null, empty or whitespace.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="receiveChunkSize"/> is zero or negative.</exception>
    public WebSocketClient(string url, ILoggerFactory? loggerFactory = default, int receiveChunkSize = 4096, bool singleReceiver = false, bool singleSender = false, uint numberOfConsumers = 4)
    {
        if (string.IsNullOrWhiteSpace(url))
        {
            throw new ArgumentNullException(nameof(url));
        }

        // A zero-length buffer would make the receive loop spin without ever reading data.
        if (receiveChunkSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(receiveChunkSize), receiveChunkSize, "The receive chunk size must be positive.");
        }

        _url = url;
        _logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<WebSocketClient>();
        _receiveChunkSize = receiveChunkSize;
        _numberOfConsumers = numberOfConsumers;

        _receiveChannel = Channel.CreateBounded<Message>(
            new BoundedChannelOptions(ChannelCapacity)
            {
                SingleWriter = true,
                // The single-reader optimization is only valid when one worker reads the channel.
                SingleReader = singleReceiver && numberOfConsumers <= 1,
                FullMode = BoundedChannelFullMode.DropOldest
            },
            // Messages evicted by DropOldest never reach a consumer, so dispose them here.
            static dropped => dropped.Dispose());

        _sendChannel = Channel.CreateBounded<Message>(new BoundedChannelOptions(ChannelCapacity)
        {
            SingleReader = true,
            SingleWriter = singleSender,
            FullMode = BoundedChannelFullMode.Wait
        });
    }

    /// <summary>The background connection task; a completed task while the client is not started.</summary>
    internal Task Running { get; private set; } = Task.CompletedTask;

    /// <summary>Whether <see cref="Dispose()"/> has been called.</summary>
    public bool IsDisposed { get; protected set; }

    /// <summary>Whether a socket exists and is in the <see cref="WebSocketState.Open"/> state.</summary>
    public bool IsOpen => _clientWebSocket is { State: WebSocketState.Open };

    /// <summary>Factory used to create a fresh socket for every connection attempt.</summary>
    public Func<ClientWebSocket> ClientFactory { get; } = () => new ClientWebSocket();

    /// <summary>Raised after each successful connection, including reconnections.</summary>
    public event EventHandler? Connected;

    /// <summary>Raised by <see cref="StopAsync"/> once the background task has ended.</summary>
    public event EventHandler? Disconnected;

    /// <summary>Raised when connecting or receiving fails.</summary>
    public event EventHandler<ErrorEventArgs>? Error;

    /// <summary>
    /// Raised for every received message. The message is disposed as soon as the handlers return.
    /// </summary>
    public event EventHandler<MessageReceivedEventArgs>? MessageReceived;

    /// <summary>Cancels the background task, completes both channels and releases the client's resources.</summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Starts the background connection task if it is not already running and waits until the first
    /// connection is established, the task ends, the client is stopped, or about five seconds have passed.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
    public async Task StartAsync()
    {
        DoDisposeChecks();

        Task? startedTask = null;
        CancellationToken startedToken = default;

        // Prevent a race with concurrent StartAsync/StopAsync calls.
        await _semaphore.WaitAsync().ConfigureAwait(false);
        try
        {
            if (_cts == null)
            {
                var cts = new CancellationTokenSource();
                _cts = cts;
                _clientWebSocket = null;
                startedToken = cts.Token;
                startedTask = Task.Run(async () =>
                {
                    _logger.LogTrace("Connection task started: {Url}", _url);
                    try
                    {
                        await HandleConnection().ConfigureAwait(false);
                    }
                    catch (Exception e)
                    {
                        _logger.LogError(e, "Error in connection task: {Url}: ", _url);
                    }

                    _logger.LogTrace("Connection task ended: {Url}", _url);
                }, startedToken);
                Running = startedTask;
            }
        }
        finally
        {
            _semaphore.Release();
        }

        if (startedTask is null)
        {
            // Already started by an earlier call.
            return;
        }

        // Wait for the socket outside the gate: the connection task needs the gate to connect,
        // so polling while holding it could never observe a connection.
        for (var attempt = 0; attempt < StartPollAttempts; attempt++)
        {
            if (_clientWebSocket != null || startedTask.IsCompleted || startedToken.IsCancellationRequested)
            {
                break;
            }

            await DelayIgnoringCancellationAsync(StartPollIntervalMilliseconds, startedToken).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Closes the socket, cancels the background task, waits for it to end and raises <see cref="Disconnected"/>.
    /// Does nothing when the background task is not running.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
    public async Task StopAsync()
    {
        DoDisposeChecks();
        await _semaphore.WaitAsync().ConfigureAwait(false);
        try
        {
            if (Running.IsCompleted)
            {
                // Never started, or the task already ended by itself. Drop a leftover token
                // source so that StartAsync is able to start a new task.
                _cts?.Dispose();
                _cts = null;
                return;
            }

            _logger.LogDebug("Stopping");
            _isStopped = true;

            var socket = _clientWebSocket;
            try
            {
                // Close the socket first, because ReceiveAsync leaves an invalid socket (state = aborted) when the token is cancelled
                if (socket is { State: not (WebSocketState.Aborted or WebSocketState.Closed or WebSocketState.CloseSent) })
                {
                    _logger.LogInformation("CloseOutputAsync called");
                    // After this call, the socket state will change to CloseSent
                    await socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).ConfigureAwait(false);
                }
            }
            catch (Exception ex)
            {
                // Best effort: a failure here means the socket is already closed or broken,
                // which is the state this method is trying to reach anyway.
                _logger.LogDebug(ex, "Closing the socket failed; continuing with shutdown");
            }

            _logger.LogInformation("cts called");
            _cts?.Cancel();
            try
            {
                await Running.ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                // The task was cancelled before it got the chance to run; it is stopped either way.
            }

            _cts?.Dispose();
            _cts = null;
            OnDisconnected();
            _logger.LogDebug("Stopped");
        }
        finally
        {
            _semaphore.Release();
        }
    }

    /// <summary>Throws when the client has been disposed.</summary>
    /// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    protected void DoDisposeChecks()
    {
        if (IsDisposed)
        {
            throw new ObjectDisposedException(nameof(WebSocketClient));
        }
    }

    /// <summary>Releases the client's resources; safe to call more than once.</summary>
    /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (IsDisposed)
        {
            return;
        }

        IsDisposed = true;
        if (disposing)
        {
            try
            {
                // Ask a still-running background task to end instead of leaving it connected.
                _cts?.Cancel();
            }
            catch (ObjectDisposedException)
            {
                // A concurrent StopAsync already disposed the token source.
            }

            _receiveChannel.Writer.TryComplete();
            _sendChannel.Writer.TryComplete();
            _semaphore.Dispose();
        }
    }

    /// <summary>
    /// Connection loop of the background task: connects, starts the sender and consumer workers,
    /// receives until the connection ends, then repeats until cancellation is requested or the
    /// remote side closes the connection after <see cref="StopAsync"/> was called.
    /// </summary>
    private async Task HandleConnection()
    {
        while (_cts is { IsCancellationRequested: false } cts)
        {
            DoDisposeChecks();
            _logger.LogTrace("Connecting...");
            using var connectionCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token);
            var token = connectionCts.Token;
            try
            {
                using var socket = await OpenSocketAsync(token).ConfigureAwait(false);
                OnConnected();
                _isStopped = false;

                _sendTask = ProcessSendAsync(socket, token);
                _dataTask = _numberOfConsumers == 0
                    ? ProcessDataAsync(token)
                    : Task.WhenAll(Enumerable.Range(1, (int)_numberOfConsumers).Select(_ => ProcessDataAsync(token)));

                // Listen for messages
                var reconnect = await ReceiveMessagesAsync(socket, token).ConfigureAwait(false);
                if (!reconnect)
                {
                    // Jumps to finally block
                    return;
                }
            }
            catch (OperationCanceledException)
            {
                // operation was canceled, ignore
            }
            catch (Exception ex)
            {
                if (ex is WebSocketException { WebSocketErrorCode: WebSocketError.ConnectionClosedPrematurely })
                {
                    _logger.LogDebug("Prematurely closed");
                }

                OnError(new ErrorEventArgs(ex.Message, ex));

                // Back off before the next attempt so a persistent failure does not become a busy loop.
                await DelayIgnoringCancellationAsync(ReconnectDelayMilliseconds, token).ConfigureAwait(false);
            }
            finally
            {
                // Stop this connection's workers before the next attempt.
                connectionCts.Cancel();
                await Task.WhenAll(_sendTask, _dataTask).ConfigureAwait(false);
            }
        }
    }

    /// <summary>
    /// Creates a socket through <see cref="ClientFactory"/>, connects it to the configured URL while holding
    /// the gate, and publishes it as the current socket. The socket is disposed if connecting fails.
    /// </summary>
    private async Task<ClientWebSocket> OpenSocketAsync(CancellationToken cancellationToken)
    {
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        try
        {
            var socket = ClientFactory();
            try
            {
                await socket.ConnectAsync(new Uri(_url), cancellationToken).ConfigureAwait(false);
            }
            catch
            {
                socket.Dispose();
                throw;
            }

            _clientWebSocket = socket;
            return socket;
        }
        finally
        {
            _semaphore.Release();
        }
    }

    /// <summary>
    /// Receives from <paramref name="socket"/> until the remote side sends a close frame, reassembling
    /// fragments into pooled buffers and queueing every complete message on the receive channel.
    /// </summary>
    /// <returns>
    /// <c>true</c> when the connection should be re-established; <c>false</c> when the close frame
    /// arrived after <see cref="StopAsync"/> was called.
    /// </returns>
    /// <exception cref="InvalidOperationException">A message exceeded <see cref="MaxMessageSize"/>.</exception>
    private async Task<bool> ReceiveMessagesAsync(WebSocket socket, CancellationToken cancellationToken)
    {
        byte[]? buffer = null;
        var count = 0;
        try
        {
            while (true)
            {
                // Rent a buffer only when the previous one was handed over, not for every read
                buffer ??= ArrayPool<byte>.Shared.Rent(_receiveChunkSize);

                // Use the Memory<byte> overload instead of the ArraySegment one (that one allocates a Task per read).
                var result = await socket.ReceiveAsync(buffer.AsMemory(count), cancellationToken).ConfigureAwait(false);
                if (result.MessageType == WebSocketMessageType.Close)
                {
                    _logger.LogDebug("The remote server has closed the connection");
                    // Some exchanges e.g. Binance are closing the remote connection when an error occurs, so we simply reconnect when that happens
                    return !_isStopped;
                }

                count += result.Count;
                if (count > MaxMessageSize)
                {
                    throw new InvalidOperationException("Maximum size of the message was exceeded.");
                }

                if (result.EndOfMessage)
                {
                    // Avoid working with strings to reduce allocation
                    await _receiveChannel.Writer.WriteAsync(new Message(buffer.AsMemory(0, count)), cancellationToken).ConfigureAwait(false);

                    // The queued message now refers to the buffer; start the next message with a fresh one.
                    buffer = null;
                    count = 0;
                }
                else if (count >= buffer.Length)
                {
                    // The buffer is full but the message is not complete: move to a buffer twice the size.
                    var larger = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
                    buffer.AsSpan(0, count).CopyTo(larger);
                    ArrayPool<byte>.Shared.Return(buffer);
                    buffer = larger;
                }
            }
        }
        finally
        {
            // Return a buffer that was never handed to a message, whichever way the loop ended.
            if (buffer is not null)
            {
                ArrayPool<byte>.Shared.Return(buffer);
            }
        }
    }

    /// <summary>Waits for the given time; returns early, without throwing, when the token is cancelled.</summary>
    private static async Task DelayIgnoringCancellationAsync(int milliseconds, CancellationToken cancellationToken)
    {
        try
        {
            await Task.Delay(milliseconds, cancellationToken).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Cancellation only shortens the wait.
        }
    }

    /// <summary>Logs the new connection and raises <see cref="Connected"/>.</summary>
    protected virtual void OnConnected()
    {
        _logger.LogTrace("OnConnected: Connection opened (URL: {Url})", _url);
        Connected?.Invoke(this, EventArgs.Empty);
    }

    /// <summary>Logs the closed connection and raises <see cref="Disconnected"/>.</summary>
    protected virtual void OnDisconnected()
    {
        _logger.LogTrace("OnDisconnected: Connection closed");
        Disconnected?.Invoke(this, EventArgs.Empty);
    }

    /// <summary>Logs the error and raises <see cref="Error"/>.</summary>
    /// <param name="e">Details of the failure.</param>
    protected virtual void OnError(ErrorEventArgs e)
    {
        _logger.LogError(e.Exception, "OnError: {Message}", e.Message);
        Error?.Invoke(this, e);
    }

    /// <summary>Raises <see cref="MessageReceived"/>.</summary>
    /// <param name="e">The received message.</param>
    protected virtual void OnMessageReceived(MessageReceivedEventArgs e)
    {
        MessageReceived?.Invoke(this, e);
    }

    #region Send

    /// <summary>
    /// Queues a message for sending; the returned task completes once the message is in the send
    /// channel, waiting while the channel is full.
    /// </summary>
    /// <param name="message">The message to send.</param>
    /// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
    public ValueTask SendAsync(Message message)
    {
        DoDisposeChecks();
        return _sendChannel.Writer.WriteAsync(message);
    }

    /// <summary>Tries to queue a message for sending without waiting.</summary>
    /// <param name="message">The message to send.</param>
    /// <returns><c>true</c> when the message was queued; <c>false</c> when the send channel did not accept it.</returns>
    /// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
    public bool Send(Message message)
    {
        DoDisposeChecks();
        return _sendChannel.Writer.TryWrite(message);
    }

    /// <summary>
    /// Sender worker: writes queued messages to <paramref name="webSocket"/> as complete text frames
    /// until the token is cancelled or the send channel is completed. A failed send is logged and skipped.
    /// </summary>
    private async Task ProcessSendAsync(WebSocket webSocket, CancellationToken cancellationToken)
    {
        try
        {
            while (await _sendChannel.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
            {
                while (_sendChannel.Reader.TryRead(out var message))
                {
                    try
                    {
                        await webSocket.SendAsync(message.Buffer, WebSocketMessageType.Text, true, CancellationToken.None).ConfigureAwait(false);
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "SendAsync: {ExceptionMessage}", ex.Message);
                    }
                }
            }
        }
        catch (OperationCanceledException)
        {
            // operation was canceled, ignore
        }
        finally
        {
            _logger.LogTrace("Send loop finished");
        }
    }

    #endregion

    #region Data

    /// <summary>
    /// Consumer worker: dispatches queued incoming messages to subscribers until the token is cancelled
    /// or the receive channel is completed. Every message is disposed after it has been dispatched,
    /// even when a handler throws.
    /// </summary>
    private async Task ProcessDataAsync(CancellationToken cancellationToken)
    {
        try
        {
            while (await _receiveChannel.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
            {
                while (_receiveChannel.Reader.TryRead(out var message))
                {
                    try
                    {
                        await ProcessMessageAsync(message).ConfigureAwait(false);
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "Data loop: {Message}", ex.Message);
                    }
                    finally
                    {
                        message.Dispose();
                    }
                }
            }
        }
        catch (OperationCanceledException)
        {
            // operation was canceled, ignore
        }
        finally
        {
            _logger.LogTrace("Data loop finished");
        }
    }

    /// <summary>Raises <see cref="MessageReceived"/> synchronously for <paramref name="message"/>.</summary>
    private Task ProcessMessageAsync(Message message)
    {
        OnMessageReceived(new MessageReceivedEventArgs(message));
        return Task.CompletedTask;
    }

    #endregion
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement