Advertisement
Guest User

Untitled

a guest
Apr 21st, 2019
78
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.92 KB | None | 0 0
  1. class BlockingStreams
  2. {
  3. public Stream WriteableStream { get; }
  4. public Stream ReadableStream { get; }
  5.  
  6. private readonly CancellationToken _ct;
  7. private readonly BlockingCollection<MemoryStream> _blocks;
  8.  
  9. public BlockingStreams(int? maxWrites = null, CancellationToken ct = default)
  10. {
  11. _ct = ct;
  12. _blocks = maxWrites.HasValue
  13. ? new BlockingCollection<MemoryStream>(maxWrites.Value)
  14. : new BlockingCollection<MemoryStream>();
  15.  
  16. var readDisposed = new TaskCompletionSource<object>();
  17. var writeDisposed = new TaskCompletionSource<object>();
  18.  
  19. ReadableStream = new ReadStream(_blocks, ct, readDisposed);
  20. WriteableStream = new WriteStream(_blocks, ct, writeDisposed);
  21.  
  22. Task.WhenAll(readDisposed.Task, writeDisposed.Task)
  23. .ContinueWith(t => _blocks.Dispose());
  24. }
  25.  
  26. class ReadStream : Stream
  27. {
  28. readonly TaskCompletionSource<object> _disposed;
  29. readonly BlockingCollection<MemoryStream> _blocks;
  30. readonly CancellationToken _ct;
  31. MemoryStream _current;
  32. long _position;
  33.  
  34. public override bool CanRead => true;
  35.  
  36. public override bool CanSeek => false;
  37.  
  38. public override bool CanWrite => false;
  39.  
  40. public override long Length => throw new NotSupportedException();
  41.  
  42. public ReadStream(BlockingCollection<MemoryStream> blocks, CancellationToken ct, TaskCompletionSource<object> disposed)
  43. {
  44. _blocks = blocks;
  45. _ct = ct;
  46. _disposed = disposed;
  47. }
  48.  
  49. public override long Position
  50. {
  51. get => _position;
  52. set => throw new NotSupportedException();
  53. }
  54.  
  55. public override void Flush() { }
  56.  
  57. public override int Read(byte[] buffer, int offset, int count)
  58. {
  59. var read = 0;
  60.  
  61. while (read < count)
  62. {
  63. if (_current == null)
  64. {
  65. // only wait if no bytes read yet
  66. var timeout = read == 0 ? -1 : 0;
  67. if (!_blocks.TryTake(out _current, timeout, _ct))
  68. return read;
  69. }
  70.  
  71. var thisRead = _current.Read(buffer, offset + read, count - read);
  72. read += thisRead;
  73. _position += thisRead;
  74.  
  75. // is current block exhausted?
  76. if (_current.Position == _current.Length)
  77. {
  78. using (_current)
  79. _current = null;
  80. }
  81. }
  82.  
  83. return read;
  84. }
  85.  
  86. public override long Seek(long offset, SeekOrigin origin) =>
  87. throw new NotSupportedException();
  88.  
  89. public override void SetLength(long value) =>
  90. throw new NotSupportedException();
  91.  
  92. public override void Write(byte[] buffer, int offset, int count) =>
  93. throw new NotSupportedException();
  94.  
  95. protected override void Dispose(bool disposing)
  96. {
  97. base.Dispose(disposing);
  98. _disposed.SetResult(default);
  99. }
  100. }
  101.  
  102. class WriteStream : Stream
  103. {
  104. readonly TaskCompletionSource<object> _disposed;
  105. readonly BlockingCollection<MemoryStream> _blocks;
  106. readonly CancellationToken _ct;
  107. readonly RecyclableMemoryStreamManager _streamManager = new RecyclableMemoryStreamManager();
  108. long _position = 0;
  109.  
  110. public WriteStream(BlockingCollection<MemoryStream> blocks, CancellationToken ct, TaskCompletionSource<object> disposed)
  111. {
  112. _blocks = blocks;
  113. _ct = ct;
  114. _disposed = disposed;
  115. }
  116.  
  117. protected override void Dispose(bool disposing)
  118. {
  119. base.Dispose(disposing);
  120. _blocks.CompleteAdding();
  121. _disposed.SetResult(default);
  122. }
  123.  
  124. public override bool CanRead => false;
  125.  
  126. public override bool CanSeek => false;
  127.  
  128. public override bool CanWrite => true;
  129.  
  130. public override long Length => throw new NotImplementedException();
  131.  
  132. public override void Write(byte[] buffer, int offset, int count)
  133. {
  134. var stream = _streamManager.GetStream(tag: default, buffer, offset, count);
  135. _position += stream.Length;
  136. _blocks.Add(stream, _ct);
  137. }
  138.  
  139. public override long Position
  140. {
  141. get => _position;
  142. set => throw new NotSupportedException();
  143. }
  144.  
  145. public override void Flush() { }
  146.  
  147. public override int Read(byte[] buffer, int offset, int count) =>
  148. throw new NotSupportedException();
  149.  
  150. public override long Seek(long offset, SeekOrigin origin) =>
  151. throw new NotSupportedException();
  152.  
  153. public override void SetLength(long value) =>
  154. throw new NotSupportedException();
  155. }
  156. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement