Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/// <summary>
/// Wraps <paramref name="source"/> in an enumerable that reads ahead on a Dataflow
/// worker, buffering up to <paramref name="count"/> items in a <see cref="BufferBlock{T}"/>
/// so the consumer rarely waits on the underlying enumerator.
/// </summary>
/// <typeparam name="T">Element type of the sequence.</typeparam>
/// <param name="source">The sequence to pre-cache; enumerated at most once.</param>
/// <param name="count">Maximum number of items to buffer ahead of the consumer.</param>
/// <returns>A sequence yielding the same items, in order, as <paramref name="source"/>.</returns>
public static IEnumerable<T> PreCache<T>(this IEnumerable<T> source, int count = 1)
{
    var e = source.GetEnumerator();
    var queue = new BufferBlock<T>();
    ActionBlock<bool> worker = null;
    // NOTE(review): ConcurrentMoveNext is a project helper — presumably it advances the
    // enumerator thread-safely, invoking the first callback per item produced and the
    // second callback once the sequence is exhausted. Confirm against its definition.
    Func<bool> tryQueue = () =>
        e.ConcurrentMoveNext(
            value => queue.Post(value),
            () => worker.Complete());
    worker = new ActionBlock<bool>(synchronousFill =>
        { while (queue.Count < count && tryQueue() && synchronousFill); },
        // The consumers will dictate the amount of parallelism.
        new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 32 });
    // Project helper: when the worker completes, complete the queue as well.
    worker.PropagateCompletionTo(queue);
    while (true)
    {
        // Is something already available in the queue? Get it.
        T item;
        if (queue.TryReceive(null, out item))
        {
            // Fire-and-forget: top the buffer back up while the consumer processes this item.
            worker.SendAsync(true);
            yield return item;
        }
        else
        {
            // At this point, something could be in the queue again, but let's assume not and try to trigger more.
            if (worker.Post(true))
            {
                // The .Post call is 'accepted' (doesn't mean it was run).
                // Set up the wait to receive the next available item.
                var task = queue.ReceiveAsync();
                // BUG FIX: Task.Wait() throws AggregateException when the task faults or is
                // cancelled, which made the IsFaulted/IsCanceled checks below unreachable —
                // a consumer hitting the completed-queue race saw a raw AggregateException.
                // Swallow the wrapper here and inspect the task's state explicitly instead.
                try
                {
                    task.Wait();
                }
                catch (AggregateException)
                {
                    // Outcome is handled via task.IsFaulted / task.IsCanceled below.
                }
                if (task.IsFaulted)
                {
                    // Unwrap so callers see the original failure, not an AggregateException.
                    throw task.Exception.InnerException;
                }
                if (!task.IsCanceled) // Cancelled means there's nothing to get.
                {
                    // Task was not cancelled and there is a result available.
                    yield return task.Result;
                    continue;
                }
            }
            yield break;
        }
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement