Advertisement
Guest User

Untitled

a guest
Mar 21st, 2018
129
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. using System;
  2. using System.Linq;
  3. using System.Threading.Tasks;
  4. using System.Threading.Tasks.Dataflow;
  5. using NLog;
  6.  
  7. namespace Starter
  8. {
  /// <summary>
  /// Demonstrates a cyclic TPL Dataflow pipeline: an I/O block reads metadata for each
  /// <see cref="Data"/> item, and a wait block either drops the item or, after a delay,
  /// posts it back to the I/O block for another read.
  /// </summary>
  internal class StackoverFlowTest
  {
      private static readonly Logger Log = LogManager.GetCurrentClassLogger();
      private TransformBlock<Data, Tuple<Data, MetaData>> _ioBlock;
      private TransformBlock<Tuple<Data, MetaData>, Data> _waitBlock;
      private static readonly SecureRandom Random = new SecureRandom();

      /// <summary>Maximum number of reads the I/O block runs concurrently.</summary>
      private const int MaxConcurrentReads = 100;

      /// <summary>Number of items posted into the pipeline by <see cref="Start"/>.</summary>
      private const int ItemCount = 10000;

      /// <summary>How long a repost item is held in the wait block before it is read again.</summary>
      private static readonly TimeSpan RepostDelay = TimeSpan.FromMinutes(1);

      /// <summary>
      /// Builds both blocks, links them into a loop and posts <see cref="ItemCount"/> items
      /// to the I/O block. Returns as soon as the items are posted; processing continues
      /// asynchronously and neither block is ever completed by this method.
      /// </summary>
      public void Start()
      {
          _ioBlock = new TransformBlock<Data, Tuple<Data, MetaData>>(async data =>
          {
              Log.Debug($"Enter I/O block {data.I}...");

              var metaData = await ReadAsync(data).ConfigureAwait(false);
              var dataMetaData = new Tuple<Data, MetaData>(data, metaData);

              Log.Debug($"Exit I/O block {data.I}...");

              return dataMetaData;
          },
          new ExecutionDataflowBlockOptions
          {
              MaxDegreeOfParallelism = MaxConcurrentReads,

              // Dataflow blocks preserve input order by default, so a read that finishes
              // quickly would be held back until every earlier (possibly much slower) read
              // has finished. Nothing downstream relies on ordering, so release each result
              // as soon as it is ready.
              EnsureOrdered = false
          });

          _waitBlock = new TransformBlock<Tuple<Data, MetaData>, Data>(async dataMetaData =>
          {
              Log.Debug($"Enter wait block {dataMetaData.Item1.I}...");

              var data = dataMetaData.Item1;
              var metaData = dataMetaData.Item2;

              if (!metaData.Repost)
              {
                  // A null output marks the item as finished; it is routed to the null target below.
                  return null;
              }

              await Task.Delay(RepostDelay).ConfigureAwait(false);

              Log.Debug($"Exit wait block {data.I}...");

              return data;
          },
          new ExecutionDataflowBlockOptions
          {
              MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,

              // Same reasoning as for the I/O block: with ordering enforced, items that
              // return null immediately would be stuck behind earlier items that are
              // still sitting in their delay.
              EnsureOrdered = false
          });

          _ioBlock.LinkTo(_waitBlock);

          // The filtered link is added first, so non-null outputs go back to the I/O block;
          // the null outputs it declines are taken by the null target, which discards them
          // and keeps the wait block's output from backing up.
          _waitBlock.LinkTo(_ioBlock, data => data != null);
          _waitBlock.LinkTo(DataflowBlock.NullTarget<Data>());

          for (var i = 0; i < ItemCount; i++)
          {
              _ioBlock.Post(new Data(i));
          }
      }

      /// <summary>
      /// Stand-in for an I/O operation on <paramref name="data"/>: waits for a random number
      /// of milliseconds and then produces a randomly chosen result.
      /// </summary>
      /// <param name="data">The item being "read"; not used by this fake implementation.</param>
      /// <returns>
      /// A <see cref="MetaData"/> whose <see cref="MetaData.Repost"/> is true when
      /// <c>Random.Next(0, 10)</c> yields a value below 3.
      /// </returns>
      private static async Task<MetaData> ReadAsync(Data data)
      {
          // NOTE(review): the exact bounds of SecureRandom.Next(min, max) are not visible
          // here - presumably the upper bound is exclusive; confirm against the library.
          var milliseconds = Random.Next(1, 56789);

          await Task.Delay(milliseconds).ConfigureAwait(false); // Some I/O using 'data'...

          var repost = Random.Next(0, 10) < 3; // Some fake result of I/O task

          return new MetaData {Repost = repost};
      }
  }
  73.  
  /// <summary>
  /// Result of reading a <see cref="Data"/> item.
  /// </summary>
  internal class MetaData
  {
      /// <summary>
      /// True when the item should be sent through the read step again; defaults to false.
      /// </summary>
      public bool Repost { get; set; }
  }
  78.  
  /// <summary>
  /// A work item identified by an integer.
  /// </summary>
  internal class Data
  {
      /// <summary>
      /// Creates an item carrying the given value.
      /// </summary>
      /// <param name="i">Value exposed through <see cref="I"/>.</param>
      public Data(int i)
      {
          this.I = i;
      }

      /// <summary>
      /// The value supplied at construction; it cannot be changed afterwards.
      /// </summary>
      public int I { get; }
  }
  88. }
Advertisement
RAW Paste Data Copied
Advertisement