Guest User

Untitled

a guest
Oct 18th, 2018
83
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.97 KB | None | 0 0
  1. #ifndef OBSERVABLE_H
  2. #define OBSERVABLE_H
  3.  
  4. #include <functional>
  5.  
  6. template<typename T, typename E>
  7. class Observer
  8. {
  9. public:
  10. const std::function<void(T)> onNext;
  11. const std::function<void(E)> onError;
  12. const std::function<void()> onComplete;
  13.  
  14. Observer(
  15. std::function<void(T)> onNext,
  16. std::function<void(E)> onError = [] (E) {},
  17. std::function<void()> onComplete = [] () {}
  18. ) :
  19. onNext(onNext),
  20. onError(onError),
  21. onComplete(onComplete)
  22. {
  23. }
  24. };
  25.  
  26. template<typename T, typename E>
  27. class Observable;
  28.  
  29. template<typename T, typename E>
  30. class OnSubscribe
  31. {
  32. private:
  33. bool isFinished = false;
  34. Observer<T,E> mObserver;
  35. public:
  36. OnSubscribe(Observer<T,E> observer) : mObserver(observer)
  37. {
  38. }
  39.  
  40. void onNext(T value)
  41. {
  42. if(!isFinished)
  43. {
  44. mObserver.onNext(value);
  45. }
  46. }
  47.  
  48. void onComplete()
  49. {
  50. if(!isFinished)
  51. {
  52. isFinished = true;
  53. mObserver.onComplete();
  54. }
  55. }
  56.  
  57. void onError(E error)
  58. {
  59. if(!isFinished)
  60. {
  61. isFinished = true;
  62. mObserver.onError(error);
  63. }
  64. }
  65.  
  66. template<typename RT, typename RE>
  67. OnSubscribe<RT, RE> map(std::function<T(RT)> op, std::function<E(RE)> opE) const
  68. {
  69. auto onNext = mObserver.onNext;
  70. auto onError = mObserver.onError;
  71. auto onComplete = mObserver.onComplete;
  72.  
  73. auto newOnNext = [=] (RT t) { onNext(op(t)); };
  74. auto newOnError = [=] (RE e) { onError(opE(e)); };
  75.  
  76. return OnSubscribe<RT, RE>(Observer<RT, RE>(newOnNext,newOnError,onComplete));
  77. }
  78.  
  79. template<typename RT>
  80. OnSubscribe<RT, E> map(std::function<T(RT)> op) const
  81. {
  82. return map(op, [] (E _) { return _;});
  83. }
  84.  
  85. template<typename RT>
  86. OnSubscribe<RT, E> flatMap(std::function<Observable<T, E>(RT)> op) const
  87. {
  88. auto onNext = mObserver.onNext;
  89. auto onError = mObserver.onError;
  90. auto onComplete = mObserver.onComplete;
  91.  
  92. auto newOnNext = [=] (RT rt) {
  93. op(rt).subscribe([=] (T t) { onNext(t); }, onError, onComplete);
  94. };
  95.  
  96. return OnSubscribe<RT, E>(Observer<RT, E>(newOnNext,onError,onComplete));
  97. }
  98. };
  99.  
  100. template<typename T, typename E>
  101. class Observable
  102. {
  103. private:
  104. std::function<void(OnSubscribe<T,E>)> onSubscribe;
  105. public:
  106. Observable(std::function<void(OnSubscribe<T,E>)> onSubscribe) : onSubscribe(onSubscribe)
  107. {
  108. }
  109.  
  110. template<typename RT, typename RE>
  111. Observable<RT,RE> lift(std::function<OnSubscribe<T,E>(const OnSubscribe<RT,RE>&)> lifter) const
  112. {
  113. auto onSubscribe = this->onSubscribe;
  114. return Observable<RT,RE>([=] (OnSubscribe<RT,RE> subscriber) { onSubscribe(lifter(subscriber)); });
  115. }
  116.  
  117. template<typename RT, typename RE>
  118. Observable<RT,RE> map(std::function<RT(T)> op, std::function<RE(E)> opE) const
  119. {
  120. return lift<RT,RE>( [=] (const OnSubscribe<RT,RE>& subscriber) { return subscriber.template map<T,E>(op, opE); } );
  121. }
  122.  
  123. template<typename RT>
  124. Observable<RT, E> map(std::function<RT(T)> op) const
  125. {
  126. return map<RT, E>(op, [] (E _) { return _;});
  127. }
  128.  
  129. template<typename RT>
  130. Observable<RT, E> flatMap(std::function<Observable<RT, E>(T)> op) const
  131. {
  132. return lift<RT,E>( [=] (const OnSubscribe<RT,E>& subscriber) { return subscriber.template flatMap<T>(op); });
  133. }
  134.  
  135. template<typename RT, typename RE>
  136. Observable<RT, RE> compose(std::function<Observable<RT, RE>(const Observable<T, E>&)> composer) const
  137. {
  138. return composer(*this);
  139. }
  140.  
  141. void subscribe(const Observer<T, E>& observer) const
  142. {
  143. onSubscribe(OnSubscribe<T,E>(observer));
  144. }
  145.  
  146. void subscribe(
  147. std::function<void(T)> onNext,
  148. std::function<void(E)> onError = [] (E) {},
  149. std::function<void()> onComplete = [] () {}
  150. ) const
  151. {
  152. onSubscribe(OnSubscribe<T,E>(Observer<T, E>(onNext, onError, onComplete)));
  153. }
  154. };
  155.  
  156. #endif // OBSERVABLE_H
Add Comment
Please, Sign In to add comment