Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
 * Builds a stream transformer that limits the rate at which events flow.
 *
 * Example: limit the rate of flow to an event every 250ms:
 *   const stream$ = other$.thru(limitFlow(250));
 *
 * @param {number} period - Spacing between emitted events (250 means one
 *   event every 250ms in the example above).
 * @returns {function(Object): Object} A function that takes a stream and
 *   returns a new stream, built with the same constructor as the input,
 *   whose source is a RateLimitSource wrapping the input's source.
 */
export function limitFlow(period) {
  return function limitFlow(stream) {
    const source = new RateLimitSource(stream.source, period);
    // Re-use the input stream's own constructor so the result is the same
    // kind of stream object that was passed in.
    return new stream.constructor(source);
  };
}
/**
 * Source that wraps an upstream source and routes its events through a
 * RateLimitSink before they reach the downstream sink.
 */
class RateLimitSource {
  /**
   * @param {Object} source - Upstream source; must expose `run(sink, scheduler)`.
   * @param {number} period - Spacing between emitted events, read by the sink.
   */
  constructor(source, period) {
    this.source = source;
    this.period = period;
  }

  /**
   * Subscribes to the upstream source with a rate-limiting sink in between.
   *
   * @param {Object} sink - Downstream sink receiving `event`/`end`/`error`.
   * @param {Object} scheduler - Scheduler shared with the upstream source.
   * @returns {*} Whatever the upstream `run` returns (its disposable).
   */
  run(sink, scheduler) {
    // The upstream must actually be run; merely constructing the sink would
    // never subscribe to anything and would hand the caller a non-disposable.
    return this.source.run(new RateLimitSink(this, sink, scheduler), scheduler);
  }
}
/**
 * Sink that forwards at most one event per `source.period`, keeping only the
 * most recent value received while waiting for the next emission slot.
 *
 * Note: `undefined` is used as the "nothing buffered" marker, so an event
 * whose value is `undefined` is never forwarded.
 */
class RateLimitSink {
  /**
   * @param {Object} source - Owner exposing `period`.
   * @param {Object} sink - Downstream sink receiving `event`/`end`/`error`.
   * @param {Object} scheduler - Scheduler exposing `now`, `asap` and `delay`.
   */
  constructor(source, sink, scheduler) {
    this.source = source;
    this.sink = sink;
    this.scheduler = scheduler;
    // Earliest time at which the next event may be emitted.
    this.next = 0;
    // Latest value waiting to be emitted; `undefined` means nothing pending.
    this.buffered = void 0;
  }

  /**
   * Emits the buffered value, if any, and advances the next emission slot.
   * Invoked by scheduled RateLimitTask instances and by `end`.
   *
   * @param {number} t - Time stamp passed on to the downstream sink.
   */
  _run(t) {
    if (this.buffered === void 0) {
      return;
    }
    const x = this.buffered;
    const now = this.scheduler.now();
    const period = this.source.period;
    const next = this.next;
    this.buffered = void 0;
    // Stay on the existing slot grid while it is still ahead of the clock;
    // otherwise restart the grid from the current time.
    this.next = (next + period > now ? next : now) + period;
    this.sink.event(t, x);
  }

  /**
   * Buffers the incoming value and makes sure an emission is scheduled.
   *
   * @param {number} t - Event time.
   * @param {*} x - Event value; replaces any value already buffered.
   */
  event(t, x) {
    const nothingScheduled = this.buffered === void 0;
    this.buffered = x;
    const next = this.next;
    if (t >= next) {
      // The task must target this sink: only it implements `_run`.
      this.scheduler.asap(new RateLimitTask(this));
    } else if (nothingScheduled) {
      // `delay` takes a duration relative to now, not an absolute time.
      this.scheduler.delay(next - t, new RateLimitTask(this));
    }
  }

  /**
   * Flushes any buffered value, then ends the downstream sink.
   *
   * @param {number} t - End time.
   * @param {*} x - End value forwarded downstream.
   */
  end(t, x) {
    this._run(t);
    this.sink.end(t, x);
  }

  /**
   * Forwards an error to the downstream sink.
   *
   * @param {number} t - Error time.
   * @param {*} e - Error forwarded downstream.
   */
  error(t, e) {
    this.sink.error(t, e);
  }
}
/**
 * Scheduler task that triggers a sink's `_run` when executed, unless the
 * task has been disposed first.
 */
class RateLimitTask {
  /**
   * @param {Object} sink - Object exposing `_run(t)` and `error(t, e)`.
   */
  constructor(sink) {
    this.sink = sink;
    // Explicit initial state rather than relying on a missing property.
    this.disposed = false;
  }

  /**
   * Runs the sink's pending emission.
   *
   * @param {number} t - Time stamp handed to the sink.
   */
  run(t) {
    if (this.disposed) {
      return;
    }
    this.sink._run(t);
  }

  /**
   * Reports a task failure to the sink.
   *
   * @param {number} t - Time stamp handed to the sink.
   * @param {*} e - Error handed to the sink.
   */
  error(t, e) {
    if (this.disposed) {
      return;
    }
    this.sink.error(t, e);
  }

  /** Marks the task as disposed so later `run`/`error` calls do nothing. */
  dispose() {
    this.disposed = true;
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement