/**
 * Build a stream transform that limits the rate of flow to at most one event
 * per `period` milliseconds. When several events arrive within one period,
 * only the most recent one is kept and it is emitted once the period elapses.
 *
 * The returned function reads `stream.source` and wraps the new source with
 * `stream.constructor`, so it is meant to be applied with `thru`.
 *
 * @example
 * // Limit the rate of flow to an event every 250ms:
 * const stream$ = other$.thru(limitFlow(250));
 *
 * @param {number} period - Minimum spacing between emitted events, in the
 *   scheduler's time unit (milliseconds in the example above).
 * @returns {function(Object): Object} Function mapping a stream to a
 *   rate-limited stream of the same constructor.
 */
export function limitFlow(period) {
  return function limitFlow(stream) {
    const source = new RateLimitSource(stream.source, period);
    return new stream.constructor(source);
  };
}

/**
 * Source that subscribes to an upstream source through a RateLimitSink.
 */
class RateLimitSource {
  /**
   * @param {Object} source - Upstream source exposing `run(sink, scheduler)`.
   * @param {number} period - Minimum spacing between emitted events.
   */
  constructor(source, period) {
    this.source = source;
    this.period = period;
  }

  /**
   * Subscribe `sink` to the rate-limited events.
   *
   * @param {Object} sink - Downstream sink (`event`, `end`, `error`).
   * @param {Object} scheduler - Scheduler exposing `now`, `asap` and `delay`.
   * @returns {{dispose: function(): *}} Disposable that silences any pending
   *   emission and then disposes the upstream subscription.
   */
  run(sink, scheduler) {
    const rateLimitSink = new RateLimitSink(this, sink, scheduler);
    // The upstream must actually be run, otherwise no event ever arrives.
    const upstream = this.source.run(rateLimitSink, scheduler);
    return {
      dispose() {
        rateLimitSink.dispose();
        return upstream.dispose();
      },
    };
  }
}

/**
 * Sink that buffers the latest upstream event and forwards it downstream no
 * sooner than `next`, the earliest time the following emission is allowed.
 */
class RateLimitSink {
  /**
   * @param {RateLimitSource} source - Owning source; supplies `period`.
   * @param {Object} sink - Downstream sink.
   * @param {Object} scheduler - Scheduler used to defer emissions.
   */
  constructor(source, sink, scheduler) {
    this.source = source;
    this.sink = sink;
    this.scheduler = scheduler;
    this.next = 0;
    this.buffered = undefined;
    // Explicit flag so that an event whose value is `undefined` is not
    // mistaken for an empty buffer.
    this.hasBuffered = false;
    // Cleared on error/dispose so that already-scheduled tasks become no-ops.
    this.active = true;
  }

  /**
   * Emit the buffered event, if any, and compute the next allowed time.
   * Called by RateLimitTask when a scheduled emission fires, and by `end`
   * to flush the last buffered value.
   *
   * @param {number} t - Time stamp forwarded with the event.
   */
  _run(t) {
    if (!this.active || !this.hasBuffered) {
      return;
    }

    const x = this.buffered;
    const now = this.scheduler.now();
    const period = this.source.period;
    const next = this.next;

    this._dropBuffer();
    // Stay on the existing cadence while it is still ahead of `now`;
    // otherwise restart the cadence from `now`.
    this.next = (next + period > now ? next : now) + period;
    this.sink.event(t, x);
  }

  /**
   * Receive an upstream event: remember it as the latest value and make sure
   * an emission is scheduled.
   *
   * @param {number} t - Event time.
   * @param {*} x - Event value.
   */
  event(t, x) {
    if (!this.active) {
      return;
    }

    const nothingScheduled = !this.hasBuffered;
    this.buffered = x;
    this.hasBuffered = true;

    const next = this.next;
    if (t >= next) {
      // Redundant tasks are harmless: `_run` is a no-op on an empty buffer.
      this.scheduler.asap(new RateLimitTask(this));
    } else if (nothingScheduled) {
      // `delay` is given the time remaining until `next`, not the absolute
      // time (assumes a most.js-style scheduler with relative delays).
      this.scheduler.delay(next - t, new RateLimitTask(this));
    }
  }

  /**
   * Flush any buffered event, then forward the end signal.
   *
   * @param {number} t - End time.
   * @param {*} x - End value.
   */
  end(t, x) {
    this._run(t);
    this.active = false;
    this.sink.end(t, x);
  }

  /**
   * Discard any buffered event and forward the error.
   *
   * @param {number} t - Error time.
   * @param {*} e - Error value.
   */
  error(t, e) {
    this.dispose();
    this.sink.error(t, e);
  }

  /**
   * Stop emitting: discard the buffer so pending tasks do nothing.
   */
  dispose() {
    this.active = false;
    this._dropBuffer();
  }

  /** Reset the buffer to its empty state. */
  _dropBuffer() {
    this.buffered = undefined;
    this.hasBuffered = false;
  }
}

/**
 * Scheduler task that triggers a RateLimitSink emission when it runs.
 */
class RateLimitTask {
  /**
   * @param {RateLimitSink} sink - The rate-limit sink whose `_run` is invoked
   *   (not the downstream sink).
   */
  constructor(sink) {
    this.sink = sink;
    this.disposed = false;
  }

  /**
   * @param {number} t - Time at which the task runs.
   */
  run(t) {
    if (this.disposed) {
      return;
    }
    this.sink._run(t);
  }

  /**
   * @param {number} t - Time at which the failure occurred.
   * @param {*} e - Error value.
   */
  error(t, e) {
    if (this.disposed) {
      return;
    }
    this.sink.error(t, e);
  }

  /** Mark the task so that a later `run`/`error` does nothing. */
  dispose() {
    this.disposed = true;
  }
}