Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/// <summary>
/// A connected pair of streams backed by a shared queue of memory blocks:
/// every <c>Write</c> on <see cref="WriteableStream"/> enqueues one block, and
/// <see cref="ReadableStream"/> dequeues and drains those blocks in order.
/// </summary>
/// <remarks>
/// The shared queue is disposed only after BOTH streams have been disposed.
/// Each stream keeps unsynchronized state (position, current block), so each
/// end should be driven by one thread at a time.
/// </remarks>
class BlockingStreams
{
    /// <summary>Write-only end; disposing it signals end-of-stream to the reader.</summary>
    public Stream WriteableStream { get; }

    /// <summary>Read-only end; blocks until data is available or the writer is disposed.</summary>
    public Stream ReadableStream { get; }

    private readonly BlockingCollection<MemoryStream> _blocks;

    /// <summary>Creates the stream pair.</summary>
    /// <param name="maxWrites">
    /// Optional bound on the number of queued blocks; when the bound is reached,
    /// writes block until the reader takes a block. <c>null</c> means unbounded.
    /// </param>
    /// <param name="ct">Token observed by blocking reads and writes.</param>
    public BlockingStreams(int? maxWrites = null, CancellationToken ct = default)
    {
        _blocks = maxWrites.HasValue
            ? new BlockingCollection<MemoryStream>(maxWrites.Value)
            : new BlockingCollection<MemoryStream>();

        // RunContinuationsAsynchronously keeps the cleanup continuation from
        // running inline on whichever thread happens to call Dispose last.
        var readDisposed = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
        var writeDisposed = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);

        ReadableStream = new ReadStream(_blocks, ct, readDisposed);
        WriteableStream = new WriteStream(_blocks, ct, writeDisposed);

        // The queue is shared, so it may only be torn down once both ends are done with it.
        Task.WhenAll(readDisposed.Task, writeDisposed.Task)
            .ContinueWith(
                t => ReleaseBlocks(),
                CancellationToken.None,
                TaskContinuationOptions.None,
                TaskScheduler.Default);
    }

    /// <summary>Disposes any blocks that were never read, then the queue itself.</summary>
    private void ReleaseBlocks()
    {
        while (_blocks.TryTake(out var leftover))
        {
            leftover.Dispose();
        }

        _blocks.Dispose();
    }

    /// <summary>Read-only, non-seekable stream that consumes queued blocks in order.</summary>
    private sealed class ReadStream : Stream
    {
        private readonly TaskCompletionSource<object> _disposed;
        private readonly BlockingCollection<MemoryStream> _blocks;
        private readonly CancellationToken _ct;
        private MemoryStream _current;
        private long _position;
        private bool _isDisposed;

        public ReadStream(BlockingCollection<MemoryStream> blocks, CancellationToken ct, TaskCompletionSource<object> disposed)
        {
            _blocks = blocks;
            _ct = ct;
            _disposed = disposed;
        }

        public override bool CanRead => true;
        public override bool CanSeek => false;
        public override bool CanWrite => false;
        public override long Length => throw new NotSupportedException();

        /// <summary>Total number of bytes returned by <see cref="Read"/> so far; cannot be set.</summary>
        public override long Position
        {
            get => _position;
            set => throw new NotSupportedException();
        }

        public override void Flush() { }

        /// <summary>
        /// Reads up to <paramref name="count"/> bytes. Blocks only while nothing has
        /// been read yet; once at least one byte is available it returns whatever can
        /// be gathered without waiting.
        /// </summary>
        /// <returns>
        /// The number of bytes read; 0 when <paramref name="count"/> is 0 or when the
        /// writer has completed and no blocks remain.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
        /// <exception cref="ArgumentException">The range exceeds the buffer.</exception>
        /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
        /// <exception cref="OperationCanceledException">The token was cancelled while waiting.</exception>
        public override int Read(byte[] buffer, int offset, int count)
        {
            if (buffer == null)
            {
                throw new ArgumentNullException(nameof(buffer));
            }

            if (offset < 0)
            {
                throw new ArgumentOutOfRangeException(nameof(offset));
            }

            if (count < 0)
            {
                throw new ArgumentOutOfRangeException(nameof(count));
            }

            if (buffer.Length - offset < count)
            {
                throw new ArgumentException("Offset and count exceed the buffer length.");
            }

            if (_isDisposed)
            {
                throw new ObjectDisposedException(nameof(ReadStream));
            }

            var read = 0;
            while (read < count)
            {
                if (_current == null)
                {
                    // Only wait if no bytes have been read yet; otherwise just poll.
                    var timeout = read == 0 ? Timeout.Infinite : 0;
                    if (!_blocks.TryTake(out _current, timeout, _ct))
                    {
                        return read;
                    }
                }

                var thisRead = _current.Read(buffer, offset + read, count - read);
                read += thisRead;
                _position += thisRead;

                // Release the block as soon as it is exhausted.
                if (_current.Position >= _current.Length)
                {
                    _current.Dispose();
                    _current = null;
                }
            }

            return read;
        }

        public override long Seek(long offset, SeekOrigin origin) =>
            throw new NotSupportedException();

        public override void SetLength(long value) =>
            throw new NotSupportedException();

        public override void Write(byte[] buffer, int offset, int count) =>
            throw new NotSupportedException();

        /// <summary>
        /// Releases the partially read block (if any) and reports that the read end
        /// is finished. Safe to call more than once.
        /// </summary>
        protected override void Dispose(bool disposing)
        {
            if (disposing && !_isDisposed)
            {
                _isDisposed = true;
                _current?.Dispose();
                _current = null;
                _disposed.TrySetResult(null);
            }

            base.Dispose(disposing);
        }
    }

    /// <summary>Write-only, non-seekable stream that enqueues one block per write.</summary>
    private sealed class WriteStream : Stream
    {
        // One manager for all instances so pooled buffers are actually reused.
        private static readonly RecyclableMemoryStreamManager s_streamManager = new RecyclableMemoryStreamManager();

        private readonly TaskCompletionSource<object> _disposed;
        private readonly BlockingCollection<MemoryStream> _blocks;
        private readonly CancellationToken _ct;
        private long _position;
        private bool _isDisposed;

        public WriteStream(BlockingCollection<MemoryStream> blocks, CancellationToken ct, TaskCompletionSource<object> disposed)
        {
            _blocks = blocks;
            _ct = ct;
            _disposed = disposed;
        }

        public override bool CanRead => false;
        public override bool CanSeek => false;
        public override bool CanWrite => true;
        public override long Length => throw new NotSupportedException();

        /// <summary>Total number of bytes successfully enqueued so far; cannot be set.</summary>
        public override long Position
        {
            get => _position;
            set => throw new NotSupportedException();
        }

        public override void Flush() { }

        /// <summary>
        /// Copies the given range into a pooled block and enqueues it. Blocks while
        /// the queue is at its bounded capacity.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
        /// <exception cref="OperationCanceledException">The token was cancelled while waiting.</exception>
        public override void Write(byte[] buffer, int offset, int count)
        {
            if (_isDisposed)
            {
                throw new ObjectDisposedException(nameof(WriteStream));
            }

            // NOTE(review): presumably GetStream returns the block positioned at 0,
            // which the read side relies on - confirm against the library docs.
            var block = s_streamManager.GetStream(tag: default, buffer, offset, count);
            try
            {
                _blocks.Add(block, _ct);
            }
            catch
            {
                // The block never reached the queue, so nobody else will release it.
                block.Dispose();
                throw;
            }

            // Use count rather than block.Length: once enqueued, the reader may
            // already have consumed and disposed the block.
            _position += count;
        }

        public override int Read(byte[] buffer, int offset, int count) =>
            throw new NotSupportedException();

        public override long Seek(long offset, SeekOrigin origin) =>
            throw new NotSupportedException();

        public override void SetLength(long value) =>
            throw new NotSupportedException();

        /// <summary>
        /// Marks the queue as complete so the reader sees end-of-stream, then reports
        /// that the write end is finished. Safe to call more than once.
        /// </summary>
        protected override void Dispose(bool disposing)
        {
            if (disposing && !_isDisposed)
            {
                _isDisposed = true;

                // Must happen before signalling: once both ends have signalled,
                // the shared queue may be disposed at any moment.
                _blocks.CompleteAdding();
                _disposed.TrySetResult(null);
            }

            base.Dispose(disposing);
        }
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement