Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // Copied from this post:
- // http://blogs.msdn.com/b/jeffva/archive/2010/07/23/rx-on-the-server-part-1-of-n-asynchronous-system-io-stream-reading.aspx
/// <summary>
/// Exposes the contents of <paramref name="stream"/> as an observable sequence of
/// byte arrays, read asynchronously through the stream's BeginRead/EndRead pair.
/// </summary>
/// <param name="stream">The stream to read from.</param>
/// <param name="bufferSize">
/// Maximum number of bytes requested per read; must be greater than zero.
/// </param>
/// <returns>
/// An observable that pushes one array per successful read (sized to the number of
/// bytes actually read) and stops once a read reports zero bytes.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="bufferSize"/> is zero or negative.
/// </exception>
public static IObservable<byte[]> AsyncRead(this Stream stream, int bufferSize)
{
    // Validate eagerly: without these guards a bad argument only surfaces later,
    // at subscription time, from inside the iterator.
    if (stream == null)
    {
        throw new ArgumentNullException("stream");
    }

    if (bufferSize <= 0)
    {
        // A zero-length read returns 0 bytes, which the read loop would
        // misinterpret as end of stream and silently produce an empty sequence.
        throw new ArgumentOutOfRangeException(
            "bufferSize", bufferSize, "Buffer size must be greater than zero.");
    }

    return Observable.Iterate<byte[]>(result =>
        AsyncReadHelper(result, stream, bufferSize));
}
/// <summary>
/// Iterator body behind AsyncRead: repeatedly issues an asynchronous read on
/// <paramref name="stream"/>, yields the pending read so the caller can wait for it,
/// and forwards each chunk of data to <paramref name="result"/>.
/// </summary>
/// <param name="result">Observer that receives a copy of every chunk read.</param>
/// <param name="stream">The stream being read.</param>
/// <param name="bufferSize">Size of the reusable read buffer, in bytes.</param>
/// <returns>The sequence of pending read operations, one per loop iteration.</returns>
private static IEnumerable<IObservable<object>> AsyncReadHelper(
    IObserver<byte[]> result, Stream stream, int bufferSize)
{
    // Wrap the stream's Begin/End pair as a function returning an observable.
    var readAsync = Observable.FromAsyncPattern<byte[], int, int, int>(
        stream.BeginRead, stream.EndRead);

    // A single scratch buffer is reused for every read.
    var scratch = new byte[bufferSize];

    for (;;)
    {
        var pending = readAsync(scratch, 0, bufferSize).Start();

        // Hand the in-flight read back to the iterating operator.
        yield return pending;

        // The original author's reasoning: the async-pattern wrapper produces an
        // AsyncSubject and the iterating operator deals with exceptions, so the
        // started read holds exactly one value once we get here.
        int count = pending[0];

        if (count != 0)
        {
            // Copy out only the bytes actually read, because the scratch buffer
            // is overwritten by the next iteration.
            var chunk = new byte[count];
            Array.Copy(scratch, 0, chunk, 0, count);

            // Push the chunk to the outer observable.
            result.OnNext(chunk);
            continue;
        }

        // Zero bytes read: end of stream.
        yield break;
    }
}
Add Comment
Please, Sign In to add comment