Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #ifndef OBSERVABLE_H
- #define OBSERVABLE_H
#include <functional>
#include <memory>
#include <utility>
/**
 * @brief Bundle of the three callbacks that receive events from a stream.
 *
 * The callbacks are stored as const std::function members, so an Observer
 * can be copy-constructed but not assigned to.
 *
 * @tparam T Type of the values passed to onNext.
 * @tparam E Type of the errors passed to onError.
 */
template<typename T, typename E>
class Observer
{
public:
    /// Invoked with each emitted value.
    const std::function<void(T)> onNext;
    /// Invoked with an error value.
    const std::function<void(E)> onError;
    /// Invoked with no arguments to signal completion.
    const std::function<void()> onComplete;

    /**
     * @brief Stores the given callbacks.
     *
     * @param onNext     Value callback (required).
     * @param onError    Error callback; defaults to a no-op.
     * @param onComplete Completion callback; defaults to a no-op.
     */
    Observer(
        std::function<void(T)> onNext,
        std::function<void(E)> onError = [] (E) {},
        std::function<void()> onComplete = [] () {}
    ) :
        // The parameters are already private copies, so move them into the
        // members instead of copying each std::function a second time.
        onNext(std::move(onNext)),
        onError(std::move(onError)),
        onComplete(std::move(onComplete))
    {
    }
};
- template<typename T, typename E>
- class Observable;
- template<typename T, typename E>
- class OnSubscribe
- {
- private:
- bool isFinished = false;
- Observer<T,E> mObserver;
- public:
- OnSubscribe(Observer<T,E> observer) : mObserver(observer)
- {
- }
- void onNext(T value)
- {
- if(!isFinished)
- {
- mObserver.onNext(value);
- }
- }
- void onComplete()
- {
- if(!isFinished)
- {
- isFinished = true;
- mObserver.onComplete();
- }
- }
- void onError(E error)
- {
- if(!isFinished)
- {
- isFinished = true;
- mObserver.onError(error);
- }
- }
- template<typename RT, typename RE>
- OnSubscribe<RT, RE> map(std::function<T(RT)> op, std::function<E(RE)> opE) const
- {
- auto onNext = mObserver.onNext;
- auto onError = mObserver.onError;
- auto onComplete = mObserver.onComplete;
- auto newOnNext = [=] (RT t) { onNext(op(t)); };
- auto newOnError = [=] (RE e) { onError(opE(e)); };
- return OnSubscribe<RT, RE>(Observer<RT, RE>(newOnNext,newOnError,onComplete));
- }
- template<typename RT>
- OnSubscribe<RT, E> map(std::function<T(RT)> op) const
- {
- return map(op, [] (E _) { return _;});
- }
- template<typename RT>
- OnSubscribe<RT, E> flatMap(std::function<Observable<T, E>(RT)> op) const
- {
- auto onNext = mObserver.onNext;
- auto onError = mObserver.onError;
- auto onComplete = mObserver.onComplete;
- auto newOnNext = [=] (RT rt) {
- op(rt).subscribe([=] (T t) { onNext(t); }, onError, onComplete);
- };
- return OnSubscribe<RT, E>(Observer<RT, E>(newOnNext,onError,onComplete));
- }
- };
- template<typename T, typename E>
- class Observable
- {
- private:
- std::function<void(OnSubscribe<T,E>)> onSubscribe;
- public:
- Observable(std::function<void(OnSubscribe<T,E>)> onSubscribe) : onSubscribe(onSubscribe)
- {
- }
- template<typename RT, typename RE>
- Observable<RT,RE> lift(std::function<OnSubscribe<T,E>(const OnSubscribe<RT,RE>&)> lifter) const
- {
- auto onSubscribe = this->onSubscribe;
- return Observable<RT,RE>([=] (OnSubscribe<RT,RE> subscriber) { onSubscribe(lifter(subscriber)); });
- }
- template<typename RT, typename RE>
- Observable<RT,RE> map(std::function<RT(T)> op, std::function<RE(E)> opE) const
- {
- return lift<RT,RE>( [=] (const OnSubscribe<RT,RE>& subscriber) { return subscriber.template map<T,E>(op, opE); } );
- }
- template<typename RT>
- Observable<RT, E> map(std::function<RT(T)> op) const
- {
- return map<RT, E>(op, [] (E _) { return _;});
- }
- template<typename RT>
- Observable<RT, E> flatMap(std::function<Observable<RT, E>(T)> op) const
- {
- return lift<RT,E>( [=] (const OnSubscribe<RT,E>& subscriber) { return subscriber.template flatMap<T>(op); });
- }
- template<typename RT, typename RE>
- Observable<RT, RE> compose(std::function<Observable<RT, RE>(const Observable<T, E>&)> composer) const
- {
- return composer(*this);
- }
- void subscribe(const Observer<T, E>& observer) const
- {
- onSubscribe(OnSubscribe<T,E>(observer));
- }
- void subscribe(
- std::function<void(T)> onNext,
- std::function<void(E)> onError = [] (E) {},
- std::function<void()> onComplete = [] () {}
- ) const
- {
- onSubscribe(OnSubscribe<T,E>(Observer<T, E>(onNext, onError, onComplete)));
- }
- };
- #endif // OBSERVABLE_H
Add Comment
Please, Sign In to add comment