Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- public class Command<E> {
- public CommandActionObservable execute() {
- final CommandAction<E> command = createCommand();
- final OnSubscribe<CommandAction<E>> onSubscribe = (subscriber) -> {
- /* Create a listener that handles notifications and register it.
- * The idea here is to push the command downstream so it can be re-executed
- */
- final Listener listener = (event) -> {
- subscriber.onNext(command);
- }
- registerListener(listener);
- /* This is where I'm having trouble. The unregister method
- * should be executed when the subscriber unsubscribed,
- * but it never happens
- */
- subscriber.add(Subscriptions.create(() -> {
- unregisterListener(listener);
- }));
- // pass the initial command downstream
- subscriber.onNext(command);
- kickOffBackgroundAction();
- }
- final Observable<CommandAction<E>> actionObservable = Observable.create(onSubscribe)
- .onBackpressureLatest()
- .observeOn(Shedulers.io())
- .onBackpressureLatest();
- return new CommandActionObservable((subscriber) -> {
- actionObservable.unsafeSubscribe(subscriber);
- })
- }
- public class CommandActionObservable extends Observable<CommandAction<E> {
- // default constructor omitted
- public Observable<E> toResult() {
- return lift((Operator) (subscriber) -> {
- return new Subscriber<CommandAction<E>>() {
- // delegate onCompleted and onError to subscriber
- public void onNext(CommandAction<E> action) {
- // execute the action and pass the result downstream
- final E result = action.execute();
- subscriber.onNext(result)
- }
- }
- }
- }
- }
- }
// Build the result stream: execute() emits the command action, toResult() runs it.
final Observable<SomeType> obs = new Command<SomeType>()
        .execute()
        .toResult();
// Keep the returned Subscription in `subscription` so it can be unsubscribed later.
// A block comment marks the placeholder; a line comment here would swallow the closing "));".
subscription.add(obs.subscribe(/* impl here */));
- public void onDestroy() {
- super.onDestroy();
- subscription.unsubscribe();
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement