Advertisement
Guest User

Untitled

a guest
Jun 28th, 2016
60
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.51 KB | None | 0 0
  1. public class RxSwitch<T> {
  2. private final Observable<T> source;
  3. private final AtomicLong requested = new AtomicLong();
  4. private final AtomicLong limit = new AtomicLong();
  5. private final RxSwitch that = this;
  6. private Subscriber<? super T> child;
  7. private SwitchSubscription subscription;
  8. private boolean emitting = false;
  9.  
  10. public static <T> RxSwitch<T> on(Observable<T> source) {
  11. return new RxSwitch<>(source);
  12. }
  13.  
  14. private RxSwitch(Observable<T> source) {
  15. this.source = source;
  16. }
  17.  
  18. public void allow(long n) {
  19. if (n < 0) {
  20. throw new IllegalArgumentException();
  21. }
  22. if (n == 0) {
  23. return;
  24. }
  25. if (BackpressureUtils.getAndAddRequest(limit, n) != 0) {
  26. return;
  27. }
  28. if (subscription == null) {
  29. return;
  30. }
  31.  
  32. synchronized (that) {
  33. if (emitting) {
  34. return;
  35. }
  36. emitting = true;
  37. }
  38.  
  39. long r = requested.get();
  40. if (r <= 0) {
  41. // No item were requested by downstream
  42. return;
  43. }
  44.  
  45. requestLoop(n, r);
  46. }
  47.  
  48. private void requestLoop(long l, long r) {
  49. for (;;) {
  50. if (child.isUnsubscribed()) {
  51. emitting = false;
  52. return;
  53. }
  54.  
  55. long e = Math.min(r, l);
  56. subscription.allow(e);
  57. r = requested.addAndGet(-e);
  58. l = limit.addAndGet(-e);
  59.  
  60. if (r == 0 || l == 0) {
  61. emitting = false;
  62. return;
  63. }
  64. }
  65. }
  66.  
  67. public Observable<T> observable() {
  68. return Observable.create(child -> {
  69. this.child = child;
  70. this.subscription = new SwitchSubscription(child);
  71. child.add(source.subscribe(subscription));
  72. child.setProducer(n -> {
  73. if (n < 0) {
  74. throw new IllegalArgumentException();
  75. }
  76. if (n == 0) {
  77. return;
  78. }
  79. if (BackpressureUtils.getAndAddRequest(requested, n) != 0) {
  80. return;
  81. }
  82.  
  83. synchronized (that) {
  84. if (emitting) {
  85. return;
  86. }
  87. emitting = true;
  88. }
  89.  
  90. long l = limit.get();
  91. if (l <= 0) {
  92. emitting = false;
  93. // No item are supposed to be emitted, blocking the request
  94. return;
  95. }
  96.  
  97. requestLoop(l, n);
  98. });
  99. });
  100. }
  101.  
  102. private class SwitchSubscription extends Subscriber<T> {
  103. private final Subscriber<? super T> child;
  104. private final AtomicLong allowed = new AtomicLong();
  105.  
  106. private SwitchSubscription(Subscriber<? super T> child) {
  107. this.child = child;
  108. }
  109.  
  110. @Override
  111. public void onStart() {
  112. request(0);
  113. }
  114.  
  115. @Override
  116. public void onCompleted() {
  117. child.onCompleted();
  118. }
  119.  
  120. @Override
  121. public void onError(Throwable e) {
  122. child.onError(e);
  123. }
  124.  
  125. @Override
  126. public void onNext(T t) {
  127. if (allowed.getAndDecrement() <= 0) {
  128. onError(new MissingBackpressureException());
  129. }
  130. child.onNext(t);
  131. }
  132.  
  133. public void allow(long n) {
  134. allowed.addAndGet(n);
  135. super.request(n);
  136. }
  137. }
  138. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement