Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- Can you fix my ThreadPool implementation. Keep in mind, I can't use system libraries like ThreadPool or Task. There is something wrong, may be you can even improve it?
- namespace MyThreadPool;
- namespace MyThreadPool;
- using System.Collections.Concurrent;
- /// <summary>
- /// ThreadPool implementation
- /// </summary>
- public class MyThreadPool
- {
- private readonly List<Thread> threads;
- private readonly BlockingCollection<Action> tasksQueue;
- private readonly ManualResetEvent queueBlocker;
- private readonly CancellationTokenSource cancellationTokenFactory;
- private volatile bool isTimeToRetire;
- public MyThreadPool(int threadCount)
- {
- if (threadCount < 1)
- {
- throw new ArgumentException("The number of threads must be greater than zero.", nameof(threadCount));
- }
- threads = new List<Thread>(threadCount);
- tasksQueue = new BlockingCollection<Action>();
- queueBlocker = new ManualResetEvent(false);
- cancellationTokenFactory = new CancellationTokenSource();
- for (var i = 0; i < threadCount; i++)
- {
- var thread = new Thread(ThreadProcessing)
- {
- IsBackground = true
- };
- threads.Add(thread);
- thread.Start();
- }
- }
- /// <summary>
- /// Add task to thread pool
- /// </summary>
- public IMyTask<T> Submit<T>(Func<T> function)
- {
- lock (cancellationTokenFactory)
- {
- if (cancellationTokenFactory.IsCancellationRequested)
- {
- throw new InvalidOperationException();
- }
- var myTask = new MyTask<T>(function, this);
- tasksQueue.Add(myTask.Start);
- queueBlocker.Set();
- return myTask;
- }
- }
- /// <summary>
- /// Thread processing of tasksQueue
- /// </summary>
- private void ThreadProcessing()
- {
- while (!cancellationTokenFactory.IsCancellationRequested || !isTimeToRetire)
- {
- Action? task;
- while (!tasksQueue.TryTake(out task) && !cancellationTokenFactory.IsCancellationRequested)
- {
- queueBlocker.Reset();
- queueBlocker.WaitOne();
- }
- task?.Invoke();
- }
- }
- /// <summary>
- /// Waits for all currently working threads and doesn't allow new tasks to start execution
- /// </summary>
- public void Shutdown()
- {
- cancellationTokenFactory.Cancel();
- foreach (var thread in threads)
- {
- thread.Join();
- }
- }
- private class MyTask<TResult> : IMyTask<TResult>
- {
- private readonly ConcurrentQueue<Action> continuationQueue;
- private readonly MyThreadPool threadPool;
- private AggregateException? exception;
- private bool isCompleted;
- private Func<TResult>? function;
- private TResult result;
- /// <inheritdoc/>
- public bool IsCompleted { get; }
- public MyTask(Func<TResult> function, MyThreadPool attached)
- {
- threadPool = attached;
- this.function = function ?? throw new ArgumentNullException(nameof(function));;
- continuationQueue = new ConcurrentQueue<Action>();
- }
- /// <inheritdoc/>
- public IMyTask<TNewResult> ContinueWith<TNewResult>(Func<TResult, TNewResult> continuation)
- {
- lock (threadPool.cancellationTokenFactory)
- {
- if (threadPool.cancellationTokenFactory.IsCancellationRequested)
- {
- throw new InvalidOperationException();
- }
- if (IsCompleted)
- {
- return threadPool.Submit(() => continuation(result));
- }
- var taskOfContinuation = new MyTask<TNewResult>(() => continuation(Result), threadPool);
- continuationQueue.Enqueue(taskOfContinuation.Start);
- return taskOfContinuation;
- }
- }
- /// <inheritdoc/>
- public TResult Result
- {
- get
- {
- SpinWait.SpinUntil(() => isCompleted);
- if (exception != null)
- {
- throw exception;
- }
- return result;
- }
- }
- /// <summary>
- /// Starts task
- /// </summary>
- public void Start()
- {
- try
- {
- result = function!();
- isCompleted = true;
- function = null;
- }
- catch (Exception e)
- {
- isCompleted = true;
- exception = new AggregateException(e);
- }
- foreach (var continuation in continuationQueue)
- {
- threadPool.tasksQueue.Add(continuation);
- threadPool.queueBlocker.Set();
- }
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement