Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
 * Wraps this [Observable] so that values are forwarded only to the observer
 * that subscribed most recently and has not been disposed yet.
 *
 * Every downstream emitter is recorded in a shared set when it subscribes and
 * is removed from that set when its subscription is disposed. A value from the
 * source is passed on only if the receiving emitter is currently the last
 * entry of that set, so once the latest observer is disposed the previously
 * registered one starts receiving values again.
 *
 * The callbacks declared by [CompletableCallbacks] are delegated straight to
 * each downstream emitter and are not filtered by the "latest" check.
 *
 * Note that each downstream subscription subscribes to the receiver
 * separately.
 *
 * @return an [Observable] that delivers values only to its latest observer.
 */
fun <T> Observable<T>.withLatestObserver(): Observable<T> {
    // Registered downstream emitters; the code relies on the last entry being
    // the most recently added one.
    val activeEmitters = AtomicReference<Set<ObservableEmitter<T>>>(emptySet())

    return observable { downstream ->
        activeEmitters.update { current -> current + downstream }

        val subscription = CompositeDisposable()
        downstream.setDisposable(subscription)
        // Unregister this emitter as soon as its subscription is disposed.
        subscription += Disposable {
            activeEmitters.update { current -> current - downstream }
        }

        val upstreamObserver =
            object : ObservableObserver<T>, CompletableCallbacks by downstream {
                override fun onSubscribe(disposable: Disposable) {
                    // Tie the upstream subscription to the downstream lifecycle.
                    subscription += disposable
                }

                override fun onNext(value: T) {
                    // Drop the value unless this emitter is the latest one registered.
                    if (activeEmitters.value.lastOrNull() != downstream) return
                    downstream.onNext(value)
                }
            }

        subscribe(upstreamObserver)
    }
}
// Usage: share() allows only one subscription to aStream at a time, and
// withLatestObserver() makes sure only the latest observer receives values.
val shared = aStream.share().withLatestObserver()
Advertisement
Add Comment
Please, Sign In to add comment