Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- public class RxSwitch<T> {
- private final Observable<T> source;
- private final AtomicLong requested = new AtomicLong();
- private final AtomicLong limit = new AtomicLong();
- private final RxSwitch that = this;
- private Subscriber<? super T> child;
- private SwitchSubscription subscription;
- private boolean emitting = false;
- public static <T> RxSwitch<T> on(Observable<T> source) {
- return new RxSwitch<>(source);
- }
- private RxSwitch(Observable<T> source) {
- this.source = source;
- }
- public void allow(long n) {
- if (n < 0) {
- throw new IllegalArgumentException();
- }
- if (n == 0) {
- return;
- }
- if (BackpressureUtils.getAndAddRequest(limit, n) != 0) {
- return;
- }
- if (subscription == null) {
- return;
- }
- synchronized (that) {
- if (emitting) {
- return;
- }
- emitting = true;
- }
- long r = requested.get();
- if (r <= 0) {
- // No item were requested by downstream
- return;
- }
- requestLoop(n, r);
- }
- private void requestLoop(long l, long r) {
- for (;;) {
- if (child.isUnsubscribed()) {
- emitting = false;
- return;
- }
- long e = Math.min(r, l);
- subscription.allow(e);
- r = requested.addAndGet(-e);
- l = limit.addAndGet(-e);
- if (r == 0 || l == 0) {
- emitting = false;
- return;
- }
- }
- }
- public Observable<T> observable() {
- return Observable.create(child -> {
- this.child = child;
- this.subscription = new SwitchSubscription(child);
- child.add(source.subscribe(subscription));
- child.setProducer(n -> {
- if (n < 0) {
- throw new IllegalArgumentException();
- }
- if (n == 0) {
- return;
- }
- if (BackpressureUtils.getAndAddRequest(requested, n) != 0) {
- return;
- }
- synchronized (that) {
- if (emitting) {
- return;
- }
- emitting = true;
- }
- long l = limit.get();
- if (l <= 0) {
- emitting = false;
- // No item are supposed to be emitted, blocking the request
- return;
- }
- requestLoop(l, n);
- });
- });
- }
- private class SwitchSubscription extends Subscriber<T> {
- private final Subscriber<? super T> child;
- private final AtomicLong allowed = new AtomicLong();
- private SwitchSubscription(Subscriber<? super T> child) {
- this.child = child;
- }
- @Override
- public void onStart() {
- request(0);
- }
- @Override
- public void onCompleted() {
- child.onCompleted();
- }
- @Override
- public void onError(Throwable e) {
- child.onError(e);
- }
- @Override
- public void onNext(T t) {
- if (allowed.getAndDecrement() <= 0) {
- onError(new MissingBackpressureException());
- }
- child.onNext(t);
- }
- public void allow(long n) {
- allowed.addAndGet(n);
- super.request(n);
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement