Advertisement
Guest User

Untitled

a guest
Jan 23rd, 2017
96
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.76 KB | None | 0 0
  1. {
  2.  
  3.  
  4. /*
  5. interface Observer<T> {
  6. next(t: T): void;
  7. error(e: Error): void;
  8. complete(): void;
  9. }
  10. */
  11. class Observer {
  12. constructor(next, error, complete) {
  13. this.next = next;
  14. this.error = error;
  15. this.complete = complete;
  16. }
  17. }
  18.  
  19. const incrementer = scanObservable(createIntervalObservable(100), function(s, item) {
  20. return s + 1;
  21. }, 0);
  22.  
  23. const squaredIncrementer = mapObservable(incrementer, x => x * x);
  24.  
  25.  
  26. const firstTenSquares = take(squaredIncrementer, 10);
  27.  
  28. // {{ square$ | async }}
  29. firstTenSquares({
  30. next(n) {
  31. console.log("heard", n);
  32. },
  33. complete() {
  34. console.log("finished");
  35. },
  36. })
  37.  
  38.  
  39.  
  40. // type CancelSubscription = () => void;
  41. // type FunctionObservable<T> = (observer: Observer<T>) => CancelSubscription
  42.  
  43. function createIntervalObservable(nMilliseconds) {
  44. return function(observer) {
  45.  
  46. let intervalToken = setInterval(() => observer.next(), nMilliseconds);
  47.  
  48. // cancellation
  49. return () => {
  50. if(intervalToken) {
  51. clearInterval(intervalToken)
  52. intervalToken = null;
  53. }
  54. }
  55. }
  56. }
  57.  
  58. // mapObservable(o: FunctionObservable<T>, mapper: (t: T) => U): FunctionObservable<U>
  59. function mapObservable(observable, mapper) {
  60. return function(observer) {
  61. return observable({
  62. next(t) {
  63. const u = mapper(t);
  64. observer.next(u);
  65. },
  66. error: (e) => observer.error(e),
  67. complete: () => observer.complete(),
  68. })
  69. }
  70. }
  71.  
  72. // scanObservable<S,T>(o: FunctionObservable<T>, reducer: (s: S, t: T) => S): FunctionObservable<S>
  73. function scanObservable(observable, reducer, state) {
  74. return function(observer) {
  75. return observable({
  76. next(t) {
  77. state = reducer(state, t);
  78. observer.next(state);
  79. },
  80. error: (e) => observer.error(e),
  81. complete: () => observer.complete(),
  82. })
  83. }
  84. }
  85.  
  86. function take(observable, n) {
  87. return function(observer) {
  88. const unsubscribe = observable({
  89. next(t) {
  90. if(n--) {
  91. observer.next(t);
  92. } else {
  93. observer.complete();
  94. unsubscribe();
  95. }
  96. },
  97. error: (e) => observer.error(e),
  98. complete: () => observer.complete(),
  99. })
  100.  
  101. return unsubscribe;
  102. }
  103. }
  104.  
  105.  
  106.  
  107.  
  108.  
  109.  
  110.  
  111.  
  112.  
  113.  
  114.  
  115.  
  116.  
  117.  
  118.  
  119.  
  120.  
  121.  
  122.  
  123.  
  124.  
  125. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement