Advertisement
NLinker

PublishSubject overload

Apr 14th, 2016
229
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 6.09 KB | None | 0 0
  1.     static class Oi {
  2.         private final Operation op;
  3.         private final Long i;
  4.         private final int[] inr = new int[1024];
  5.  
  6.         public Oi(Operation op, Long i) {
  7.             this.op = op;
  8.             this.i = i;
  9.         }
  10.  
  11.         @Override
  12.         public String toString() {
  13.             //noinspection StringBufferReplaceableByString
  14.             final StringBuilder sb = new StringBuilder("Oi(");
  15.             sb.append("i=").append(i);
  16.             sb.append(",op=").append(op);
  17.             sb.append(')');
  18.             return sb.toString();
  19.         }
  20.     }
  21.  
  22.     @Test
  23.     public void testSubjects() throws Exception {
  24.         int n = 400_000;
  25.  
  26.         PublishSubject<Object> delSubj = PublishSubject.create();
  27.         PublishSubject<Object> updSubj = PublishSubject.create();
  28.         PublishSubject<Object> mrgSubj = PublishSubject.create();
  29.         Operation[] ops = Operation.values();
  30.  
  31.         ThreadPoolExecutor tpeSub = new ThreadPoolExecutor(
  32.             1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(),
  33.             r -> {
  34.                 final Thread t = new Thread(r, "singleton thread pool");
  35.                 t.setDaemon(true);
  36.                 return t;
  37.             }
  38.         );
  39.         ThreadPoolExecutor tpeObs = new ThreadPoolExecutor(
  40.             1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(),
  41.             r -> {
  42.                 final Thread t = new Thread(r, "singleton thread pool");
  43.                 t.setDaemon(true);
  44.                 return t;
  45.             }
  46.         );
  47.         Scheduler schedulerSub = Schedulers.from(tpeSub);
  48.         Scheduler schedulerObs = Schedulers.from(tpeObs);
  49.  
  50.         int[] row = new int[n];
  51.         Long[] src = new Long[n];
  52.         for (int i = 0; i < src.length; i++) {
  53.             src[i] = (long) (i + 1);
  54.         }
  55.  
  56.         AtomicInteger ai = new AtomicInteger(0);
  57.         AtomicInteger bi = new AtomicInteger(0);
  58.         CountDownLatch cdl = new CountDownLatch(1);
  59.  
  60.         Observable.interval(1, TimeUnit.MICROSECONDS)
  61.             .map(i -> new Oi(ops[(int) (i % 3)], i))
  62.             .flatMap(msg ->
  63.                 dispatch0(msg, delSubj, updSubj, mrgSubj, schedulerSub))
  64.             //.observeOn(schedulerObs)
  65.             .subscribe();
  66.  
  67.         //Observable.<Long>create(s -> {
  68.         //        for (int i = 0; i < n; i++) {
  69.         //            s.onNext((long) i);
  70.         //        }
  71.         //    })
  72.         //    .map(i -> new Oi(ops[(int) (i % 3)], i))
  73.         //    .flatMap(msg ->
  74.         //        dispatch0(msg, delSubj, updSubj, mrgSubj, schedulerSub))
  75.         //    //.observeOn(schedulerObs)
  76.         //    .subscribe();
  77.  
  78.         final Random random = new Random(123);
  79.  
  80.         delSubj.map(msg -> (Oi) msg)
  81.             .subscribe(oi -> {
  82.                 if (oi.i >= n) cdl.countDown();
  83.                 else {
  84.                     row[oi.i.intValue()] = bi.incrementAndGet();
  85.                     randomDelay(random);
  86.                     logProgress(delSubj, "delSubj", oi.i);
  87.                 }
  88.             });
  89.         updSubj.map(msg -> (Oi) msg)
  90.             .subscribe(oi -> {
  91.                 if (oi.i >= n) cdl.countDown();
  92.                 else {
  93.                     row[oi.i.intValue()] = bi.incrementAndGet();
  94.                     randomDelay(random);
  95.                     logProgress(updSubj, "updSubj", oi.i);
  96.                 }
  97.             });
  98.         mrgSubj.map(msg -> (Oi) msg)
  99.             .subscribe(oi -> {
  100.                 if (oi.i >= n) cdl.countDown();
  101.                 else {
  102.                     row[oi.i.intValue()] = bi.incrementAndGet();
  103.                     randomDelay(random);
  104.                     logProgress(mrgSubj, "mrgSubj", oi.i);
  105.                 }
  106.             });
  107.  
  108.         cdl.await();
  109.         // verify
  110.         System.out.println("row = " + row.length);
  111.         for (int i = 0; i < row.length; i++) {
  112.             Assert.assertEquals(row[i], i + 1);
  113.         }
  114.         tpeSub.shutdown();
  115.     }
  116.  
  117.     private void logProgress(PublishSubject<Object> subj, String subjName, Long i) {
  118.         //rx.schedulers.ExecutorScheduler$ExecutorSchedulerWorker 438296 14025472 1993321008
  119.         //rx.schedulers.ExecutorScheduler$ExecutorAction          438295 10519080 1875223656
  120.         //rx.internal.operators.OperatorSubscribeOn$1$1           438295 10519080 1864806376
  121.         //rx.Observable                                           438784 7020544 1826583488
  122.         //com.vertigo.service.test.su.QueueTest$$Lambda$6.7056937 438296 14025472 1819698968
  123.         //int[]                                                   458490 1806895632 1806895632
  124.         //com.vertigo.service.test.su.QueueTest$Oi                438296 10519104 1805782968
  125.         if (i % 1000 < 3) {
  126.             System.out.println(i + ", " + subjName);
  127.         }
  128.     }
  129.  
  130.     int[] delays = new int[]{0, 2, 3};
  131.  
  132.     void randomDelay(Random random) {
  133.         try {
  134.             Thread.sleep(delays[random.nextInt(delays.length)]);
  135.         } catch (InterruptedException e) {
  136.             e.printStackTrace();
  137.         }
  138.     }
  139.  
  140.     private <T> Observable<Void> dispatch0(Oi msg,
  141.                                            PublishSubject<Object> delSubj,
  142.                                            PublishSubject<Object> updSubj,
  143.                                            PublishSubject<Object> mrgSubj,
  144.                                            Scheduler scheduler) {
  145.         return Observable
  146.             .<Void>create(voidSubscriber -> {
  147.                 try {
  148.                     switch (msg.op) {
  149.                         case Delete:
  150.                             delSubj.onNext(msg);
  151.                             break;
  152.                         case Merge:
  153.                             mrgSubj.onNext(msg);
  154.                             break;
  155.                         case Update:
  156.                             updSubj.onNext(msg);
  157.                             break;
  158.                     }
  159.                 } catch (Throwable e) {
  160.                     voidSubscriber.onError(e);
  161.                 }
  162.                 voidSubscriber.onCompleted();
  163.             })
  164.             .subscribeOn(scheduler);
  165.     }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement