Advertisement
Guest User

Untitled

a guest
May 30th, 2016
46
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.71 KB | None | 0 0
  1. // Limit the rate of flow to an event every 250ms:
  2. // const stream$ = other$.thru(limitFlow(250));
  3.  
  4. export function limitFlow(period) {
  5. return function limitFlow(stream) {
  6. const source = new RateLimitSource(stream.source, period);
  7. return new stream.constructor(source);
  8. };
  9. }
  10.  
  11. class RateLimitSource
  12. {
  13. constructor(source, period) {
  14. this.source = source;
  15. this.period = period;
  16. }
  17.  
  18. run(sink, scheduler) {
  19. return new RateLimitSink(this, sink, scheduler);
  20. }
  21. }
  22.  
  23. class RateLimitSink
  24. {
  25. constructor(source, sink, scheduler) {
  26. this.source = source;
  27. this.sink = sink;
  28. this.scheduler = scheduler;
  29. this.next = 0;
  30. this.buffered = void 0;
  31. }
  32.  
  33. _run(t) {
  34. if(this.buffered === void 0) {
  35. return;
  36. }
  37. const x = this.buffered;
  38. const now = this.scheduler.now();
  39. const period = this.source.period;
  40. const next = this.next;
  41. this.buffered = void 0;
  42. this.next = (next + period > now ? next : now) + period;
  43. this.sink.event(t, x);
  44. }
  45.  
  46. event(t, x) {
  47. const nothingScheduled = this.buffered === void 0;
  48. this.buffered = x;
  49. const task = new RateLimitTask(this.sink);
  50. const next = this.next;
  51. if(t >= next) {
  52. this.scheduler.asap(task);
  53. }
  54. else if(nothingScheduled) {
  55. this.scheduler.delay(this.next, new RateLimitTask(this.sink, x));
  56. }
  57. }
  58.  
  59. end(t, x) {
  60. this._run(t);
  61. this.sink.end(t, x);
  62. }
  63.  
  64. error(t, e) {
  65. this.sink.error(t, e);
  66. }
  67. }
  68.  
  69. class RateLimitTask
  70. {
  71. constructor(sink) {
  72. this.sink = sink;
  73. }
  74.  
  75. run(t) {
  76. if(this.disposed) {
  77. return;
  78. }
  79. this.sink._run(t);
  80. }
  81.  
  82. error(t, e) {
  83. if(this.disposed) {
  84. return;
  85. }
  86. this.sink.error(t, e);
  87. }
  88.  
  89. dispose() {
  90. this.disposed = true;
  91. }
  92. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement