Advertisement
Guest User

Untitled

a guest
Jan 12th, 2017
95
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.46 KB | None | 0 0
  /// <summary>
  /// Lazily enumerates <paramref name="source"/> through a read-ahead buffer: background
  /// workers pull from the source enumerator and post the values into a queue, and the
  /// consumer is served from that queue.
  /// </summary>
  /// <typeparam name="T">Element type of the sequence.</typeparam>
  /// <param name="source">The sequence to read ahead from.</param>
  /// <param name="count">
  /// Number of buffered items the workers try to keep available. Must be at least 1;
  /// with a smaller value the workers would never fetch anything and the consumer
  /// would wait forever.
  /// </param>
  /// <returns>A deferred sequence that yields the items received from the buffer.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is less than 1.</exception>
  public static IEnumerable<T> PreCache<T>(this IEnumerable<T> source, int count = 1)
  {
      // Validate eagerly: an iterator body does not run until the first MoveNext, so the
      // checks live in this non-iterator wrapper.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (count < 1)
      {
          throw new ArgumentOutOfRangeException("count", count, "At least one item must be pre-cached.");
      }

      return PreCacheIterator(source, count);
  }

  /// <summary>
  /// Iterator behind <c>PreCache</c>: owns the source enumerator, the buffer and the
  /// worker block for the lifetime of one enumeration.
  /// </summary>
  private static IEnumerable<T> PreCacheIterator<T>(IEnumerable<T> source, int count)
  {
      // Upper bound on concurrently running fill workers. How many actually run depends
      // on how many fill requests the consumer(s) have posted.
      const int maxWorkers = 32;

      var e = source.GetEnumerator();
      ActionBlock<bool> worker = null;
      try
      {
          var queue = new BufferBlock<T>();

          // One fetch attempt. ConcurrentMoveNext is defined elsewhere; it is handed a
          // callback that posts the fetched value to the queue and a callback that
          // completes the worker (presumably invoked when the source is exhausted).
          // The returned bool tells the fill loop whether to keep going.
          Func<bool> tryQueue = () =>
              e.ConcurrentMoveNext(
                  value => queue.Post(value),
                  () => worker.Complete());

          worker = new ActionBlock<bool>(
              synchronousFill =>
              {
                  // Fetch while the buffer is below target. A non-synchronous request
                  // stops after a single successful fetch.
                  while (queue.Count < count && tryQueue())
                  {
                      if (!synchronousFill)
                      {
                          break;
                      }
                  }
              },
              new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxWorkers });

          // Defined elsewhere; relied upon to complete (or fault) the queue once the
          // worker has finished, which is what ends the wait below.
          worker.PropagateCompletionTo(queue);

          while (true)
          {
              // Is something already available in the queue? Take it and ask the
              // workers to top the buffer back up.
              T item;
              if (queue.TryReceive(null, out item))
              {
                  worker.SendAsync(true);
                  yield return item;
                  continue;
              }

              // Buffer is empty: request a fill. A declined Post only means the worker
              // has been completed; items may still arrive from in-flight fetches, so
              // the decision to stop is left to the queue's own completion.
              worker.Post(true);

              // Blocking is unavoidable in a synchronous iterator. OutputAvailableAsync
              // yields false (rather than throwing) when the queue completes empty.
              if (!queue.OutputAvailableAsync().GetAwaiter().GetResult())
              {
                  // Surface a propagated fault as its original exception instead of
                  // silently ending the sequence.
                  queue.Completion.GetAwaiter().GetResult();
                  yield break;
              }
          }
      }
      finally
      {
          if (worker == null)
          {
              e.Dispose();
          }
          else
          {
              // Stop accepting fill requests, then dispose the enumerator only after
              // in-flight fetches have finished so it is never disposed mid-MoveNext.
              worker.Complete();
              worker.Completion.ContinueWith(_ => e.Dispose(), TaskScheduler.Default);
          }
      }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement