Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System;
- using System.Collections.Concurrent;
- using System.Threading;
- using System.Threading.Tasks;
- namespace AsyncScheduler
- {
- public class ActionQueueScheduler : IDisposable
- {
- private readonly ConcurrentQueue<Func<Task>> queue;
- private readonly ConcurrentQueue<Func<Task>> immediate;
- private readonly CancellationTokenSource canceller;
- private int state; // [ 0: stopped, 1: setup, 2: running ]
- private Task runner;
- public ActionQueueScheduler()
- {
- this.queue = new ConcurrentQueue<Func<Task>>();
- this.immediate = new ConcurrentQueue<Func<Task>>();
- this.canceller = new CancellationTokenSource();
- }
- public Task Add(Action<CancellationToken> action)
- => this.Add(token => { action(token); return default(object); });
- public Task<T> Add<T>(Func<CancellationToken, T> action)
- {
- if (this.canceller.IsCancellationRequested)
- return Task.FromCanceled<T>(this.canceller.Token);
- var tcs = new TaskCompletionSource<T>();
- this.queue.Enqueue(() => this.InvokeAction(action, tcs));
- this.Run();
- return tcs.Task;
- }
- public Task Add(Func<CancellationToken, Task> action)
- => this.Add(async token => { await action(token); return default(object); });
- public Task<T> Add<T>(Func<CancellationToken, Task<T>> action)
- {
- if (this.canceller.IsCancellationRequested)
- return Task.FromCanceled<T>(this.canceller.Token);
- var tcs = new TaskCompletionSource<T>();
- this.queue.Enqueue(() => this.InvokeActionAsync(action, tcs));
- this.Run();
- return tcs.Task;
- }
- public Task Interrupt(Action<CancellationToken> action)
- => this.Interrupt(token => { action(token); return default(object); });
- public Task<T> Interrupt<T>(Func<CancellationToken, T> action)
- {
- if (this.canceller.IsCancellationRequested)
- return Task.FromCanceled<T>(this.canceller.Token);
- var tcs = new TaskCompletionSource<T>();
- this.immediate.Enqueue(() => this.InvokeAction(action, tcs));
- this.Run();
- return tcs.Task;
- }
- public Task Interrupt(Func<CancellationToken, Task> action)
- => this.Interrupt(async token => { await action(token); return default(object); });
- public Task<T> Interrupt<T>(Func<CancellationToken, Task<T>> action)
- {
- if (this.canceller.IsCancellationRequested)
- return Task.FromCanceled<T>(this.canceller.Token);
- var tcs = new TaskCompletionSource<T>();
- this.immediate.Enqueue(() => this.InvokeActionAsync(action, tcs));
- this.Run();
- return tcs.Task;
- }
- private Task InvokeAction<T>(Func<CancellationToken, T> action, TaskCompletionSource<T> tcs)
- {
- try
- {
- this.canceller.Token.ThrowIfCancellationRequested();
- tcs.TrySetResult(action(this.canceller.Token));
- }
- catch (OperationCanceledException)
- {
- tcs.TrySetCanceled(this.canceller.Token);
- }
- catch (Exception exception)
- {
- tcs.TrySetException(exception);
- }
- return Task.FromResult(default(object));
- }
- private async Task InvokeActionAsync<T>(Func<CancellationToken, Task<T>> asyncAction, TaskCompletionSource<T> tcs)
- {
- try
- {
- this.canceller.Token.ThrowIfCancellationRequested();
- tcs.TrySetResult(await asyncAction(this.canceller.Token).ConfigureAwait(false));
- }
- catch (OperationCanceledException)
- {
- System.Console.WriteLine("Cancelled");
- tcs.TrySetCanceled(this.canceller.Token);
- }
- catch (Exception exception)
- {
- tcs.TrySetException(exception);
- }
- }
- private Task Run()
- {
- if (this.state == 2)
- return this.runner;
- if (Interlocked.CompareExchange(ref this.state, 1, 0) != 0)
- return this.Run();
- return this.runner = Task.Run(async () =>
- {
- Interlocked.Exchange(ref this.state, 2);
- var action = default(Func<Task>);
- while (true)
- {
- if (this.immediate.TryDequeue(out action))
- await action().ConfigureAwait(false);
- else if (this.queue.TryDequeue(out action))
- await action().ConfigureAwait(false);
- else
- break;
- }
- Interlocked.Exchange(ref this.state, 0);
- if (this.queue.Count != 0)
- await this.Run().ConfigureAwait(false);
- });
- }
- public void Dispose()
- {
- this.canceller.Cancel();
- this.runner?.Wait();
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement