codeido

retryWhen with delay

Nov 6th, 2018
596
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.51 KB | None | 0 0
  1. package com.elsight.common;
  2.  
  3. import org.reactivestreams.Publisher;
  4.  
  5. import java.util.concurrent.TimeUnit;
  6.  
  7. import io.reactivex.Flowable;
  8. import io.reactivex.functions.Function;
  9.  
  10. public class RetryWithDelay implements Function<Flowable<? extends Throwable>, Publisher<?>> {
  11. private static final int INFINITE = -1;
  12.  
  13. private final long maxRetries;
  14. private final long retryDelayMillis;
  15. private int retryCount;
  16.  
  17. public RetryWithDelay(final long maxRetries, final long retryDelayMillis) {
  18. this.maxRetries = maxRetries;
  19. this.retryDelayMillis = retryDelayMillis;
  20. this.retryCount = 0;
  21. }
  22.  
  23. public RetryWithDelay(final long retryDelayMillis) {
  24. this(INFINITE, retryDelayMillis);
  25. }
  26.  
  27. @Override
  28. public Publisher<?> apply(Flowable<? extends Throwable> flowable) throws Exception {
  29. return flowable.flatMap(new Function<Throwable, Publisher<?>>() {
  30. @Override
  31. public Publisher<?> apply(Throwable throwable) throws Exception {
  32. if (maxRetries == INFINITE || ++retryCount < maxRetries) {
  33. // When this Observable calls onNext, the original
  34. // Observable will be retried (i.e. re-subscribed).
  35. return Flowable.timer(retryDelayMillis,
  36. TimeUnit.MILLISECONDS);
  37. }
  38.  
  39. // Max retries hit. Just pass the error along.
  40. return Flowable.error(throwable);
  41. }
  42. });
  43. }
  44. }
Add Comment
Please, Sign In to add comment