Advertisement
Guest User

Untitled

a guest
Feb 26th, 2020
123
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import { Resolver, defer } from './defer'
  2.  
  3. export type Enqueue<T> = (value: T) => void
  4.  
  5. export type Dequeue<T> = () => Promise<T>
  6.  
  7. /**
  8.  * An asynchronous queue that allows values to be
  9.  * asynchronously pulled from the queue. Returns
  10.  * a [enqueue, dequeue] tuple queuing and de-queuing
  11.  * values.
  12.  */
  13. export function queue<T>(): [Enqueue<T>, Dequeue<T>] {
  14.  
  15.     const promises: Promise<T>[]  = []    
  16.     const awaiters: Resolver<T>[] = []
  17.  
  18.     function dequeue(): Promise<T> {
  19.  
  20.         // If we have a promise, it means that
  21.         // the sender has enqueued a value and
  22.         // its available immediately. Shift the
  23.         // promise immediately
  24.         if(promises.length > 0) {
  25.  
  26.             return promises.shift()!
  27.  
  28.         }
  29.  
  30.         // If there are no promises available,
  31.         // create a deferred and push an awaiter.
  32.         // Return promise to caller.
  33.         else {
  34.  
  35.             const [promise, awaiter] = defer<T>()
  36.  
  37.             awaiters.push(awaiter)
  38.  
  39.             return promise
  40.         }
  41.     }
  42.  
  43.     function enqueue(item: T) {
  44.  
  45.         // If there is an awaiter, it means we have
  46.         // a receiver awaiting on a value. Shift
  47.         // the awaiter and resolve immediately.
  48.         if(awaiters.length > 0) {
  49.  
  50.             const awaiter = awaiters.shift()!
  51.  
  52.             awaiter(item)
  53.  
  54.         }
  55.  
  56.         // If there is no awaiters, it means that
  57.         // there is no receiver trying to receive
  58.         // at this time. create a deferred and push
  59.         // the promise for the 'next' receiver to
  60.         // receive. Resolve awaiter immediately
  61.         else {
  62.  
  63.             const [promise, awaiter] = defer<T>()
  64.            
  65.             awaiter(item)
  66.  
  67.             promises.push(promise)
  68.         }
  69.     }
  70.  
  71.     return [
  72.        
  73.         value => enqueue(value),
  74.  
  75.         ()    => dequeue()
  76.     ]
  77. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement