Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System;
- using System.Collections.Generic;
- using System.Diagnostics;
- using System.Threading;
- using System.Windows.Threading;
- namespace TestingConsoleApplication.Tests.ThreadedQueue
- {
/// <summary>
/// Console harness for the threaded queue demo: it wires a shared
/// <see cref="TaskQueue"/> to a producer (<see cref="QueueManager"/>, driven by a
/// timer) and a consumer (<see cref="Processor"/>, started on its own thread).
/// </summary>
class ThreadedQueueTest
{
    /// <summary>
    /// Runs the demo. Blocks on <see cref="Console.ReadLine"/> twice: once while the
    /// timer is producing tasks, and once after the final queue size has been printed.
    /// </summary>
    public ThreadedQueueTest()
    {
        Debug.WriteLine("App Thread ID: " + Thread.CurrentThread.ManagedThreadId);

        TaskQueue taskQueue = new TaskQueue();
        QueueManager queueManager = new QueueManager(taskQueue);

        // Inter-thread exchanger object handed to the processing thread.
        InterThreadExchanger exchanger = new InterThreadExchanger();
        exchanger.TaskQueue = taskQueue;

        // Create the queue listening thread.
        Thread processingThread = new Thread(new Processor().ProcessingThreadBody);
        processingThread.Start(exchanger);

        // Create an inferred delegate that invokes methods for the timer.
        TimerCallback tcb = queueManager.AddTask;

        // The using block releases the timer's resources when the demo is over.
        using (Timer stateTimer = new Timer(tcb, null, 1000, 1000))
        {
            Console.ReadLine();

            // Timeout.Infinite disables the timer. A due time of 0 would instead
            // fire one more callback immediately, racing with the report below.
            stateTimer.Change(Timeout.Infinite, Timeout.Infinite);

            Console.WriteLine("Final task queue status: " + taskQueue.Count.ToString());
        }

        Debug.WriteLine("App Thread ID on end: " + Thread.CurrentThread.ManagedThreadId);
        Console.ReadLine();
    }
}
/// <summary>
/// Processing thread working object. It captures the dispatcher of the thread that
/// runs <see cref="ProcessingThreadBody"/>, listens for the queue's Filled event and
/// marshals the actual queue processing onto that dispatcher's thread.
/// </summary>
class Processor
{
    private InterThreadExchanger Exchanger;
    private Dispatcher ProcessorDispatcher;

    /// <summary>
    /// Processing thread main method. Subscribes to the task queue and then pumps
    /// the thread's dispatcher until it is shut down.
    /// </summary>
    /// <param name="exchanger">
    /// The <see cref="InterThreadExchanger"/> carrying the shared task queue,
    /// passed as <see cref="object"/> so the method can serve as a thread start routine.
    /// </param>
    public void ProcessingThreadBody(object exchanger)
    {
        ProcessorDispatcher = Dispatcher.CurrentDispatcher;
        Exchanger = (InterThreadExchanger)exchanger;
        Exchanger.TaskQueue.Filled += OnTaskQueueFilled;

        Debug.WriteLine("Processor Thread ID: " + Thread.CurrentThread.ManagedThreadId.ToString());
        Debug.WriteLine("Processor Dispatcher Thread ID: " + ProcessorDispatcher.Thread.ManagedThreadId.ToString());
        Debug.Assert(ProcessorDispatcher.Thread.ManagedThreadId == Thread.CurrentThread.ManagedThreadId, "Processor thread ID and its dispatcher thread ID are not equal!");

        // This method used to return immediately, so the thread never blocked
        // process exit. Now that it stays alive to pump the dispatcher, mark it as a
        // background thread to keep that property.
        Thread.CurrentThread.IsBackground = true;

        try
        {
            // Without a running message loop, delegates posted through BeginInvoke
            // are queued but never executed.
            Dispatcher.Run();
        }
        finally
        {
            // Stop posting work to a dispatcher that is no longer pumping.
            Exchanger.TaskQueue.Filled -= OnTaskQueueFilled;
        }
    }

    /// <summary>
    /// Queue filled event listener. Runs on whichever thread raised the event and
    /// makes sure <see cref="ProcessQueue"/> executes on the dispatcher's thread.
    /// </summary>
    /// <param name="sender">The event source (not used).</param>
    /// <param name="args">Event data (not used).</param>
    private void OnTaskQueueFilled(object sender, EventArgs args)
    {
        Debug.WriteLine("Processor QueueListener caller Thread ID: " + Thread.CurrentThread.ManagedThreadId);
        Debug.WriteLine("Processor Dispatcher Thread ID: " + ProcessorDispatcher.Thread.ManagedThreadId.ToString());

        if (ProcessorDispatcher.CheckAccess())
        {
            Debug.WriteLine("Processor calling ProcessQueue.");
            ProcessQueue();
            return;
        }

        Debug.WriteLine("Processor invoking ProcessQueue.");
        // ProcessQueue takes no parameters, so no argument array is supplied;
        // passing one would not match the delegate's signature when invoked.
        ProcessorDispatcher.BeginInvoke(new Action(ProcessQueue));
        Debug.WriteLine("Processor invoked ProcessQueue.");
    }

    /// <summary>
    /// Drains the shared task queue, handling one task at a time.
    /// </summary>
    /// <remarks>
    /// A task stays in the queue while it is being handled and is removed only
    /// afterwards. Peek-then-dequeue relies on this method being the queue's only
    /// consumer; both call paths in this class run it on the dispatcher thread.
    /// </remarks>
    private void ProcessQueue()
    {
        Debug.WriteLine("ProcessQueue Thread ID: " + Thread.CurrentThread.ManagedThreadId);

        TaskQueue queue = Exchanger.TaskQueue;

        // Enumerating the queue while dequeuing from it invalidates the enumerator,
        // so loop on the queue's state instead of using foreach.
        while (true)
        {
            string task;
            lock (queue)
            {
                if (queue.Count == 0)
                {
                    break;
                }

                task = queue.Peek();
            }

            Console.WriteLine("Task in queue: " + task);
            // todo: process task

            lock (queue)
            {
                queue.Dequeue();
            }
        }
    }
}
/// <summary>
/// Inter-thread communication object: carries the shared task queue and a
/// connection state whose changes are announced through an event.
/// </summary>
class InterThreadExchanger
{
    /// <summary>
    /// Possible values of <see cref="ConnectionState"/>.
    /// </summary>
    public enum ConnectionStates
    {
        Ready,
        Broken
    }

    /// <summary>
    /// Signature of handlers for <see cref="ConnectionStateChange"/>.
    /// </summary>
    public delegate void ConnectionStateChangeEventHandler(object sender, EventArgs args);

    /// <summary>
    /// Raised after <see cref="ConnectionState"/> has been set to a different value.
    /// </summary>
    public event ConnectionStateChangeEventHandler ConnectionStateChange;

    // Backing field for ConnectionState; starts out as Ready.
    private ConnectionStates ConnectionStateStatus = ConnectionStates.Ready;

    /// <summary>
    /// The task queue shared between the threads.
    /// </summary>
    public TaskQueue TaskQueue { get; set; }

    /// <summary>
    /// Current connection state. Assigning the value it already has is a no-op;
    /// assigning a different value stores it and raises
    /// <see cref="ConnectionStateChange"/>.
    /// </summary>
    public ConnectionStates ConnectionState
    {
        get
        {
            return ConnectionStateStatus;
        }
        set
        {
            if (ConnectionStateStatus == value)
            {
                return;
            }

            ConnectionStateStatus = value;
            RaiseConnectionStateChangeEvent();
        }
    }

    /// <summary>
    /// Notifies subscribers, if any, that the connection state has changed.
    /// </summary>
    private void RaiseConnectionStateChangeEvent()
    {
        // Take a snapshot so that a handler removed by another thread between the
        // null check and the call cannot cause a NullReferenceException.
        ConnectionStateChangeEventHandler handler = ConnectionStateChange;
        if (handler != null)
        {
            handler(this, EventArgs.Empty);
        }
    }
}
/// <summary>
/// Producer side of the demo: adds a task to the shared queue each time
/// <see cref="AddTask"/> is called.
/// </summary>
class QueueManager
{
    // The queue that AddTask writes to.
    private TaskQueue TaskQueue;

    /// <summary>
    /// Stores the queue to fill and prints the id of the constructing thread.
    /// </summary>
    /// <param name="taskQueue">Queue that receives the tasks.</param>
    public QueueManager(TaskQueue taskQueue)
    {
        int creatorThreadId = System.Threading.Thread.CurrentThread.ManagedThreadId;

        this.TaskQueue = taskQueue;
        Console.WriteLine("Thread id (StatusChecker): " + creatorThreadId);
    }

    /// <summary>
    /// Timer callback: prints the calling thread's id, enqueues a task while
    /// holding a lock on the queue instance, then prints the queue size.
    /// </summary>
    /// <param name="stateInfo">Timer state object (not used).</param>
    public void AddTask(Object stateInfo)
    {
        int callerThreadId = System.Threading.Thread.CurrentThread.ManagedThreadId;
        Console.WriteLine("Thread id (Timer elapsed): " + callerThreadId);

        TaskQueue target = this.TaskQueue;
        lock (target)
        {
            target.Enqueue("New task name");
        }

        string countText = this.TaskQueue.Count.ToString();
        Console.WriteLine("Task count: " + countText);
    }
}
/// <summary>
/// A string queue that refuses duplicate entries and raises <see cref="Filled"/>
/// whenever an item is actually added.
/// </summary>
/// <remarks>
/// Only <see cref="Enqueue"/> is guarded by the internal lock; inherited members
/// such as Dequeue are not. Because Enqueue hides the base method rather than
/// overriding it, calls made through a reference typed as Queue&lt;string&gt; bypass
/// both the duplicate check and the event.
/// </remarks>
class TaskQueue : Queue<string>
{
    /// <summary>
    /// Signature of handlers for <see cref="Filled"/>.
    /// </summary>
    public delegate void TaskQueueEventHandler(object sender, EventArgs args);

    /// <summary>
    /// Raised after an item has been added to the queue.
    /// </summary>
    public event TaskQueueEventHandler Filled;

    // Guards the duplicate check and the insertion in Enqueue.
    private readonly Object QueueLock = new Object();

    /// <summary>
    /// Creates an empty queue and prints the id of the constructing thread.
    /// </summary>
    public TaskQueue()
    {
        Console.WriteLine("Thread id (TaskQueue):" + System.Threading.Thread.CurrentThread.ManagedThreadId);
    }

    /// <summary>
    /// Adds <paramref name="item"/> unless an equal item is already queued.
    /// Raises <see cref="Filled"/> when the item was added.
    /// </summary>
    /// <param name="item">Task name to queue.</param>
    public new void Enqueue(string item)
    {
        bool added = false;

        // The duplicate check and the insertion must be one atomic step; otherwise
        // two threads could both pass the check and queue the same item twice.
        lock (QueueLock)
        {
            if (!Contains(item))
            {
                base.Enqueue(item);
                added = true;
            }
        }

        if (!added)
        {
            Console.WriteLine("Item already queued: " + item);
            return;
        }

        Console.WriteLine("Adding taks: " + item);

        // Raised outside the lock so handlers never run while it is held.
        RaiseEvent();
    }

    /// <summary>
    /// Notifies subscribers, if any, that an item has been queued.
    /// </summary>
    private void RaiseEvent()
    {
        // Take a snapshot so that a handler removed by another thread between the
        // null check and the call cannot cause a NullReferenceException.
        TaskQueueEventHandler handler = Filled;
        if (handler != null)
        {
            handler(this, EventArgs.Empty);
        }
    }
}
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement