Advertisement
Guest User

Untitled

a guest
Jan 19th, 2017
87
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.10 KB | None | 0 0
  1. // limits a stream to `limit` active events
  2. function limitStream(input$, outputComplete$, limit = 10) {
  3. const initialOutputLimiter$ = Rx.Observable.range(1, limit).map(x => true);
  4.  
  5. // a stream of bools indicating when to pass the next input to the output stream
  6. const outputLimiter$ = Rx.Observable.concat(initialOutputLimiter$, outputComplete$);
  7.  
  8. return Rx.Observable.zip(outputLimiter$, input$);
  9. }
  10.  
  11. // Creates a responses proxy that we can pass to limitStream.
  12. // This enables our circular dependency of passing the completed responses to
  13. // the request limiter
  14. const responsesProxy$ = new Rx.Subject();
  15.  
  16. // Creates the limited stream of requests
  17. const limitedRequests$ = limitStream(requests$, responsesProxy$, 10);
  18.  
  19. // We need to track each request's completion. Usually we would map limitedRequests$
  20. // to an XHR call and end up with a responses stream but for this example We simply
  21. // map requests to `true`
  22. const responses$ = limitedRequests$.map(x => true);
  23.  
  24. // Now we pipe our responses to the responsesProxy that limitedRequests$ uses to
  25. // release another request
  26. responses$.subscribe(responsesProxy$);
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement