Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /**
- *
- */
- package rxjava.issue;
- import java.util.Arrays;
- import java.util.concurrent.atomic.*;
- import org.junit.Assert;
- import org.junit.Test;
- import rx.*;
- import rx.Observable.OnSubscribe;
- import rx.functions.*;
- import rx.observers.TestSubscriber;
- import rx.subscriptions.Subscriptions;
- /**
- */
- public class RoundRobinDispatch {
- public static final class OnSubscribeRoundRobinDispatch<T> implements OnSubscribe<T> {
- final Observable<? extends T> source;
- final AtomicInteger wip;
- final AtomicInteger id;
- final AtomicReferenceArray<Subscriber<? super T>> subscribers;
- final int count;
- final AtomicReference<Subscriber<?>> sourceSubscriber;
- static final NoOpSubscriber<Object> NO_OP_SUBSCRIBER = new NoOpSubscriber<Object>();
- public OnSubscribeRoundRobinDispatch(Observable<? extends T> source, int count) {
- this.source = source;
- this.count = count;
- this.wip = new AtomicInteger();
- this.id = new AtomicInteger();
- this.subscribers = new AtomicReferenceArray<Subscriber<? super T>>(count);
- this.sourceSubscriber = new AtomicReference<Subscriber<?>>();
- }
- @Override
- public void call(Subscriber<? super T> child) {
- final int index = id.getAndIncrement();
- if (index < count) {
- subscribers.set(index, child);
- child.add(Subscriptions.create(new Action0() {
- @Override
- public void call() {
- release(index);
- }
- }));
- wip.incrementAndGet();
- }
- if (index == count - 1) {
- SourceSubscriber ssub = new SourceSubscriber();
- sourceSubscriber.set(ssub);
- source.unsafeSubscribe(ssub);
- } else
- if (index >= count) {
- child.onError(new IllegalStateException("Too many subscribers"));
- }
- }
- void release() {
- if (wip.decrementAndGet() == 0) {
- Subscriber<?> sub = sourceSubscriber.get();
- if (sub == NO_OP_SUBSCRIBER) {
- return;
- }
- if (sourceSubscriber.compareAndSet(sub, NO_OP_SUBSCRIBER)) {
- id.set(0);
- sub.unsubscribe();
- }
- }
- }
- void release(int index) {
- subscribers.set(index, NO_OP_SUBSCRIBER);
- release();
- }
- static final class NoOpSubscriber<T> extends Subscriber<T> {
- @Override
- public void onCompleted() {
- }
- @Override
- public void onError(Throwable e) {
- }
- @Override
- public void onNext(T t) {
- }
- }
- final class SourceSubscriber extends Subscriber<T> {
- int index;
- @Override
- public void onCompleted() {
- for (int i = 0; i < count; i++) {
- subscribers.get(i).onCompleted();
- }
- }
- @Override
- public void onError(Throwable e) {
- for (int i = 0; i < count; i++) {
- subscribers.get(i).onError(e);
- }
- }
- @Override
- public void onNext(T t) {
- int j = count;
- while (j-- > 0) {
- Subscriber<? super T> s = subscribers.get(index);
- if (s != NO_OP_SUBSCRIBER) {
- s.onNext(t);
- }
- index++;
- if (index >= count) {
- index = 0;
- }
- if (s != NO_OP_SUBSCRIBER) {
- return;
- }
- }
- }
- }
- }
- @Test
- public void testSymmetric() {
- Observable<Integer> source = Observable.create(new OnSubscribeRoundRobinDispatch<Integer>(Observable.range(1, 10), 2));
- TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>();
- TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>();
- source.subscribe(ts1);
- ts1.assertReceivedOnNext(Arrays.<Integer>asList());
- source.subscribe(ts2);
- ts1.assertReceivedOnNext(Arrays.<Integer>asList(1, 3, 5, 7, 9));
- ts1.assertNoErrors();
- ts1.assertTerminalEvent();
- ts2.assertReceivedOnNext(Arrays.<Integer>asList(2, 4, 6, 8, 10));
- ts2.assertNoErrors();
- ts2.assertTerminalEvent();
- }
- @Test
- public void testSymmetricAgain() {
- Observable<Integer> source = Observable.create(new OnSubscribeRoundRobinDispatch<Integer>(Observable.range(1, 10), 2));
- for (int i = 0; i < 3; i++) {
- TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>();
- TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>();
- source.subscribe(ts1);
- ts1.assertReceivedOnNext(Arrays.<Integer>asList());
- source.subscribe(ts2);
- ts1.assertReceivedOnNext(Arrays.<Integer>asList(1, 3, 5, 7, 9));
- ts1.assertNoErrors();
- ts1.assertTerminalEvent();
- ts2.assertReceivedOnNext(Arrays.<Integer>asList(2, 4, 6, 8, 10));
- ts2.assertNoErrors();
- ts2.assertTerminalEvent();
- }
- }
- @Test
- public void testAsymmetric() {
- Observable<Integer> source = Observable.create(new OnSubscribeRoundRobinDispatch<Integer>(Observable.range(1, 9), 2));
- TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>();
- TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>();
- source.subscribe(ts1);
- ts1.assertReceivedOnNext(Arrays.<Integer>asList());
- source.subscribe(ts2);
- ts1.assertReceivedOnNext(Arrays.<Integer>asList(1, 3, 5, 7, 9));
- ts1.assertNoErrors();
- ts1.assertTerminalEvent();
- ts2.assertReceivedOnNext(Arrays.<Integer>asList(2, 4, 6, 8));
- ts2.assertNoErrors();
- ts2.assertTerminalEvent();
- }
- @Test
- public void testEarlyUnsubscribeOne() {
- Observable<Integer> source = Observable.create(new OnSubscribeRoundRobinDispatch<Integer>(Observable.range(1, 10), 2));
- TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>();
- TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>();
- source.take(2).subscribe(ts1);
- ts1.assertReceivedOnNext(Arrays.<Integer>asList());
- source.subscribe(ts2);
- ts1.assertReceivedOnNext(Arrays.<Integer>asList(1, 3));
- ts1.assertNoErrors();
- ts1.assertTerminalEvent();
- ts2.assertReceivedOnNext(Arrays.<Integer>asList(2, 4, 5, 6, 7, 8, 9, 10));
- ts2.assertNoErrors();
- ts2.assertTerminalEvent();
- }
- @Test
- public void testEarlyUnsubscribeTwo() {
- Observable<Integer> source = Observable.create(new OnSubscribeRoundRobinDispatch<Integer>(Observable.range(1, 10), 2));
- TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>();
- TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>();
- source.subscribe(ts1);
- ts1.assertReceivedOnNext(Arrays.<Integer>asList());
- source.take(2).subscribe(ts2);
- ts1.assertReceivedOnNext(Arrays.<Integer>asList(1, 3, 5, 6, 7, 8, 9, 10));
- ts1.assertNoErrors();
- ts1.assertTerminalEvent();
- ts2.assertReceivedOnNext(Arrays.<Integer>asList(2, 4));
- ts2.assertNoErrors();
- ts2.assertTerminalEvent();
- }
- @Test
- public void testEarlyUnsubscribeBoth() {
- final AtomicInteger emitted = new AtomicInteger();
- Observable<Integer> source = Observable.create(new OnSubscribeRoundRobinDispatch<Integer>(Observable.range(1, 10), 2))
- .doOnNext(new Action1<Integer>() {
- @Override
- public void call(Integer t1) {
- emitted.incrementAndGet();
- }
- });
- TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>();
- TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>();
- source.take(2).subscribe(ts1);
- ts1.assertReceivedOnNext(Arrays.<Integer>asList());
- source.take(2).subscribe(ts2);
- ts1.assertReceivedOnNext(Arrays.<Integer>asList(1, 3));
- ts1.assertNoErrors();
- ts1.assertTerminalEvent();
- ts2.assertReceivedOnNext(Arrays.<Integer>asList(2, 4));
- ts2.assertNoErrors();
- ts2.assertTerminalEvent();
- Assert.assertEquals(4, emitted.get());
- }
- @Test
- public void testRejectOtherSubscriptions() {
- Observable<Integer> source = Observable.create(new OnSubscribeRoundRobinDispatch<Integer>(Observable.range(1, 10), 2));
- TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>();
- TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>();
- final TestSubscriber<Integer> ts3 = new TestSubscriber<Integer>();
- source.subscribe(ts1);
- ts1.assertReceivedOnNext(Arrays.<Integer>asList());
- source.doOnNext(new Action1<Integer>() {
- @Override
- public void call(Integer t1) {
- if (t1 == 4) {
- source.subscribe(ts3);
- }
- }
- }).subscribe(ts2);
- ts1.assertReceivedOnNext(Arrays.<Integer>asList(1, 3, 5, 7, 9));
- ts1.assertNoErrors();
- ts1.assertTerminalEvent();
- ts2.assertReceivedOnNext(Arrays.<Integer>asList(2, 4, 6, 8, 10));
- ts2.assertNoErrors();
- ts2.assertTerminalEvent();
- Assert.assertEquals(1, ts3.getOnErrorEvents().size());
- Assert.assertTrue(ts3.getOnErrorEvents().get(0) instanceof IllegalStateException);
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement