Guest User

Untitled

a guest
Jun 20th, 2020
98
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Kotlin 1.05 KB | None | 0 0
  1. fun <T> Observable<T>.withLatestObserver(): Observable<T> {
  2.     val emitters = AtomicReference<Set<ObservableEmitter<T>>>(emptySet())
  3.  
  4.     return observable { emitter ->
  5.         emitters.updateAndGet { it + emitter }
  6.  
  7.         val disposables = CompositeDisposable()
  8.         emitter.setDisposable(disposables)
  9.         disposables += Disposable {
  10.             emitters.update { it - emitter }
  11.         }
  12.  
  13.         subscribe(
  14.             object : ObservableObserver<T>, CompletableCallbacks by emitter {
  15.                 override fun onSubscribe(disposable: Disposable) {
  16.                     disposables += disposable
  17.                 }
  18.  
  19.                 override fun onNext(value: T) {
  20.                     if (emitters.value.lastOrNull() == emitter) {
  21.                         emitter.onNext(value)
  22.                     }
  23.                 }
  24.             }
  25.         )
  26.     }
  27. }
  28.  
  29. // Usage
  30. val shared =
  31.     aStream
  32.         .share() // Allow only one subscription to aStream at a time
  33.         .withLatestObserver() // Only latest observer will receive values
Advertisement
Add Comment
Please, Sign In to add comment