Guest User

Untitled

a guest
Apr 23rd, 2018
62
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.44 KB | None | 0 0
  1. module Observable =
  2. open System
  3.  
  4. let private emptyDisposable =
  5. { new IDisposable with member __.Dispose() = () }
  6.  
  7. let empty =
  8. { new IObservable<_> with
  9. member this.Subscribe observer =
  10. observer.OnCompleted()
  11. emptyDisposable }
  12.  
  13. let singleton value =
  14. { new IObservable<_> with
  15. member this.Subscribe obs =
  16. obs.OnNext value
  17. obs.OnCompleted()
  18. emptyDisposable }
  19.  
  20. let append (first:IObservable<_>) (second:IObservable<_>) =
  21. { new IObservable<_> with
  22. member obs.Subscribe observer =
  23. let gate = new obj()
  24. let subscription = (ref None : IDisposable option ref)
  25. let temporarySubscription =
  26. first.Subscribe { new IObserver<_> with
  27. member x.OnNext(value) = observer.OnNext(value)
  28. member x.OnError(ex) = observer.OnError(ex)
  29. member x.OnCompleted() =
  30. lock gate (fun () ->
  31. subscription := Some (second.Subscribe(observer)) ) }
  32.  
  33. lock gate (fun () -> if Option.isNone !subscription then subscription := Some (temporarySubscription))
  34. let disposable() = match !subscription with None -> () | Some disp -> subscription := None; disp.Dispose();
  35. { new IDisposable with
  36. member disp.Dispose() = disposable() } }
  37.  
  38. let cons value source =
  39. let flip f x y = f y x
  40. value |> singleton |> flip append source
Add Comment
Please, Sign In to add comment