Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
// Usage example: wrap the file stream so that writes made through the StreamWriter are queued
// and written to disk by AsyncBufferedStream's background task.
using (var bufferedFile = new AsyncBufferedStream(System.IO.File.Create(@"c:xyz.dat")))
{
    using (var lineWriter = new StreamWriter(bufferedFile))
    {
        // perform processor-bound work to get the next line to be written,
        // then hand the line to the writer
        lineWriter.Write("abc");
    }
}
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
- namespace xyz
- {
/// <summary>
/// Write-only stream wrapper: every <see cref="Write"/> copies the caller's bytes into a queue and a
/// dedicated background task writes the queued blocks, in order, to the wrapped stream.
/// </summary>
/// <remarks>
/// Reading, seeking and <see cref="SetLength"/> are not supported. The queue is unbounded, so a producer
/// that is faster than the wrapped stream makes the queue grow.
/// NOTE(review): <see cref="Length"/> and <see cref="Position"/> read the wrapped stream while the
/// background task may be writing to it; whether that is safe depends on the wrapped stream - confirm.
/// </remarks>
class AsyncBufferedStream : Stream
{
    /// <summary> wrapped inner stream; every queued block is eventually written to it; null once disposed</summary>
    private Stream wrappedStream;

    /// <summary> queue of all blocks not yet written to the wrapped stream; null once disposed</summary>
    private BlockingCollection<BufferBlock> blockBuffer = new BlockingCollection<BufferBlock>();

    /// <summary> the task that drains <see cref="blockBuffer"/> into the wrapped stream</summary>
    private Task writeTask;

    /// <summary> number of bytes queued but not yet handed to the wrapped stream</summary>
    private long countBytesInBuffer = 0;

    /// <summary> number of data blocks written so far (only ever incremented; not read by this class)</summary>
    private long countBlocksWritten = 0;

    /// <summary> first exception thrown by the wrapped stream on the background task, or null</summary>
    private volatile Exception writeError;

    /// <summary> Wraps a stream into an asynchronous buffer and starts the background writer task.</summary>
    /// <param name="stream">stream that receives the queued bytes; it is disposed together with this instance</param>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null</exception>
    public AsyncBufferedStream(Stream stream)
    {
        if (stream == null) throw new ArgumentNullException("stream");

        this.wrappedStream = stream;
        this.writeTask = new Task(this.WriteToInnerStream, TaskCreationOptions.LongRunning);
        this.writeTask.Start();
    }

    /// <summary>
    /// Blocks until everything queued before this call has been processed by the background task,
    /// then flushes the wrapped stream.
    /// </summary>
    /// <exception cref="ObjectDisposedException">the stream has been disposed</exception>
    /// <exception cref="IOException">an earlier background write to the wrapped stream failed</exception>
    public override void Flush()
    {
        var buf = this.GetOpenBuffer();
        this.ThrowIfWriteFailed();

        // Always queue a marker and wait for it, even when the queue looks empty: the writer may
        // already have dequeued the last block without having finished writing it, so an empty
        // queue does not mean that all bytes have reached the wrapped stream.
        using (ManualResetEventSlim ev = new ManualResetEventSlim(false))
        {
            Enqueue(buf, new BufferBlock(ev));
            ev.Wait();
        }

        // the marker is also released when the writer has failed; report that instead of flushing
        this.ThrowIfWriteFailed();

        var stream = this.wrappedStream;
        if (stream == null) throw new ObjectDisposedException("Stream");
        stream.Flush();
    }

    /// <summary> not supported </summary>
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }

    /// <summary> not supported </summary>
    public override void SetLength(long value) { throw new NotSupportedException(); }

    /// <summary> not supported </summary>
    public override int Read(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }

    /// <summary> Copies the specified bytes and queues the copy for writing to the wrapped stream.</summary>
    /// <param name="buffer">source of the bytes; it may be reused by the caller as soon as this method returns</param>
    /// <param name="offset">index of the first byte in <paramref name="buffer"/> to queue</param>
    /// <param name="count">number of bytes to queue</param>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative</exception>
    /// <exception cref="ArgumentException">the range exceeds the length of <paramref name="buffer"/></exception>
    /// <exception cref="ObjectDisposedException">the stream has been disposed</exception>
    /// <exception cref="IOException">an earlier background write to the wrapped stream failed</exception>
    public override void Write(byte[] buffer, int offset, int count)
    {
        if (buffer == null) throw new ArgumentNullException("buffer");
        if (offset < 0) throw new ArgumentOutOfRangeException("offset");
        if (count < 0) throw new ArgumentOutOfRangeException("count");
        if (buffer.Length - offset < count) throw new ArgumentException("offset and count exceed the length of buffer");

        var buf = this.GetOpenBuffer();
        this.ThrowIfWriteFailed();
        if (count == 0) return; // nothing to queue

        var bytes = new byte[count];
        Array.Copy(buffer, offset, bytes, 0, count);

        // count the bytes before queueing, so the writer's decrement can never run first
        Interlocked.Add(ref this.countBytesInBuffer, count);
        try
        {
            Enqueue(buf, new BufferBlock(bytes));
        }
        catch
        {
            Interlocked.Add(ref this.countBytesInBuffer, -count); // block was never queued
            throw;
        }
    }

    /// <summary> Returns the queue, or throws when this stream has been (or is being) disposed.</summary>
    private BlockingCollection<BufferBlock> GetOpenBuffer()
    {
        var buf = this.blockBuffer;
        if (buf == null || buf.IsAddingCompleted) throw new ObjectDisposedException("BlockBuffer");
        return buf;
    }

    /// <summary> Adds a block to the queue; a queue closed by a concurrent Dispose is reported as ObjectDisposedException.</summary>
    private static void Enqueue(BlockingCollection<BufferBlock> buf, BufferBlock block)
    {
        try
        {
            buf.Add(block);
        }
        catch (InvalidOperationException) // adding was completed, or the collection was disposed
        {
            throw new ObjectDisposedException("BlockBuffer");
        }
    }

    /// <summary> Throws an IOException wrapping the background writer's failure, if there was one.</summary>
    private void ThrowIfWriteFailed()
    {
        Exception error = this.writeError;
        if (error != null)
        {
            throw new IOException("A previous asynchronous write to the wrapped stream failed.", error);
        }
    }

    /// <summary>
    /// Body of the background task: writes queued blocks to the wrapped stream until adding is completed.
    /// After a failed write it keeps draining the queue without writing, so that callers blocked in
    /// Flush() are released, and finally rethrows so the task ends up faulted.
    /// </summary>
    private void WriteToInnerStream()
    {
        var buf = this.blockBuffer;
        var stream = this.wrappedStream;
        try
        {
            foreach (BufferBlock block in buf.GetConsumingEnumerable())
            {
                if (block.Bytes != null)
                {
                    try
                    {
                        stream.Write(block.Bytes, 0, block.Bytes.Length);
                    }
                    finally
                    {
                        // the block has left the queue, whether or not the write succeeded
                        Interlocked.Add(ref this.countBytesInBuffer, -block.Bytes.Length);
                    }
                    Interlocked.Increment(ref this.countBlocksWritten);
                }
                // notify observers such as the Flush() method
                if (block.ConsumedEvent != null)
                {
                    block.ConsumedEvent.Set();
                }
            }
        }
        catch (Exception ex)
        {
            // publish the failure first, so that released waiters can see it
            this.writeError = ex;

            // discard everything still queued, but wake up any Flush() waiting on a marker
            foreach (BufferBlock block in buf.GetConsumingEnumerable())
            {
                if (block.Bytes != null)
                {
                    Interlocked.Add(ref this.countBytesInBuffer, -block.Bytes.Length);
                }
                if (block.ConsumedEvent != null)
                {
                    block.ConsumedEvent.Set();
                }
            }
            throw; // fault the task; Dispose() observes it through task.Wait()
        }
    }

    /// <summary> not supported </summary>
    public override bool CanRead { get { return false; } }

    /// <summary> not supported </summary>
    public override bool CanSeek { get { return false; } }

    /// <summary> returns true, when this stream can be written to</summary>
    public override bool CanWrite
    {
        get
        {
            var buf = this.blockBuffer;
            if (buf == null) return false;
            var stream = this.wrappedStream;
            if (stream == null) return false;
            return !buf.IsAddingCompleted && stream.CanWrite;
        }
    }

    /// <summary> returns the length of the underlying wrapped stream (bytes still queued are not included)</summary>
    /// <exception cref="ObjectDisposedException">the stream has been disposed</exception>
    public override long Length
    {
        get
        {
            var stream = this.wrappedStream;
            if (stream == null) throw new ObjectDisposedException("stream");
            return stream.Length;
        }
    }

    /// <summary> position of the wrapped stream plus the number of bytes still queued; setting it is not supported</summary>
    /// <exception cref="ObjectDisposedException">the stream has been disposed</exception>
    public override long Position
    {
        get
        {
            var stream = this.wrappedStream;
            if (stream == null) throw new ObjectDisposedException("stream");
            // Interlocked.Read: a plain 64-bit read is not atomic on 32-bit platforms
            return stream.Position + Interlocked.Read(ref this.countBytesInBuffer);
        }
        set { throw new NotSupportedException(); }
    }

    /// <summary>
    /// Completes the queue, waits for the background task to write everything that is still queued,
    /// then flushes and disposes the wrapped stream.
    /// </summary>
    /// <remarks>
    /// If the background task failed, the AggregateException from waiting on it propagates to the
    /// caller; the wrapped stream is disposed in that case as well.
    /// </remarks>
    protected override void Dispose(bool disposing)
    {
        try
        {
            if (disposing)
            {
                var buf = this.blockBuffer;
                var task = this.writeTask;
                var stream = this.wrappedStream;
                try
                {
                    if (buf != null)
                    {
                        try
                        {
                            buf.CompleteAdding(); // now the writeTask should exit
                            if (task != null) task.Wait();
                        }
                        finally
                        {
                            buf.Dispose();
                        }
                    }
                    if (stream != null) stream.Flush();
                }
                finally
                {
                    // runs even when waiting or flushing threw, so the wrapped stream is never leaked
                    if (task != null && task.IsCompleted) // Task can only be disposed if it has completed
                    {
                        task.Dispose();
                    }
                    if (stream != null) stream.Dispose();
                }
            }
        }
        finally
        {
            this.wrappedStream = null;
            this.blockBuffer = null;
            this.writeTask = null;
            Interlocked.Exchange(ref this.countBytesInBuffer, 0);
            base.Dispose(disposing);
        }
    }

    /// <summary> One queue entry: either a block of bytes to be written, or a marker that only signals an event</summary>
    private struct BufferBlock
    {
        /// <summary> bytes to be written; null for a marker entry</summary>
        internal readonly byte[] Bytes;

        /// <summary> event to set once this entry has been processed (optional)</summary>
        internal readonly ManualResetEventSlim ConsumedEvent;

        /// <summary> Normal constructor: just write this block to the stream</summary>
        internal BufferBlock(byte[] bytes)
        {
            this.Bytes = bytes;
            this.ConsumedEvent = null;
        }

        /// <summary> Notification for .Flush(): just set an event when this block has been processed</summary>
        internal BufferBlock(ManualResetEventSlim consumedEvent)
        {
            this.Bytes = null;
            this.ConsumedEvent = consumedEvent;
        }
    }
}
// -----------------------------------------------------------------------------
// test code
/// <summary>
/// Benchmark: writes the same lines, with simulated processor-bound work between them, first directly
/// to a file stream and then through an AsyncBufferedStream, and prints the elapsed milliseconds of each run.
/// </summary>
static void Main()
{
    const int lineCount = 100000;
    const int busyIterations = 1000;
    const string line = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA";

    Stopwatch timer = new Stopwatch();

    // run 1: StreamWriter directly on top of the file stream
    timer.Start();
    using (var file = System.IO.File.Create(@"c:temptest.dat"))
    {
        using (var output = new StreamWriter(file))
        {
            for (int lineNo = 0; lineNo != lineCount; ++lineNo)
            {
                output.WriteLine(line);
                // simulated processor-bound work between two lines; the result is discarded on purpose
                for (int step = 0; step != busyIterations; ++step)
                {
                    Math.Sin(Math.Cos(Math.Pow(step, step)));
                }
            }
        }
    }
    Console.WriteLine("unbuffered: " + timer.ElapsedMilliseconds);

    // run 2: identical workload, file stream wrapped in an AsyncBufferedStream
    timer.Restart();
    using (var file = new AsyncBufferedStream(System.IO.File.Create(@"c:temptest.dat")))
    {
        using (var output = new StreamWriter(file))
        {
            for (int lineNo = 0; lineNo != lineCount; ++lineNo)
            {
                output.WriteLine(line);
                // simulated processor-bound work between two lines; the result is discarded on purpose
                for (int step = 0; step != busyIterations; ++step)
                {
                    Math.Sin(Math.Cos(Math.Pow(step, step)));
                }
            }
        }
    }
    Console.WriteLine("buffered: " + timer.ElapsedMilliseconds);
}
/// <summary>
/// Benchmark: writes randomly generated lines through an AsyncBufferedStream and then directly to a
/// file stream, printing the elapsed milliseconds of each run, and waits for Enter before returning.
/// </summary>
/// <remarks>
/// Both runs write the same number of lines. Previously the buffered run wrote 1000 lines and the
/// unbuffered run 10000, so the two printed timings were not comparable.
/// </remarks>
/// <param name="args">command line arguments (unused)</param>
private static void Main(string[] args)
{
    const int lineCount = 10000;   // shared by both runs so the timings measure the same workload
    const int lineLength = 1000;

    var random = new Random();

    // Builds one line of random printable characters. The repeated string concatenation is kept
    // as it was: it is the processor-bound work that happens between two writes.
    Func<string> nextLine = () =>
        Enumerable.Range(0, lineLength).Aggregate("", (s, x) => s + (char)random.Next(33, 126));

    Stopwatch watch = Stopwatch.StartNew();
    using (var stream = new AsyncBufferedStream(System.IO.File.Create(@"c:temptest.dat")))
    using (var writer = new StreamWriter(stream))
    {
        for (int i = 0; i < lineCount; i++)
        {
            writer.WriteLine(nextLine());
        }
    }
    Console.WriteLine("buffered: " + watch.ElapsedMilliseconds);

    watch = Stopwatch.StartNew();
    using (var stream = System.IO.File.Create(@"c:temptest.dat"))
    using (var writer = new StreamWriter(stream))
    {
        for (int i = 0; i < lineCount; i++)
        {
            writer.WriteLine(nextLine());
        }
    }
    Console.WriteLine("unbuffered: " + watch.ElapsedMilliseconds);

    Console.ReadLine(); // keep the console window open until Enter is pressed
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement