Guest User

Untitled

a guest
Dec 16th, 2018
94
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.99 KB | None | 0 0
  /// Convert async observable to async sequence, non-blocking.
  ///
  /// Producer and consumer hand over one notification at a time: the
  /// observer stores the notification in `latest`, signals `ping`, and then
  /// awaits `pong`; the enumerator awaits `ping`, handles `latest`, and
  /// signals `pong`. The producer is therefore awaited until the item has
  /// been consumed by the async enumerator.
  ///
  /// `OnNext` values are yielded. `OnCompleted` ends the sequence. `OnError`
  /// ends the sequence by raising the carried exception, after the producer
  /// has been released and the subscription has been disposed.
  ///
  /// NOTE(review): the two wait handles are never disposed, and the
  /// subscription is only disposed once a terminal notification has been
  /// handled; a consumer that abandons enumeration early leaves it active.
  let toAsyncSeq (source: IAsyncObservable<'a>) : AsyncSeq<'a> =
      // ping: producer -> consumer, "a notification is available in `latest`".
      let ping = new AutoResetEvent false
      // pong: consumer -> producer, "the notification has been handled".
      let pong = new AutoResetEvent false
      // Placeholder only: `latest` is read solely after `ping` has been
      // signalled, and the observer always overwrites it before signalling.
      let mutable latest : Notification<'a> = OnCompleted

      // Observer callback: publish the notification, wake the consumer and
      // wait until the consumer acknowledges it.
      let onNotification n =
          async {
              latest <- n
              ping.Set () |> ignore
              do! Async.AwaitWaitHandle pong |> Async.Ignore
          }

      asyncSeq {
          let! dispose = AsyncObserver onNotification |> source.SubscribeAsync
          let mutable running = true
          // Error reported by the source, raised only after cleanup below.
          let mutable failure : exn option = None

          while running do
              do! Async.AwaitWaitHandle ping |> Async.Ignore
              match latest with
              | OnNext x ->
                  yield x
              | OnError ex ->
                  // Do not raise here: raising inside the loop would skip
                  // `pong.Set` and `DisposeAsync`, leaving the producer
                  // waiting forever and the subscription undisposed.
                  failure <- Some ex
                  running <- false
              | OnCompleted ->
                  running <- false
              // Release the producer for every kind of notification.
              pong.Set () |> ignore

          do! dispose.DisposeAsync ()

          match failure with
          | Some ex -> raise ex
          | None -> ()
      }
Add Comment
Please, Sign In to add comment