Guest User

Rx: Async Stream Reader

a guest
Nov 14th, 2010
374
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C# 1.28 KB | None | 0 0
  1. // Copied from this post:
  2. // http://blogs.msdn.com/b/jeffva/archive/2010/07/23/rx-on-the-server-part-1-of-n-asynchronous-system-io-stream-reading.aspx
  3.  
  4. public static IObservable<byte[]> AsyncRead(this Stream stream, int bufferSize)
  5. {
  6.     return Observable.Iterate<byte[]>(result =>
  7.         AsyncReadHelper(result, stream, bufferSize));
  8. }
  9.  
  10. private static IEnumerable<IObservable<object>> AsyncReadHelper(
  11.     IObserver<byte[]> result, Stream stream, int bufferSize)
  12. {
  13.     var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
  14.         stream.BeginRead, stream.EndRead);
  15.     var buffer = new byte[bufferSize];
  16.  
  17.    while (true)
  18.     {
  19.         var read = asyncRead(buffer, 0, bufferSize).Start();
  20.         yield return read;
  21.         // As asyncRead returns an AsyncSubject,
  22.         // and the Iterator operator takes care of exceptions,
  23.         // we know at this point there will always be exactly one value in the list.
  24.  
  25.  
  26.         var bytesRead = read[0];
  27.         if (bytesRead == 0)
  28.         {
  29.             // End of file      
  30.             yield break;
  31.         }
  32.         var outBuffer = new byte[bytesRead];
  33.         Array.Copy(buffer, outBuffer, bytesRead);
  34.  
  35.         // Fire out to the outer Observable
  36.         result.OnNext(outBuffer);
  37.     }
  38. }
Add Comment
Please, Sign In to add comment