Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- {
/**
 * Plain holder for the three callbacks a subscriber can be notified through.
 *
 * Intended shape, in TypeScript notation:
 *
 *   interface Observer<T> {
 *     next(t: T): void;
 *     error(e: Error): void;
 *     complete(): void;
 *   }
 */
class Observer {
  /**
   * Stores each argument on the instance under the property of the same name.
   *
   * @param next     stored as `this.next`
   * @param error    stored as `this.error`
   * @param complete stored as `this.complete`
   */
  constructor(next, error, complete) {
    // Same three assignments, in the same order, expressed as one call.
    Object.assign(this, { next, error, complete });
  }
}
// Demo pipeline: a 100 ms ticker -> running count -> squared -> limited by take(..., 10).
// Nothing runs until the final observable is called with an observer below.
const incrementer = scanObservable(createIntervalObservable(100), (s, item) => s + 1, 0);
const squaredIncrementer = mapObservable(incrementer, function (x) {
  return x * x;
});
const firstTenSquares = take(squaredIncrementer, 10);
// {{ square$ | async }}
// Subscribe: log every value that arrives, then log once the stream completes.
// The returned cancel function is not kept.
firstTenSquares({
  next: (n) => {
    console.log("heard", n);
  },
  complete: () => {
    console.log("finished");
  },
});
// type CancelSubscription = () => void;
// type FunctionObservable<T> = (observer: Observer<T>) => CancelSubscription

/**
 * Builds a function-style observable driven by `setInterval`.
 *
 * Every call of the returned function (i.e. every subscription) starts its own
 * timer. On each tick `observer.next()` is invoked with no arguments; `error`
 * and `complete` are never invoked by this source.
 *
 * @param nMilliseconds delay handed to `setInterval`
 * @returns a function that takes an observer, starts the timer, and returns a
 *          cancel function. Cancelling clears the timer; further cancel calls
 *          do nothing.
 */
function createIntervalObservable(nMilliseconds) {
  return (observer) => {
    const tick = () => observer.next();
    let timer = setInterval(tick, nMilliseconds);

    return function cancel() {
      // Already cancelled: the handle was dropped on the first call.
      if (!timer) {
        return;
      }
      clearInterval(timer);
      timer = null;
    };
  };
}
/**
 * mapObservable(o: FunctionObservable<T>, mapper: (t: T) => U): FunctionObservable<U>
 *
 * Derives an observable that passes every upstream value through `mapper`
 * before handing it to the subscriber. `error` and `complete` notifications
 * are forwarded unchanged.
 *
 * Observers may be partial: a notification whose handler is missing is
 * dropped instead of raising "observer.x is not a function".
 *
 * @param observable upstream function-observable to subscribe to
 * @param mapper     called once per upstream value; its result is emitted
 * @returns a function-observable; subscribing returns whatever the upstream
 *          subscription returned (its cancel function)
 */
function mapObservable(observable, mapper) {
  return function (observer) {
    return observable({
      next(t) {
        // The mapper always runs, even when nobody listens for values.
        const u = mapper(t);
        if (typeof observer.next === "function") {
          observer.next(u);
        }
      },
      error(e) {
        if (typeof observer.error === "function") {
          observer.error(e);
        }
      },
      complete() {
        if (typeof observer.complete === "function") {
          observer.complete();
        }
      },
    });
  };
}
/**
 * scanObservable<S,T>(o: FunctionObservable<T>, reducer: (s: S, t: T) => S, state: S): FunctionObservable<S>
 *
 * Derives an observable that emits a running accumulation of the upstream
 * values: for each upstream value `t` it computes `reducer(previous, t)` and
 * emits the result. `error` and `complete` are forwarded unchanged.
 *
 * Each subscription starts again from `state`, so subscribing twice yields
 * two independent accumulations. (If `state` is a mutable object and the
 * reducer mutates it in place, that object is still shared.)
 *
 * Observers may be partial: a notification whose handler is missing is
 * dropped instead of raising "observer.x is not a function".
 *
 * @param observable upstream function-observable to subscribe to
 * @param reducer    combines the accumulated value with the next upstream value
 * @param state      seed for the accumulation
 * @returns a function-observable; subscribing returns whatever the upstream
 *          subscription returned (its cancel function)
 */
function scanObservable(observable, reducer, state) {
  return function (observer) {
    // Keep the running value per subscription; never write back to `state`.
    let accumulated = state;
    return observable({
      next(t) {
        accumulated = reducer(accumulated, t);
        if (typeof observer.next === "function") {
          observer.next(accumulated);
        }
      },
      error(e) {
        if (typeof observer.error === "function") {
          observer.error(e);
        }
      },
      complete() {
        if (typeof observer.complete === "function") {
          observer.complete();
        }
      },
    });
  };
}
/**
 * take(o: FunctionObservable<T>, n: number): FunctionObservable<T>
 *
 * Derives an observable that forwards at most `n` upstream values, then
 * completes and cancels the upstream subscription.
 *
 * Behaviour:
 * - The count is tracked per subscription, so the result can be subscribed to
 *   any number of times.
 * - `complete` is delivered immediately after the n-th value (not when an
 *   (n+1)-th value shows up), followed by cancelling upstream.
 * - Once finished (limit reached, upstream completed/errored, or cancelled by
 *   the subscriber) every later upstream notification is ignored.
 * - Sources that emit synchronously while being subscribed to are supported:
 *   the upstream cancel is invoked as soon as it becomes available.
 * - When `n` is not a positive number the result completes right away without
 *   subscribing upstream.
 * - Observers may be partial; notifications without a handler are dropped.
 *
 * @param observable upstream function-observable to subscribe to
 * @param n          maximum number of values to forward
 * @returns a function-observable; subscribing returns a cancel function that
 *          stops forwarding and cancels upstream (at most once)
 */
function take(observable, n) {
  return function (observer) {
    let remaining = n;
    let finished = false;
    let cancelUpstream = null;
    let releaseRequested = false;

    // Cancel upstream at most once; remembers the request if the cancel
    // function is not available yet (synchronous source).
    const release = () => {
      releaseRequested = true;
      if (typeof cancelUpstream === "function") {
        const cancel = cancelUpstream;
        cancelUpstream = null;
        cancel();
      }
    };

    const signalComplete = () => {
      if (typeof observer.complete === "function") {
        observer.complete();
      }
    };

    // `!(x > 0)` also catches NaN / undefined.
    if (!(remaining > 0)) {
      finished = true;
      signalComplete();
      return () => {};
    }

    const upstreamCancel = observable({
      next(t) {
        if (finished) {
          return;
        }
        remaining -= 1;
        if (typeof observer.next === "function") {
          observer.next(t);
        }
        if (!finished && !(remaining > 0)) {
          finished = true;
          signalComplete();
          release();
        }
      },
      error(e) {
        if (finished) {
          return;
        }
        finished = true;
        if (typeof observer.error === "function") {
          observer.error(e);
        }
      },
      complete() {
        if (finished) {
          return;
        }
        finished = true;
        signalComplete();
      },
    });

    cancelUpstream = upstreamCancel;
    if (releaseRequested) {
      // The limit was hit while the source was still emitting synchronously.
      release();
    }

    return () => {
      finished = true;
      release();
    };
  };
}
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement