Advertisement
wartab

PromiseRateLimiter

Jun 1st, 2018
144
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  /**
   * Runs a fixed list of promise-returning callbacks while keeping at most
   * `maxConcurrentPromises` of them in flight at any time.
   *
   * Usage: register every callback with {@link addCallback}, then call
   * {@link launch}. The returned promise
   *  - resolves with the callback results, stored at the index at which each
   *    callback was registered, once every callback has resolved;
   *  - rejects with the first failure (a rejected promise or a synchronous
   *    throw from a callback), after which no further callbacks are started;
   *  - rejects with the partial result array when {@link abort} is called.
   *
   * Callbacks that are already running cannot be cancelled; their outcome is
   * ignored once the limiter has been aborted or has failed.
   */
  export class PromiseRateLimiter<T = any> {

      /** Callbacks in registration order; the index doubles as the result slot. */
      private callbacks: (() => Promise<T>)[] = [];
      /** Results collected so far (sparse until every callback has resolved). */
      private callbackResults: T[] = [];

      /** Index of the next callback to start, i.e. the number started so far. */
      private nextIndex = 0;
      /** Number of callbacks whose promise has resolved. */
      private resolvedPromises = 0;
      /** Created lazily so it exists whether it is first requested via launch() or getEndPromise(). */
      private endPromise: Promise<T[]> | null = null;
      private launched = false;
      private aborted = false;
      /** Set once a callback has rejected or thrown. */
      private failed = false;
      // No-ops until the end promise is created, so abort() is safe before launch().
      private resolveCallback: (value: T[]) => void = () => undefined;
      private rejectCallback: (reason?: unknown) => void = () => undefined;

      /**
       * @param maxConcurrentPromises Maximum number of callbacks allowed to run
       *   at once. Values below 1 (and NaN) are treated as 1 so that a
       *   non-empty queue can always make progress.
       */
      public constructor(private maxConcurrentPromises: number) {}

      /**
       * Queues a callback for execution.
       *
       * @param callback Factory that starts the work and returns its promise.
       * @throws Error if the limiter has already been launched.
       */
      public addCallback(callback: () => Promise<T>): void {
          if (this.launched) {
              throw new Error("Cannot add callback after launching rate limiter");
          }
          this.callbacks.push(callback);
      }

      /**
       * Starts executing the queued callbacks. Calling it again returns the
       * same promise without restarting anything.
       *
       * @returns The end promise described on the class.
       */
      public launch(): Promise<T[]> {
          const endPromise = this.getEndPromise();
          if (this.launched) {
              return endPromise;
          }
          this.launched = true;

          if (this.callbacks.length === 0) {
              // Deferred so that an abort() issued synchronously right after
              // launch() still wins and the promise rejects.
              setTimeout(() => {
                  if (!this.aborted) {
                      this.resolveCallback(this.callbackResults);
                  }
              }, 0);
              return endPromise;
          }

          // A limit below 1 (or NaN) would start nothing and hang forever.
          const limit = this.maxConcurrentPromises >= 1 ? this.maxConcurrentPromises : 1;
          const initialBatch = Math.min(limit, this.callbacks.length);
          for (let i = 0; i < initialBatch; ++i) {
              this.startNext();
          }

          return endPromise;
      }

      /**
       * Stops starting new callbacks and rejects the end promise with the
       * results collected so far. Safe to call before launch() and more than
       * once; it has no effect on an end promise that has already settled.
       */
      public abort(): void {
          this.aborted = true;
          this.rejectCallback(this.callbackResults);
      }

      /**
       * @returns The promise that settles when the run ends. It is the same
       *   instance that launch() returns and may be requested before launch().
       */
      public getEndPromise(): Promise<T[]> {
          let promise = this.endPromise;
          if (promise === null) {
              promise = new Promise<T[]>((resolve, reject) => {
                  this.resolveCallback = resolve;
                  this.rejectCallback = reject;
              });
              this.endPromise = promise;
              if (this.aborted) {
                  // abort() ran before the promise existed; honour it now.
                  this.rejectCallback(this.callbackResults);
              }
          }
          return promise;
      }

      /** True once no further callbacks may be started or recorded. */
      private isHalted(): boolean {
          return this.aborted || this.failed;
      }

      /** Records the first failure and rejects the end promise with it. */
      private fail(reason: unknown): void {
          if (this.isHalted()) {
              return;
          }
          this.failed = true;
          this.rejectCallback(reason);
      }

      /** Starts the next queued callback, if any, and chains its completion. */
      private startNext(): void {
          if (this.isHalted()) {
              return;
          }
          const index = this.nextIndex;
          const callback = this.callbacks[index];
          if (callback === undefined) {
              return; // every callback has already been started
          }
          this.nextIndex = index + 1;

          let pending: Promise<T>;
          try {
              // Promise.resolve also tolerates plain-JS callers returning a bare value.
              pending = Promise.resolve(callback());
          } catch (error) {
              // A synchronous throw is treated exactly like a rejected promise.
              this.fail(error);
              return;
          }

          pending.then(
              result => {
                  if (this.isHalted()) {
                      return;
                  }
                  this.callbackResults[index] = result;
                  ++this.resolvedPromises;
                  if (this.resolvedPromises === this.callbacks.length) {
                      this.resolveCallback(this.callbackResults);
                  } else {
                      // A slot just freed up; startNext() is a no-op when the queue is drained.
                      this.startNext();
                  }
              },
              reason => this.fail(reason)
          );
      }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement