Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System;
- using System.Collections;
- using System.Collections.Generic;
- using System.Linq;
- using System.Threading;
- namespace threads
- {
- class Program
- {
- const int threadCount = 8;
- static Random random = new Random();
- static void Main(string[] args)
- {
- // Вначале без всяких выдумок заполняем список контроллеров потоков.
- var pool = new List<ThreadController>();
- for (int i = 0; i < threadCount; i++)
- {
- var controller = new ThreadController();
- // По ходу создаём потоки и стартуем их присвоив имя.
- var thread = controller.CreateThread(start: false);
- thread.Name = $"Thread {i + 1}";
- thread.Start();
- pool.Add(controller);
- }
- Console.WriteLine("Commands: Stop, Counter, stopWatch");
- while (true)
- {
- // Перед следующей командой ждём когда все потоки завершат свои задания.
- pool.WaitAll();
- var info = $"[{pool.Count, 3} threads]";
- Console.Write(info + "> ");
- var input = Console.ReadLine();
- if (input == "s") // Стопаем все потоки по запросу.
- {
- pool.StopWait();
- }
- else if (input == "c") // Заставляет все потоки инкрементировать один счётчик.
- //Странно, но я ни разу не получал ошибку этого счётчика.
- {
- var counter = (value: (double)0, false);
- pool.SetJob(() => Console.WriteLine($"{counter.value++} - {Thread.CurrentThread.Name}")).WaitAll();
- Console.WriteLine($"Result is {counter.value}");
- }
- else if (input == "w") // Рандомное ожидание и общее время, прошедшее в главном потоке.
- {
- var t = new System.Diagnostics.Stopwatch();
- t.Start();
- pool.SetJob(RandomWait).WaitAll();
- t.Stop();
- Console.WriteLine($"Total: {t.ElapsedMilliseconds}");
- }
- else if (input == "t") // Как предыдущее, только показывает прикольную цепочку назначения заданий.
- {
- var t = new System.Diagnostics.Stopwatch();
- t.Start();
- pool.SetJob(RandomWait).SetJob(RandomWait).SetJob(RandomWait).WaitAll();
- t.Stop();
- Console.WriteLine($"Total: {t.ElapsedMilliseconds}");
- }
- else // Тупо напечатать ввод пользователя.
- {
- pool.SetJob(() => { Console.WriteLine($"{input} - {Thread.CurrentThread.Name}"); });
- }
- }
- }
- /// <summary>
- /// Приостанавливает исполнение потока на случайное время до 1 секунды.
- /// </summary>
- static void RandomWait()
- {
- var amount = random.Next(30, 1000);
- Thread.Sleep(amount);
- Console.WriteLine($"{Thread.CurrentThread.Name, -10} slept for {amount, 4} milliseconds.");
- }
- }
- /// <summary>
- /// Описывает возможные состояния потока/контроллера потока
- /// </summary>
- enum ThreadWorkState : byte
- {
- /// <summary>
- /// Поток ожидает задачи
- /// </summary>
- Waiting,
- /// <summary>
- /// Поток обрабатывает задачу
- /// </summary>
- Working,
- /// <summary>
- /// Поток останавливается
- /// </summary>
- Terminating
- }
- /// <summary>
- /// Управляет исполнением потока. Определяет точку входа для новых потоков, в которой ожидает заданий.
- /// </summary>
- class ThreadController
- {
- private Thread associatedThread;
- private Action job;
- /// <summary>
- /// Текущее состояние потока
- /// </summary>
- public ThreadWorkState State { get; set; }
- /// <summary>
- /// Задача, исполняемая потоком. <code>null</code> если текущей задачи нет.
- /// </summary>
- public Action Job { get => job; set => SetJob(value); }
- /// <summary>
- /// Имя контролируемого потока.
- /// </summary>
- public string ThreadName => Thread.CurrentThread.Name ?? "Unnamed thread";
- public ThreadController()
- {
- State = ThreadWorkState.Waiting;
- }
- /// <summary>
- /// Точка входа для нового потока. Назначается при создании экземпляра Thread.
- /// Не следует вызывать этот метод напрямую из главного потока.
- /// </summary>
- public void Main()
- {
- Console.WriteLine($"{ThreadName} spawned.");
- while (true)
- {
- SpinWait();
- if (State == ThreadWorkState.Working && Job != null)
- {
- Job();
- job = null;
- State = ThreadWorkState.Waiting;
- }
- else if (State == ThreadWorkState.Terminating)
- {
- break;
- }
- }
- }
- void SpinWait()
- {
- Thread.SpinWait(2 << 10);
- }
- /// <summary>
- /// Выводит поток из цикла ожидания задачи. Если задачи нет, поток останавливается и закрывается.
- /// </summary>
- public void StopWait()
- {
- State = ThreadWorkState.Terminating;
- Console.WriteLine($"{ThreadName} terminating.");
- }
- private void SetJob(Action job)
- {
- if (State == ThreadWorkState.Waiting)
- {
- this.job = job;
- State = ThreadWorkState.Working;
- }
- else
- {
- throw new InvalidOperationException("Поток не находился в ожидании задачи.");
- }
- }
- /// <summary>
- /// Создаёт новый поток с Main в качестве точки входа.
- /// </summary>
- /// <param name="start">Показывает, нужно ли начать исполнение потока сразу при создании</param>
- /// <returns>Возвращает новый поток, управляемый текущим экземпляром ThreadController</returns>
- public Thread CreateThread(bool start = true)
- {
- if (associatedThread != null)
- throw new InvalidOperationException("Этот контроллер уже явно связан с потоком");
- var thread = new Thread(Main);
- if (start)
- thread.Start();
- associatedThread = thread;
- return thread;
- }
- }
- /// <summary>
- /// Определяет набор методов расширений для коллекций ThreadController, упрощающих управление.
- /// </summary>
- static class ThreadControllersExtensions
- {
- /// <summary>
- /// В цикле ожидает, когда все потоки перейдут в состояние ожидания следующей задачи.
- /// </summary>
- /// <param name="pool"></param>
- public static ICollection<ThreadController> WaitAll(this ICollection<ThreadController> pool)
- {
- while (true)
- {
- if (pool.All(c => c.State == ThreadWorkState.Waiting))
- {
- break;
- }
- Thread.SpinWait(2 << 10);
- }
- return pool;
- }
- /// <summary>
- /// Устанавливает задачу <paramref name="job"/> для всех потоков из пула.
- /// Если существуют потоки, у которых уже есть задача - сначала завершится выполнение этой задачи.
- /// </summary>
- public static ICollection<ThreadController> SetJob(this ICollection<ThreadController> pool, Action job)
- {
- var flaggedPool = pool.WithFlags();
- while (flaggedPool.Any(x => !x.flag))
- {
- foreach (var item in flaggedPool.Where(x => !x.flag && x.controller.State == ThreadWorkState.Waiting))
- {
- item.controller.Job = job;
- item.flag = true;
- }
- Thread.SpinWait(2 << 10);
- }
- return pool;
- }
- /// <summary>
- /// Выводит все потоки из цикла ожидания работы и удаляет все контроллеры потоков из пула, оставляя пустую коллекцию.
- /// </summary>
- public static void StopWait(this ICollection<ThreadController> pool)
- {
- foreach (var controller in pool)
- {
- controller.StopWait();
- }
- while (pool.Count > 0)
- {
- pool.Remove(pool.First());
- }
- }
- static ICollection<ControllerWithFlag> WithFlags(this ICollection<ThreadController> pool) => pool.Select(x => new ControllerWithFlag(x)).ToList();
- /// <summary>
- /// Маленький служебный класс, нужен чтобы не засорять ThreadController мутабельным флагом.
- /// </summary>
- class ControllerWithFlag
- {
- internal ThreadController controller;
- internal bool flag;
- internal ControllerWithFlag(ThreadController controller)
- {
- this.controller = controller;
- flag = false;
- }
- public static implicit operator ThreadController(ControllerWithFlag value)
- {
- return value.controller;
- }
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement