Advertisement
Guest User

Untitled

a guest
Jan 16th, 2017
69
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.12 KB | None | 0 0
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5.  
  6. namespace AsyncScheduler
  7. {
  /// <summary>
  /// Runs queued actions one at a time on a background pump task.
  /// Actions submitted through <c>Interrupt</c> are taken before actions
  /// submitted through <c>Add</c>; an action that is already executing is
  /// never pre-empted.
  /// </summary>
  public class ActionQueueScheduler : IDisposable
  {
      private const int Idle = 0;
      private const int Running = 1;

      // Regular work, processed in FIFO order.
      private readonly ConcurrentQueue<Func<Task>> queue;

      // Priority work; always drained before the regular queue.
      private readonly ConcurrentQueue<Func<Task>> immediate;

      // Cancelled by Dispose; its token is handed to every action.
      private readonly CancellationTokenSource canceller;

      // Idle or Running; only ever changed through Interlocked operations.
      private int state;

      // The most recently started pump task (null until the first item is scheduled).
      private volatile Task runner;

      /// <summary>Creates an idle scheduler with empty queues.</summary>
      public ActionQueueScheduler()
      {
          this.queue = new ConcurrentQueue<Func<Task>>();
          this.immediate = new ConcurrentQueue<Func<Task>>();
          this.canceller = new CancellationTokenSource();
      }

      /// <summary>Queues a synchronous action behind all previously added work.</summary>
      /// <param name="action">The work to run; receives the scheduler's cancellation token.</param>
      /// <returns>A task that completes, faults or is cancelled together with the action.</returns>
      public Task Add(Action<CancellationToken> action)
          => this.Add(token => { action(token); return default(object); });

      /// <summary>Queues a synchronous function behind all previously added work.</summary>
      /// <param name="action">The work to run; receives the scheduler's cancellation token.</param>
      /// <returns>A task carrying the function's result, exception or cancellation.</returns>
      public Task<T> Add<T>(Func<CancellationToken, T> action)
          => this.Schedule<T>(this.queue, tcs => this.InvokeAction(action, tcs));

      /// <summary>Queues an asynchronous action behind all previously added work.</summary>
      /// <param name="action">The work to run; receives the scheduler's cancellation token.</param>
      /// <returns>A task that completes, faults or is cancelled together with the action.</returns>
      public Task Add(Func<CancellationToken, Task> action)
          => this.Add(async token => { await action(token); return default(object); });

      /// <summary>Queues an asynchronous function behind all previously added work.</summary>
      /// <param name="action">The work to run; receives the scheduler's cancellation token.</param>
      /// <returns>A task carrying the function's result, exception or cancellation.</returns>
      public Task<T> Add<T>(Func<CancellationToken, Task<T>> action)
          => this.Schedule<T>(this.queue, tcs => this.InvokeActionAsync(action, tcs));

      /// <summary>Queues a synchronous action ahead of everything waiting in the regular queue.</summary>
      /// <param name="action">The work to run; receives the scheduler's cancellation token.</param>
      /// <returns>A task that completes, faults or is cancelled together with the action.</returns>
      public Task Interrupt(Action<CancellationToken> action)
          => this.Interrupt(token => { action(token); return default(object); });

      /// <summary>Queues a synchronous function ahead of everything waiting in the regular queue.</summary>
      /// <param name="action">The work to run; receives the scheduler's cancellation token.</param>
      /// <returns>A task carrying the function's result, exception or cancellation.</returns>
      public Task<T> Interrupt<T>(Func<CancellationToken, T> action)
          => this.Schedule<T>(this.immediate, tcs => this.InvokeAction(action, tcs));

      /// <summary>Queues an asynchronous action ahead of everything waiting in the regular queue.</summary>
      /// <param name="action">The work to run; receives the scheduler's cancellation token.</param>
      /// <returns>A task that completes, faults or is cancelled together with the action.</returns>
      public Task Interrupt(Func<CancellationToken, Task> action)
          => this.Interrupt(async token => { await action(token); return default(object); });

      /// <summary>Queues an asynchronous function ahead of everything waiting in the regular queue.</summary>
      /// <param name="action">The work to run; receives the scheduler's cancellation token.</param>
      /// <returns>A task carrying the function's result, exception or cancellation.</returns>
      public Task<T> Interrupt<T>(Func<CancellationToken, Task<T>> action)
          => this.Schedule<T>(this.immediate, tcs => this.InvokeActionAsync(action, tcs));

      /// <summary>
      /// Cancels the scheduler's token and blocks until the most recently
      /// started pump task has finished. Items still queued are completed as
      /// cancelled by the pump; later submissions return a cancelled task.
      /// </summary>
      public void Dispose()
      {
          this.canceller.Cancel();
          this.runner?.Wait();
      }

      /// <summary>
      /// Shared body of every Add/Interrupt overload: rejects work once
      /// cancellation has been requested, otherwise enqueues it on
      /// <paramref name="target"/> and makes sure a pump is running.
      /// </summary>
      private Task<T> Schedule<T>(ConcurrentQueue<Func<Task>> target, Func<TaskCompletionSource<T>, Task> invoke)
      {
          if (this.canceller.IsCancellationRequested)
              return Task.FromCanceled<T>(this.canceller.Token);

          // Continuations must not run inline on the pump: a caller's
          // continuation could otherwise stall or re-enter the scheduler.
          var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
          target.Enqueue(() => invoke(tcs));
          this.Run();
          return tcs.Task;
      }

      /// <summary>Runs a synchronous function and records its outcome on <paramref name="tcs"/>.</summary>
      private Task InvokeAction<T>(Func<CancellationToken, T> action, TaskCompletionSource<T> tcs)
      {
          try
          {
              this.canceller.Token.ThrowIfCancellationRequested();
              tcs.TrySetResult(action(this.canceller.Token));
          }
          catch (OperationCanceledException)
          {
              tcs.TrySetCanceled(this.canceller.Token);
          }
          catch (Exception exception)
          {
              tcs.TrySetException(exception);
          }
          return Task.CompletedTask;
      }

      /// <summary>Awaits an asynchronous function and records its outcome on <paramref name="tcs"/>.</summary>
      private async Task InvokeActionAsync<T>(Func<CancellationToken, Task<T>> asyncAction, TaskCompletionSource<T> tcs)
      {
          try
          {
              this.canceller.Token.ThrowIfCancellationRequested();
              tcs.TrySetResult(await asyncAction(this.canceller.Token).ConfigureAwait(false));
          }
          catch (OperationCanceledException)
          {
              tcs.TrySetCanceled(this.canceller.Token);
          }
          catch (Exception exception)
          {
              tcs.TrySetException(exception);
          }
      }

      /// <summary>
      /// Starts a pump task unless one is already running. Exactly one caller
      /// wins the Idle -> Running transition; every other caller returns
      /// immediately without spinning.
      /// </summary>
      private void Run()
      {
          if (Interlocked.CompareExchange(ref this.state, Running, Idle) != Idle)
              return;
          this.runner = Task.Run(() => this.Pump());
      }

      /// <summary>
      /// Drains both queues, priority items first, then goes idle. After going
      /// idle it re-checks BOTH queues: a producer that enqueued while the
      /// state was still Running did not start a pump of its own, so this
      /// pump has to pick that item up (or lose the race to a newer pump).
      /// </summary>
      private async Task Pump()
      {
          Func<Task> action;
          do
          {
              while (this.immediate.TryDequeue(out action) || this.queue.TryDequeue(out action))
                  await action().ConfigureAwait(false);

              // Full fence: publish Idle before looking at the queues again.
              Interlocked.Exchange(ref this.state, Idle);
          }
          while ((!this.immediate.IsEmpty || !this.queue.IsEmpty)
              && Interlocked.CompareExchange(ref this.state, Running, Idle) == Idle);
      }
  }
  146. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement