Advertisement
Guest User

Untitled

a guest
Oct 31st, 2014
154
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 1.87 KB | None | 0 0
  1. @Test public void testRetryWhen() throws InterruptedException {
  2.         Observable.create(new Observable.OnSubscribe<Integer>() {
  3.  
  4.             @Override public void call(final Subscriber<? super Integer> subscriber) {
  5.                 try {
  6.                     for (int i = 0; i < 10; i++) {
  7.                         Thread.sleep(300);
  8.                         if (SecureRandom.getInstanceStrong().nextBoolean()) {
  9.                             throw new RuntimeException("test exception");
  10.                         } else {
  11.                             subscriber.onNext(i);
  12.                         }
  13.                     }
  14.                     subscriber.onCompleted();
  15.                 } catch (Exception e) {
  16.                     subscriber.onError(e);
  17.                 }
  18.             }
  19.  
  20.         }).retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
  21.  
  22.             @Override public Observable<?> call(final Observable<? extends Throwable> observable) {
  23.                 return observable.zipWith(Observable.range(1, 10), new Func2<Throwable, Integer, Integer>() {
  24.  
  25.                     @Override public Integer call(final Throwable throwable, final Integer number) {
  26.                         return number;
  27.                     }
  28.  
  29.                 }).flatMap(new Func1<Integer, Observable<Long>>() {
  30.  
  31.                     @Override public Observable<Long> call(final Integer number) {
  32.                         System.out.println("delay retry by " + number + " second(s)");
  33.                         return Observable.timer(number, TimeUnit.SECONDS);
  34.                     }
  35.  
  36.                 });
  37.             }
  38.  
  39.         }).distinct().subscribeOn(Schedulers.io()).subscribe(new IntegerObserver() {
  40.  
  41.             @Override public void onError(final Throwable throwable) {
  42.                 logger.error("onError");
  43.             }
  44.  
  45.         });
  46.         Thread.sleep(30000);
  47.     }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement