Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- @Test public void testRetryWhen() throws InterruptedException {
- Observable.create(new Observable.OnSubscribe<Integer>() {
- @Override public void call(final Subscriber<? super Integer> subscriber) {
- try {
- for (int i = 0; i < 10; i++) {
- Thread.sleep(300);
- if (SecureRandom.getInstanceStrong().nextBoolean()) {
- throw new RuntimeException("test exception");
- } else {
- subscriber.onNext(i);
- }
- }
- subscriber.onCompleted();
- } catch (Exception e) {
- subscriber.onError(e);
- }
- }
- }).retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
- @Override public Observable<?> call(final Observable<? extends Throwable> observable) {
- return observable.zipWith(Observable.range(1, 10), new Func2<Throwable, Integer, Integer>() {
- @Override public Integer call(final Throwable throwable, final Integer number) {
- return number;
- }
- }).flatMap(new Func1<Integer, Observable<Long>>() {
- @Override public Observable<Long> call(final Integer number) {
- System.out.println("delay retry by " + number + " second(s)");
- return Observable.timer(number, TimeUnit.SECONDS);
- }
- });
- }
- }).distinct().subscribeOn(Schedulers.io()).subscribe(new IntegerObserver() {
- @Override public void onError(final Throwable throwable) {
- logger.error("onError");
- }
- });
- Thread.sleep(30000);
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement