Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // limits a stream to `limit` active events
- function limitStream(input$, outputComplete$, limit = 10) {
- const initialOutputLimiter$ = Rx.Observable.range(1, limit).map(x => true);
- // a stream of bools indicating when to pass the next input to the output stream
- const outputLimiter$ = Rx.Observable.concat(initialOutputLimiter$, outputComplete$);
- return Rx.Observable.zip(outputLimiter$, input$);
- }
- // Creates a responses proxy that we can pass to limitStream.
- // This enables our circular dependency of passing the completed responses to
- // the request limiter
- const responsesProxy$ = new Rx.Subject();
- // Creates the limited stream of requests
- const limitedRequests$ = limitStream(requests$, responsesProxy$, 10);
- // We need to track each request's completion. Usually we would map limitedRequests$
- // to an XHR call and end up with a responses stream but for this example We simply
- // map requests to `true`
- const responses$ = limitedRequests$.map(x => true);
- // Now we pipe our responses to the responsesProxy that limitedRequests$ uses to
- // release another request
- responses$.subscribe(responsesProxy$);
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement