Advertisement
daegron

ChatGPT Attempt

Mar 22nd, 2023
616
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C# 5.08 KB | None | 0 0
  1. Can you fix my ThreadPool implementation. Keep in mind, I can't use system libraries like ThreadPool or Task. There is something wrong, may be you can even improve it?
  2. namespace MyThreadPool;
  3. namespace MyThreadPool;
  4.  
  5. using System.Collections.Concurrent;
  6.  
  7. /// <summary>
  8. /// ThreadPool implementation
  9. /// </summary>
  10. public class MyThreadPool
  11. {
  12.    private readonly List<Thread> threads;
  13.    private readonly BlockingCollection<Action> tasksQueue;
  14.    private readonly ManualResetEvent queueBlocker;
  15.    private readonly CancellationTokenSource cancellationTokenFactory;
  16.    private volatile bool isTimeToRetire;
  17.  
  18.    public MyThreadPool(int threadCount)
  19.    {
  20.        if (threadCount < 1)
  21.        {
  22.            throw new ArgumentException("The number of threads must be greater than zero.", nameof(threadCount));
  23.        }
  24.        
  25.        threads = new List<Thread>(threadCount);
  26.        tasksQueue = new BlockingCollection<Action>();
  27.        queueBlocker = new ManualResetEvent(false);
  28.        cancellationTokenFactory = new CancellationTokenSource();
  29.        
  30.        for (var i = 0; i < threadCount; i++)
  31.        {
  32.            var thread = new Thread(ThreadProcessing)
  33.            {
  34.                IsBackground = true
  35.            };
  36.            
  37.            threads.Add(thread);
  38.            thread.Start();
  39.        }
  40.    }
  41.  
  42.    /// <summary>
  43.    /// Add task to thread pool
  44.    /// </summary>
  45.    public IMyTask<T> Submit<T>(Func<T> function)
  46.    {
  47.        lock (cancellationTokenFactory)
  48.        {
  49.            if (cancellationTokenFactory.IsCancellationRequested)
  50.            {
  51.                throw new InvalidOperationException();
  52.            }
  53.  
  54.            var myTask = new MyTask<T>(function, this);
  55.            tasksQueue.Add(myTask.Start);
  56.            queueBlocker.Set();
  57.            return myTask;
  58.        }
  59.    }
  60.  
  61.    /// <summary>
  62.    /// Thread processing of tasksQueue
  63.    /// </summary>
  64.    private void ThreadProcessing()
  65.    {
  66.        while (!cancellationTokenFactory.IsCancellationRequested || !isTimeToRetire)
  67.        {
  68.            Action? task;
  69.            while (!tasksQueue.TryTake(out task) && !cancellationTokenFactory.IsCancellationRequested)
  70.            {
  71.                queueBlocker.Reset();
  72.                queueBlocker.WaitOne();
  73.            }
  74.  
  75.            task?.Invoke();
  76.        }
  77.    }
  78.  
  79.    /// <summary>
  80.    /// Waits for all currently working threads and doesn't allow new tasks to start execution
  81.     /// </summary>
  82.     public void Shutdown()
  83.     {
  84.         cancellationTokenFactory.Cancel();
  85.         foreach (var thread in threads)
  86.         {
  87.             thread.Join();
  88.         }
  89.     }
  90.  
  91.     private class MyTask<TResult> : IMyTask<TResult>
  92.     {
  93.         private readonly ConcurrentQueue<Action> continuationQueue;
  94.         private readonly MyThreadPool threadPool;
  95.         private AggregateException? exception;
  96.         private bool isCompleted;  
  97.         private Func<TResult>? function;
  98.         private TResult result;
  99.  
  100.         /// <inheritdoc/>
  101.         public bool IsCompleted { get; }
  102.  
  103.         public MyTask(Func<TResult> function, MyThreadPool attached)
  104.         {
  105.             threadPool = attached;
  106.             this.function = function ?? throw new ArgumentNullException(nameof(function));;
  107.             continuationQueue = new ConcurrentQueue<Action>();
  108.         }
  109.        
  110.         /// <inheritdoc/>
  111.         public IMyTask<TNewResult> ContinueWith<TNewResult>(Func<TResult, TNewResult> continuation)
  112.         {
  113.             lock (threadPool.cancellationTokenFactory)
  114.             {
  115.                 if (threadPool.cancellationTokenFactory.IsCancellationRequested)
  116.                 {
  117.                     throw new InvalidOperationException();
  118.                 }
  119.  
  120.                 if (IsCompleted)
  121.                 {
  122.                     return threadPool.Submit(() => continuation(result));
  123.                 }
  124.  
  125.                 var taskOfContinuation = new MyTask<TNewResult>(() => continuation(Result), threadPool);
  126.                 continuationQueue.Enqueue(taskOfContinuation.Start);
  127.                 return taskOfContinuation;
  128.             }
  129.         }
  130.  
  131.         /// <inheritdoc/>
  132.         public TResult Result
  133.         {
  134.             get
  135.             {
  136.                 SpinWait.SpinUntil(() => isCompleted);
  137.                 if (exception != null)
  138.                 {
  139.                     throw exception;
  140.                 }
  141.  
  142.                 return result;
  143.             }
  144.         }
  145.        
  146.         /// <summary>
  147.         /// Starts task
  148.         /// </summary>
  149.         public void Start()
  150.         {
  151.             try
  152.             {
  153.                 result = function!();
  154.                 isCompleted = true;
  155.                 function = null;
  156.             }
  157.             catch (Exception e)
  158.             {
  159.                 isCompleted = true;
  160.                 exception = new AggregateException(e);
  161.             }
  162.  
  163.             foreach (var continuation in continuationQueue)
  164.             {
  165.                 threadPool.tasksQueue.Add(continuation);
  166.                 threadPool.queueBlocker.Set();
  167.             }
  168.         }
  169.     }
  170. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement