Advertisement
Guest User

Untitled

a guest
May 24th, 2015
222
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.58 KB | None | 0 0
  1. /**
  2. *
  3. */
  4. public class ConfigurableDelay<T> implements Observable.Operator<T, T> {
  5.  
  6. private final Func1<T, TimeConfiguration> itemToTime;
  7. private final Scheduler scheduler;
  8.  
  9. public ConfigurableDelay(final Func1<T, TimeConfiguration> itemToTime) {
  10. this(itemToTime, Schedulers.computation());
  11. }
  12.  
  13. public ConfigurableDelay(final Func1<T, TimeConfiguration> itemToTime, final Scheduler scheduler) {
  14. this.itemToTime = itemToTime;
  15. this.scheduler = scheduler;
  16. }
  17.  
  18. @Override
  19. public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
  20.  
  21. return new Subscriber<T>(subscriber) {
  22.  
  23. private TimeConfiguration nextTime = null;
  24.  
  25. @Override
  26. public void onCompleted() {
  27. subscriber.onCompleted();
  28. }
  29.  
  30. @Override
  31. public void onError(final Throwable e) {
  32. subscriber.onError(e);
  33. }
  34.  
  35. @Override
  36. public void onNext(final T item) {
  37. TimeConfiguration previousNextTime = nextTime;
  38. this.nextTime = itemToTime.call(item);
  39. if (previousNextTime == null) {
  40. subscriber.onNext(item);
  41. } else {
  42. scheduler.createWorker().schedule(new Action0() {
  43. @Override
  44. public void call() {
  45. subscriber.onNext(item);
  46. }
  47. }, previousNextTime.getTime(), previousNextTime.getUnit());
  48. }
  49. }
  50. };
  51. }
  52. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement