Advertisement
Guest User

Untitled

a guest
Jun 19th, 2015
244
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 10.10 KB | None | 0 0
  1. using (var stream = new AsyncBufferedStream(System.IO.File.Create(@"c:xyz.dat")))
  2. using (var writer = new StreamWriter(stream))
  3. {
  4. // perform processor-bound work to get the next line to be written
  5. // then write the line
  6. writer.Write("abc");
  7. }
  8.  
  9. using System;
  10. using System.Collections.Concurrent;
  11. using System.Collections.Generic;
  12. using System.Linq;
  13. using System.Text;
  14. using System.Threading;
  15. using System.Threading.Tasks;
  16.  
  17. namespace xyz
  18. {
  19. /// <summary> Wrap a stream into an asynchronous buffer. All writes will be queued to be executed asynchronously</summary>
  20. class AsyncBufferedStream : Stream
  21. {
  22. /// <summary> wrapped inner stream; we pass every write through to this stream</summary>
  23. private Stream wrappedStream;
  24.  
  25. /// <summary> buffer, contains all items not yet written to the wrapped stream</summary>
  26. private BlockingCollection<BufferBlock> blockBuffer = new BlockingCollection<BufferBlock>();
  27.  
  28. /// <summary> The Task that writes to the wrapped Stream</summary>
  29. private Task writeTask;
  30.  
  31. private long countBytesInBuffer = 0;
  32. private long countBlocksWritten = 0;
  33.  
  34.  
  35. /// <summary> Wrap a stream into an asynchronous buffer. All writes will be queued to be executed asynchronously</summary>
  36. public AsyncBufferedStream(Stream stream)
  37. {
  38. this.wrappedStream = stream;
  39. this.writeTask = new Task(this.writeToInnerStream, TaskCreationOptions.LongRunning);
  40. this.writeTask.Start();
  41. }
  42.  
  43. /// <summary> blocks, until the current content of the buffer has been written to the underlying stream, then flushes the underlying stream</summary>
  44. public override void Flush()
  45. {
  46. var buf = this.blockBuffer;
  47. if (buf==null || buf.IsAddingCompleted) throw new ObjectDisposedException("BlockBuffer");
  48.  
  49. if (this.blockBuffer.Count > 0) // buffer is not empty
  50. {
  51. // insert a dummy item, and wait until it is processed.
  52. using (ManualResetEventSlim ev = new ManualResetEventSlim(false))
  53. {
  54. this.blockBuffer.Add(new BufferBlock(ev));
  55. ev.Wait();
  56. }
  57. // now all bytes, that where in the buffer before Flush(), have been written to the wrapped stream
  58. }
  59.  
  60. var stream = this.wrappedStream;
  61. if (stream == null) throw new ObjectDisposedException("Stream");
  62. stream.Flush();
  63.  
  64. }
  65.  
  66. /// <summary> not supported </summary>
  67. public override long Seek(long offset, SeekOrigin origin) {throw new NotSupportedException();}
  68.  
  69. /// <summary> not supported </summary>
  70. public override void SetLength(long value) {throw new NotSupportedException();}
  71.  
  72. /// <summary> not supported </summary>
  73. public override int Read(byte[] buffer, int offset, int count) {throw new NotSupportedException();}
  74.  
  75. /// <summary> queues the specified bytes for writing to the wrapped stream </summary>
  76. public override void Write(byte[] buffer, int offset, int count)
  77. {
  78. var bytes = new byte[count];
  79. Array.Copy(buffer, offset, bytes, 0, count);
  80.  
  81. Interlocked.Add(ref this.countBytesInBuffer, bytes.Length);
  82. this.blockBuffer.Add(new BufferBlock(bytes));
  83. }
  84.  
  85. /// <summary> continuosly writes the content of the buffer to the wrapped stream</summary>
  86. private void writeToInnerStream()
  87. {
  88. foreach (BufferBlock block in blockBuffer.GetConsumingEnumerable())
  89. {
  90. if (block.bytes != null)
  91. {
  92. this.wrappedStream.Write(block.bytes, 0, block.bytes.Length);
  93. Interlocked.Add(ref this.countBytesInBuffer, -block.bytes.Length);
  94. Interlocked.Increment(ref this.countBlocksWritten);
  95. }
  96.  
  97. // notify observers such as the Flush() method
  98. if (block.ConsumedEvent != null)
  99. {
  100. block.ConsumedEvent.Set();
  101. }
  102. }
  103. }
  104.  
  105. /// <summary> not supported </summary>
  106. public override bool CanRead { get { return false; } }
  107.  
  108. /// <summary> not supported </summary>
  109. public override bool CanSeek { get { return false; } }
  110.  
  111. /// <summary> returns true, when this stream can be written to</summary>
  112. public override bool CanWrite
  113. {
  114. get
  115. {
  116. var buf = this.blockBuffer;
  117. if (buf == null) return false;
  118. var stream = this.wrappedStream;
  119. if (stream == null) return false;
  120. return !buf.IsAddingCompleted && stream.CanWrite;
  121. }
  122. }
  123.  
  124. /// <summary> returns the length of the underlying wrapped stream</summary>
  125. public override long Length
  126. {
  127. get
  128. {
  129. var stream = this.wrappedStream;
  130. if (stream == null) throw new ObjectDisposedException("stream");
  131. return stream.Length;
  132. }
  133. }
  134.  
  135. /// <summary> current position</summary>
  136. public override long Position
  137. {
  138. get
  139. {
  140. var stream = this.wrappedStream;
  141. if (stream==null) throw new ObjectDisposedException("stream");
  142. return stream.Position + this.countBytesInBuffer;
  143. }
  144. set{throw new NotSupportedException();} }
  145.  
  146.  
  147. /// <summary> Disposes all owned resources</summary>
  148. protected override void Dispose(bool disposing)
  149. {
  150. try
  151. {
  152. if (disposing)
  153. {
  154. var buf = this.blockBuffer;
  155. var task = this.writeTask;
  156.  
  157. if (buf != null)
  158. {
  159. try
  160. {
  161. buf.CompleteAdding(); // now the writeTask should exit
  162. if (task != null) task.Wait();
  163. }
  164. finally
  165. {
  166. buf.Dispose();
  167. }
  168. }
  169.  
  170. if (task != null && task.IsCompleted) // Task can only be disposed if it has completed
  171. {
  172. task.Dispose();
  173. }
  174.  
  175. var stream = this.wrappedStream;
  176. if (stream != null)
  177. {
  178. try
  179. {
  180. stream.Flush();
  181. }
  182. finally
  183. {
  184. stream.Dispose();
  185. }
  186. }
  187. }
  188. }
  189. finally
  190. {
  191. this.wrappedStream = null;
  192. this.blockBuffer = null;
  193. this.writeTask = null;
  194. this.countBytesInBuffer = 0;
  195.  
  196. base.Dispose(disposing);
  197. }
  198. }
  199.  
  200. /// <summary> A block of bytes to be written</summary>
  201. private struct BufferBlock
  202. {
  203. /// <summary> bytes to be written</summary>
  204. internal readonly byte[] bytes;
  205.  
  206. /// <summary> notification when this block has been written (optional)</summary>
  207. internal readonly ManualResetEventSlim ConsumedEvent;
  208.  
  209. /// <summary> Normal constructor: just write this block to the stream</summary>
  210. internal BufferBlock(byte[] bytes)
  211. {
  212. this.bytes = bytes;
  213. this.ConsumedEvent = null;
  214. }
  215. /// <summary> Notification for .Flush(): just set an event when this block has been processed</summary>
  216. internal BufferBlock(ManualResetEventSlim consumedEvent)
  217. {
  218. this.bytes = null;
  219. this.ConsumedEvent = consumedEvent;
  220. }
  221. }
  222.  
  223. }
  224.  
  225.  
  226.  
  227. // -----------------------------------------------------------------------------
  228. // test code
  229. static void Main()
  230. {
  231. Stopwatch watch = Stopwatch.StartNew();
  232. using (var stream = System.IO.File.Create(@"c:temptest.dat"))
  233. using (var writer = new StreamWriter(stream))
  234. {
  235. for (int i = 0; i < 100000; i++)
  236. {
  237. writer.WriteLine("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA");
  238. for (int j = 0; j < 1000; j++)
  239. {
  240. Math.Sin(Math.Cos(Math.Pow(j, j)));
  241. }
  242. }
  243. }
  244. Console.WriteLine("unbuffered: " + watch.ElapsedMilliseconds);
  245.  
  246. watch = Stopwatch.StartNew();
  247. using (var stream = new AsyncBufferedStream(System.IO.File.Create(@"c:temptest.dat")))
  248. using (var writer = new StreamWriter(stream))
  249. {
  250. for (int i = 0; i < 100000; i++)
  251. {
  252. writer.WriteLine("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA");
  253. for (int j = 0; j < 1000; j++)
  254. {
  255. Math.Sin(Math.Cos(Math.Pow(j, j)));
  256. }
  257. }
  258. }
  259. Console.WriteLine("buffered: " + watch.ElapsedMilliseconds);
  260. return;
  261. }
  262.  
  263. private static void Main(string[] args)
  264. {
  265. var random = new Random();
  266.  
  267.  
  268. Stopwatch watch = Stopwatch.StartNew();
  269.  
  270. using (var stream = new AsyncBufferedStream(System.IO.File.Create(@"c:temptest.dat")))
  271. using (var writer = new StreamWriter(stream))
  272. {
  273. for (int i = 0; i < 1000; i++)
  274. {
  275. writer.WriteLine(Enumerable.Range(0, 1000).Aggregate("", (s, x) => s + (char)random.Next(33, 126)));
  276. }
  277. }
  278. Console.WriteLine("buffered: " + watch.ElapsedMilliseconds);
  279.  
  280. watch = Stopwatch.StartNew();
  281. using (var stream = System.IO.File.Create(@"c:temptest.dat"))
  282. using (var writer = new StreamWriter(stream))
  283. {
  284. for (int i = 0; i < 10000; i++)
  285. {
  286. writer.WriteLine(Enumerable.Range(0, 1000).Aggregate("", (s, x) => s + (char)random.Next(33, 126)));
  287. }
  288. }
  289. Console.WriteLine("unbuffered: " + watch.ElapsedMilliseconds);
  290. Console.ReadLine();
  291. return;
  292. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement