Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- class FuturePool
- {
- private class Item
- {
- public Type request_type; // response type
- public Type response_type; // request type
- public System.Delegate operation; // signature
- public object deferred; // deferred
- public object request; // request
- }
- private Queue<Item> queue;
- private int concurrency;
- private int running;
- public FuturePool(int concurrency)
- {
- this.queue = new System.Collections.Generic.Queue<Item>();
- this.concurrency = concurrency;
- this.running = 0;
- }
- public Future<TResponse> Queue<TRequest, TResponse>(Func<TRequest, Future<TResponse>> operation, TRequest request)
- {
- var deferred = new Deferred<TResponse>();
- //---------------------------
- // queue up the work
- //---------------------------
- this.queue.Enqueue(new Item {
- request_type = typeof(TRequest),
- response_type = typeof(TResponse),
- deferred = deferred,
- request = request,
- operation = operation
- });
- this.Process();
- return deferred.Future;
- }
- private void Process()
- {
- if ((this.queue.Count > 0) && (this.running < this.concurrency)) {
- //-------------------------------
- // increment
- //-------------------------------
- this.running += 1;
- //-------------------------------
- // dequeue and inspect
- //-------------------------------
- var item = this.queue.Dequeue();
- var future = item.operation.DynamicInvoke(new object[] { item.request });
- var future_type = future.GetType();
- var deffered = item.deferred;
- var deffered_type = deffered.GetType();
- //-------------------------------
- // handle the resolve
- //-------------------------------
- var d_then = DynamicAction.Create(new Type[] { item.response_type }, (results) => {
- deffered_type.GetMethod("Resolve").Invoke(deffered, results);
- this.running -= 1;
- if (this.queue.Count > 0) this.Process();
- });
- //-------------------------------
- // handle the reject
- //-------------------------------
- var d_error = DynamicAction.Create(new Type[] { typeof(System.Exception) }, (results) => {
- deffered_type.GetMethod("Reject").Invoke(deffered, results);
- this.running -= 1;
- if (this.queue.Count > 0) this.Process();
- });
- //-------------------------------
- // execute
- //-------------------------------
- future_type.GetMethod("Then").Invoke (future, new object[] { d_then });
- future_type.GetMethod("Error").Invoke (future, new object[] { d_error });
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement