Advertisement
Guest User

Untitled

a guest
Apr 18th, 2015
187
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 9.94 KB | None | 0 0
  1. /**
  2. *
  3. */
  4. package rxjava.issue;
  5.  
  6. import java.util.Arrays;
  7. import java.util.concurrent.atomic.*;
  8.  
  9. import org.junit.Assert;
  10. import org.junit.Test;
  11.  
  12. import rx.*;
  13. import rx.Observable.OnSubscribe;
  14. import rx.functions.*;
  15. import rx.observers.TestSubscriber;
  16. import rx.subscriptions.Subscriptions;
  17.  
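/**
 * Round-robin dispatch of a source Observable's items to a fixed number of subscribers,
 * together with the unit tests that exercise it.
 */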
  20. public class RoundRobinDispatch {
  21. public static final class OnSubscribeRoundRobinDispatch<T> implements OnSubscribe<T> {
  22. final Observable<? extends T> source;
  23. final AtomicInteger wip;
  24. final AtomicInteger id;
  25. final AtomicReferenceArray<Subscriber<? super T>> subscribers;
  26. final int count;
  27. final AtomicReference<Subscriber<?>> sourceSubscriber;
  28. static final NoOpSubscriber<Object> NO_OP_SUBSCRIBER = new NoOpSubscriber<Object>();
  29. public OnSubscribeRoundRobinDispatch(Observable<? extends T> source, int count) {
  30. this.source = source;
  31. this.count = count;
  32. this.wip = new AtomicInteger();
  33. this.id = new AtomicInteger();
  34. this.subscribers = new AtomicReferenceArray<Subscriber<? super T>>(count);
  35. this.sourceSubscriber = new AtomicReference<Subscriber<?>>();
  36. }
  37. @Override
  38. public void call(Subscriber<? super T> child) {
  39. final int index = id.getAndIncrement();
  40. if (index < count) {
  41. subscribers.set(index, child);
  42. child.add(Subscriptions.create(new Action0() {
  43. @Override
  44. public void call() {
  45. release(index);
  46. }
  47. }));
  48. wip.incrementAndGet();
  49. }
  50.  
  51. if (index == count - 1) {
  52. SourceSubscriber ssub = new SourceSubscriber();
  53. sourceSubscriber.set(ssub);
  54. source.unsafeSubscribe(ssub);
  55. } else
  56. if (index >= count) {
  57. child.onError(new IllegalStateException("Too many subscribers"));
  58. }
  59. }
  60. void release() {
  61. if (wip.decrementAndGet() == 0) {
  62. Subscriber<?> sub = sourceSubscriber.get();
  63. if (sub == NO_OP_SUBSCRIBER) {
  64. return;
  65. }
  66. if (sourceSubscriber.compareAndSet(sub, NO_OP_SUBSCRIBER)) {
  67. id.set(0);
  68. sub.unsubscribe();
  69. }
  70. }
  71. }
  72. void release(int index) {
  73. subscribers.set(index, NO_OP_SUBSCRIBER);
  74. release();
  75. }
  76.  
  77. static final class NoOpSubscriber<T> extends Subscriber<T> {
  78. @Override
  79. public void onCompleted() {
  80. }
  81.  
  82. @Override
  83. public void onError(Throwable e) {
  84. }
  85.  
  86. @Override
  87. public void onNext(T t) {
  88. }
  89.  
  90. }
  91. final class SourceSubscriber extends Subscriber<T> {
  92. int index;
  93. @Override
  94. public void onCompleted() {
  95. for (int i = 0; i < count; i++) {
  96. subscribers.get(i).onCompleted();
  97. }
  98. }
  99.  
  100. @Override
  101. public void onError(Throwable e) {
  102. for (int i = 0; i < count; i++) {
  103. subscribers.get(i).onError(e);
  104. }
  105. }
  106.  
  107. @Override
  108. public void onNext(T t) {
  109. int j = count;
  110. while (j-- > 0) {
  111. Subscriber<? super T> s = subscribers.get(index);
  112. if (s != NO_OP_SUBSCRIBER) {
  113. s.onNext(t);
  114. }
  115. index++;
  116. if (index >= count) {
  117. index = 0;
  118. }
  119. if (s != NO_OP_SUBSCRIBER) {
  120. return;
  121. }
  122. }
  123. }
  124.  
  125. }
  126. }
  127.  
  128. @Test
  129. public void testSymmetric() {
  130. Observable<Integer> source = Observable.create(new OnSubscribeRoundRobinDispatch<Integer>(Observable.range(1, 10), 2));
  131.  
  132. TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>();
  133. TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>();
  134.  
  135. source.subscribe(ts1);
  136. ts1.assertReceivedOnNext(Arrays.<Integer>asList());
  137. source.subscribe(ts2);
  138.  
  139. ts1.assertReceivedOnNext(Arrays.<Integer>asList(1, 3, 5, 7, 9));
  140. ts1.assertNoErrors();
  141. ts1.assertTerminalEvent();
  142.  
  143. ts2.assertReceivedOnNext(Arrays.<Integer>asList(2, 4, 6, 8, 10));
  144. ts2.assertNoErrors();
  145. ts2.assertTerminalEvent();
  146. }
  147. @Test
  148. public void testSymmetricAgain() {
  149. Observable<Integer> source = Observable.create(new OnSubscribeRoundRobinDispatch<Integer>(Observable.range(1, 10), 2));
  150.  
  151. for (int i = 0; i < 3; i++) {
  152. TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>();
  153. TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>();
  154.  
  155. source.subscribe(ts1);
  156. ts1.assertReceivedOnNext(Arrays.<Integer>asList());
  157. source.subscribe(ts2);
  158.  
  159. ts1.assertReceivedOnNext(Arrays.<Integer>asList(1, 3, 5, 7, 9));
  160. ts1.assertNoErrors();
  161. ts1.assertTerminalEvent();
  162.  
  163. ts2.assertReceivedOnNext(Arrays.<Integer>asList(2, 4, 6, 8, 10));
  164. ts2.assertNoErrors();
  165. ts2.assertTerminalEvent();
  166. }
  167. }
  168.  
  169. @Test
  170. public void testAsymmetric() {
  171. Observable<Integer> source = Observable.create(new OnSubscribeRoundRobinDispatch<Integer>(Observable.range(1, 9), 2));
  172.  
  173. TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>();
  174. TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>();
  175.  
  176. source.subscribe(ts1);
  177. ts1.assertReceivedOnNext(Arrays.<Integer>asList());
  178. source.subscribe(ts2);
  179.  
  180. ts1.assertReceivedOnNext(Arrays.<Integer>asList(1, 3, 5, 7, 9));
  181. ts1.assertNoErrors();
  182. ts1.assertTerminalEvent();
  183.  
  184. ts2.assertReceivedOnNext(Arrays.<Integer>asList(2, 4, 6, 8));
  185. ts2.assertNoErrors();
  186. ts2.assertTerminalEvent();
  187. }
  188.  
  189. @Test
  190. public void testEarlyUnsubscribeOne() {
  191. Observable<Integer> source = Observable.create(new OnSubscribeRoundRobinDispatch<Integer>(Observable.range(1, 10), 2));
  192.  
  193. TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>();
  194. TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>();
  195.  
  196. source.take(2).subscribe(ts1);
  197. ts1.assertReceivedOnNext(Arrays.<Integer>asList());
  198. source.subscribe(ts2);
  199.  
  200. ts1.assertReceivedOnNext(Arrays.<Integer>asList(1, 3));
  201. ts1.assertNoErrors();
  202. ts1.assertTerminalEvent();
  203.  
  204. ts2.assertReceivedOnNext(Arrays.<Integer>asList(2, 4, 5, 6, 7, 8, 9, 10));
  205. ts2.assertNoErrors();
  206. ts2.assertTerminalEvent();
  207. }
  208. @Test
  209. public void testEarlyUnsubscribeTwo() {
  210. Observable<Integer> source = Observable.create(new OnSubscribeRoundRobinDispatch<Integer>(Observable.range(1, 10), 2));
  211.  
  212. TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>();
  213. TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>();
  214.  
  215. source.subscribe(ts1);
  216. ts1.assertReceivedOnNext(Arrays.<Integer>asList());
  217. source.take(2).subscribe(ts2);
  218.  
  219. ts1.assertReceivedOnNext(Arrays.<Integer>asList(1, 3, 5, 6, 7, 8, 9, 10));
  220. ts1.assertNoErrors();
  221. ts1.assertTerminalEvent();
  222.  
  223. ts2.assertReceivedOnNext(Arrays.<Integer>asList(2, 4));
  224. ts2.assertNoErrors();
  225. ts2.assertTerminalEvent();
  226. }
  227. @Test
  228. public void testEarlyUnsubscribeBoth() {
  229. final AtomicInteger emitted = new AtomicInteger();
  230. Observable<Integer> source = Observable.create(new OnSubscribeRoundRobinDispatch<Integer>(Observable.range(1, 10), 2))
  231. .doOnNext(new Action1<Integer>() {
  232. @Override
  233. public void call(Integer t1) {
  234. emitted.incrementAndGet();
  235. }
  236. });
  237.  
  238. TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>();
  239. TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>();
  240.  
  241. source.take(2).subscribe(ts1);
  242. ts1.assertReceivedOnNext(Arrays.<Integer>asList());
  243. source.take(2).subscribe(ts2);
  244.  
  245. ts1.assertReceivedOnNext(Arrays.<Integer>asList(1, 3));
  246. ts1.assertNoErrors();
  247. ts1.assertTerminalEvent();
  248.  
  249. ts2.assertReceivedOnNext(Arrays.<Integer>asList(2, 4));
  250. ts2.assertNoErrors();
  251. ts2.assertTerminalEvent();
  252.  
  253. Assert.assertEquals(4, emitted.get());
  254. }
  255. @Test
  256. public void testRejectOtherSubscriptions() {
  257. Observable<Integer> source = Observable.create(new OnSubscribeRoundRobinDispatch<Integer>(Observable.range(1, 10), 2));
  258.  
  259. TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>();
  260. TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>();
  261. final TestSubscriber<Integer> ts3 = new TestSubscriber<Integer>();
  262.  
  263. source.subscribe(ts1);
  264. ts1.assertReceivedOnNext(Arrays.<Integer>asList());
  265. source.doOnNext(new Action1<Integer>() {
  266. @Override
  267. public void call(Integer t1) {
  268. if (t1 == 4) {
  269. source.subscribe(ts3);
  270. }
  271. }
  272. }).subscribe(ts2);
  273.  
  274. ts1.assertReceivedOnNext(Arrays.<Integer>asList(1, 3, 5, 7, 9));
  275. ts1.assertNoErrors();
  276. ts1.assertTerminalEvent();
  277.  
  278. ts2.assertReceivedOnNext(Arrays.<Integer>asList(2, 4, 6, 8, 10));
  279. ts2.assertNoErrors();
  280. ts2.assertTerminalEvent();
  281.  
  282. Assert.assertEquals(1, ts3.getOnErrorEvents().size());
  283. Assert.assertTrue(ts3.getOnErrorEvents().get(0) instanceof IllegalStateException);
  284. }
  285. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement