Advertisement
Guest User

Untitled

a guest
Oct 21st, 2016
71
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.27 KB | None | 0 0
  1. public class Command<E> {
  2.  
  3. public CommandActionObservable execute() {
  4. final CommandAction<E> command = createCommand();
  5.  
  6. final OnSubscribe<CommandAction<E>> onSubscribe = (subscriber) -> {
  7.  
  8. /* Create a listener that handles notifications and register it.
  9. * The idea here is to push the command downstream so it can be re-executed
  10. */
  11. final Listener listener = (event) -> {
  12. subscriber.onNext(command);
  13. }
  14. registerListener(listener);
  15.  
  16. /* This is where I'm having trouble. The unregister method
  17. * should be executed when the subscriber unsubscribed,
  18. * but it never happens
  19. */
  20. subscriber.add(Subscriptions.create(() -> {
  21. unregisterListener(listener);
  22. }));
  23.  
  24. // pass the initial command downstream
  25. subscriber.onNext(command);
  26.  
  27. kickOffBackgroundAction();
  28. }
  29.  
  30. final Observable<CommandAction<E>> actionObservable = Observable.create(onSubscribe)
  31. .onBackpressureLatest()
  32. .observeOn(Shedulers.io())
  33. .onBackpressureLatest();
  34. return new CommandActionObservable((subscriber) -> {
  35. actionObservable.unsafeSubscribe(subscriber);
  36. })
  37. }
  38.  
  39. public class CommandActionObservable extends Observable<CommandAction<E> {
  40.  
  41. // default constructor omitted
  42.  
  43. public Observable<E> toResult() {
  44. return lift((Operator) (subscriber) -> {
  45. return new Subscriber<CommandAction<E>>() {
  46. // delegate onCompleted and onError to subscriber
  47.  
  48. public void onNext(CommandAction<E> action) {
  49. // execute the action and pass the result downstream
  50. final E result = action.execute();
  51. subscriber.onNext(result)
  52. }
  53. }
  54. }
  55. }
  56.  
  57. }
  58.  
  59. }
  60.  
  61. final Observable<SomeType> obs = new Command<SomeType>()
  62. .execute()
  63. .toResult();
  64. subscription.add(obs.subscribe(// impl here));
  65.  
  66.  
  67.  
  68. public void onDestroy() {
  69. super.onDestroy();
  70. subscription.unsubscribe();
  71. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement