Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /**
- *
- */
- public class ConfigurableDelay<T> implements Observable.Operator<T, T> {
- private final Func1<T, TimeConfiguration> itemToTime;
- private final Scheduler scheduler;
- public ConfigurableDelay(final Func1<T, TimeConfiguration> itemToTime) {
- this(itemToTime, Schedulers.computation());
- }
- public ConfigurableDelay(final Func1<T, TimeConfiguration> itemToTime, final Scheduler scheduler) {
- this.itemToTime = itemToTime;
- this.scheduler = scheduler;
- }
- @Override
- public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
- return new Subscriber<T>(subscriber) {
- private TimeConfiguration nextTime = null;
- @Override
- public void onCompleted() {
- subscriber.onCompleted();
- }
- @Override
- public void onError(final Throwable e) {
- subscriber.onError(e);
- }
- @Override
- public void onNext(final T item) {
- TimeConfiguration previousNextTime = nextTime;
- this.nextTime = itemToTime.call(item);
- if (previousNextTime == null) {
- subscriber.onNext(item);
- } else {
- scheduler.createWorker().schedule(new Action0() {
- @Override
- public void call() {
- subscriber.onNext(item);
- }
- }, previousNextTime.getTime(), previousNextTime.getUnit());
- }
- }
- };
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement