Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/// Small set of combinators for building and sequencing IObservable values.
module Observable =
    open System

    /// Disposable whose Dispose does nothing; returned by sources that have
    /// already finished emitting by the time Subscribe returns.
    let private emptyDisposable =
        { new IDisposable with
            member __.Dispose() = () }

    /// Observable that emits no values: every subscriber is completed
    /// immediately, inside the Subscribe call.
    let empty =
        { new IObservable<_> with
            member __.Subscribe observer =
                observer.OnCompleted()
                emptyDisposable }

    /// Observable that pushes `value` to each subscriber once and then
    /// completes, all synchronously inside the Subscribe call.
    let singleton value =
        { new IObservable<_> with
            member __.Subscribe observer =
                observer.OnNext value
                observer.OnCompleted()
                emptyDisposable }

    /// Concatenates two observables: the subscriber first receives everything
    /// from `first`; once `first` completes, the same subscriber is attached to
    /// `second`. An error from `first` is forwarded and `second` is never
    /// subscribed. Completion is only signalled by `second`.
    ///
    /// The returned disposable releases whichever underlying subscription is
    /// active. After it has been disposed, a late completion of `first` no
    /// longer subscribes to `second`.
    let append (first: IObservable<_>) (second: IObservable<_>) =
        { new IObservable<_> with
            member __.Subscribe observer =
                let gate = obj ()
                // Currently active underlying subscription (first's, then second's).
                let current : IDisposable option ref = ref None
                // Set once the consumer has disposed; guarded by `gate`.
                let disposed = ref false

                let firstSubscription =
                    first.Subscribe
                        { new IObserver<_> with
                            member __.OnNext value = observer.OnNext value
                            member __.OnError ex = observer.OnError ex
                            member __.OnCompleted() =
                                lock gate (fun () ->
                                    if not disposed.Value then
                                        let next = second.Subscribe observer
                                        // The observer may dispose re-entrantly while
                                        // `second` emits synchronously (the lock is
                                        // re-entrant), so check the flag again.
                                        if disposed.Value then
                                            next.Dispose()
                                        else
                                            current.Value <- Some next) }

                // If `first` already completed, `current` holds the subscription to
                // `second` and must not be overwritten with the finished one.
                lock gate (fun () ->
                    if Option.isNone current.Value then
                        current.Value <- Some firstSubscription)

                { new IDisposable with
                    member __.Dispose() =
                        // Swap the state under the lock, but run the foreign Dispose
                        // outside of it.
                        let active =
                            lock gate (fun () ->
                                disposed.Value <- true
                                let taken = current.Value
                                current.Value <- None
                                taken)
                        match active with
                        | Some subscription -> subscription.Dispose()
                        | None -> () } }

    /// Prepends `value` to `source`: subscribers receive `value` first and then
    /// everything `source` produces.
    let cons value source =
        append (singleton value) source
Add Comment
Please, Sign In to add comment