Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System;
- using System.Collections.Generic;
- using System.Diagnostics;
- using System.Threading;
- using System.Windows.Threading;
- namespace TestingConsoleApplication.Tests.ThreadedQueue
- {
/// <summary>
/// Console test harness: builds a shared <see cref="TaskQueue"/>, starts a processing
/// thread that listens to it, and feeds it from a one-second timer until the user
/// presses Enter.
/// </summary>
class ThreadedQueueTest
{
    /// <summary>
    /// Runs the whole test scenario. Blocks on <see cref="Console.ReadLine()"/> twice:
    /// once while the timer is producing tasks and once after the final status is printed.
    /// </summary>
    public ThreadedQueueTest()
    {
        Debug.WriteLine("App Thread ID: " + System.Threading.Thread.CurrentThread.ManagedThreadId);

        TaskQueue taskQueue = new TaskQueue();
        QueueManager queueManager = new QueueManager(taskQueue);

        // inter thread exchanger object
        InterThreadExchanger exchanger = new InterThreadExchanger();
        exchanger.TaskQueue = taskQueue;

        // create queue listening thread
        Thread processingThread = new Thread(new Processor().ProcessingThreadBody);
        processingThread.Start(exchanger);

        // Create an inferred delegate that invokes methods for the timer.
        TimerCallback tcb = queueManager.AddTask;

        // The using block guarantees the timer is released even if ReadLine throws.
        using (Timer stateTimer = new Timer(tcb, null, 1000, 1000))
        {
            Console.ReadLine();

            // Timeout.Infinite actually stops the timer. Change(0, 0) would instead
            // schedule one more callback immediately (due time 0 = "fire now").
            stateTimer.Change(Timeout.Infinite, Timeout.Infinite);
        }

        Console.WriteLine("Final task queue status: " + taskQueue.Count.ToString());
        Debug.WriteLine("App Thread ID on end: " + System.Threading.Thread.CurrentThread.ManagedThreadId);
        Console.ReadLine();
    }
}
/// <summary>
/// Processing thread working object. Subscribes to the shared task queue's
/// <c>Filled</c> event and drains the queue on its own thread via that thread's
/// <see cref="Dispatcher"/>.
/// </summary>
class Processor
{
    InterThreadExchanger exchanger;
    Dispatcher processorDispatcher;

    /// <summary>
    /// Processing thread main method. Captures the current thread's dispatcher,
    /// subscribes to the queue, then pumps the dispatcher so that work posted from
    /// other threads is actually executed here.
    /// </summary>
    /// <param name="interThreadExchanger">
    /// The <see cref="InterThreadExchanger"/> carrying the shared task queue.
    /// </param>
    public void ProcessingThreadBody(object interThreadExchanger)
    {
        // The dispatcher loop below never returns on its own; running as a background
        // thread keeps it from holding the process open once the main thread finishes.
        System.Threading.Thread.CurrentThread.IsBackground = true;

        processorDispatcher = Dispatcher.CurrentDispatcher;
        exchanger = (InterThreadExchanger)interThreadExchanger;
        exchanger.TaskQueue.Filled += new TaskQueue.TaskQueueFilledEventHandler(OnTaskQueueFilled);

        Debug.WriteLine("Processor Thread ID: " + System.Threading.Thread.CurrentThread.ManagedThreadId.ToString());
        Debug.WriteLine("Processor Dispatcher Thread ID: " + processorDispatcher.Thread.ManagedThreadId.ToString());
        Debug.Assert(processorDispatcher.Thread.ManagedThreadId == System.Threading.Thread.CurrentThread.ManagedThreadId, "Processor thread ID and its dispatcher thread ID are not equal!");

        // Without a running message pump, delegates queued with BeginInvoke would
        // never be executed and the thread would simply exit.
        Dispatcher.Run();
    }

    /// <summary>
    /// Queue filled event listener. Runs on whichever thread raised the event and
    /// marshals the actual processing onto the processor thread when needed.
    /// </summary>
    /// <param name="sender">The queue that raised the event.</param>
    /// <param name="args">Unused event data.</param>
    void OnTaskQueueFilled(object sender, EventArgs args)
    {
        Debug.WriteLine("Processor QueueListener caller Thread ID: " + System.Threading.Thread.CurrentThread.ManagedThreadId);
        Debug.WriteLine("Processor Dispatcher Thread ID: " + processorDispatcher.Thread.ManagedThreadId.ToString());

        if (processorDispatcher.CheckAccess())
        {
            Debug.WriteLine("Processor calling ProcessQueue.");
            ProcessQueue();
        }
        else
        {
            Debug.WriteLine("Processor invoking ProcessQueue.");
            // ProcessQueue takes no parameters, so no argument array may be passed;
            // supplying one makes the dispatcher's invocation fail on parameter count.
            processorDispatcher.BeginInvoke(new Action(ProcessQueue));
            Debug.WriteLine("Processor invoked ProcessQueue.");
        }
    }

    /// <summary>
    /// Removes and reports every task currently in the shared queue.
    /// </summary>
    void ProcessQueue()
    {
        Debug.WriteLine("ProcessQueue Thread ID: " + System.Threading.Thread.CurrentThread.ManagedThreadId);

        TaskQueue queue = exchanger.TaskQueue;
        while (true)
        {
            string task;

            // Check-and-dequeue under one lock. Enumerating with foreach while
            // dequeuing invalidates the enumerator and throws on the next iteration.
            lock (queue)
            {
                if (queue.Count == 0)
                {
                    break;
                }

                task = queue.Dequeue();
            }

            Console.WriteLine("Task in queue: " + task);
            // todo: process task
        }
    }
}
/// <summary>
/// Interthread communication object: carries the shared task queue and a
/// connection state whose changes are announced through an event.
/// </summary>
class InterThreadExchanger
{
    /// <summary>Possible connection states.</summary>
    public enum ConnectionState
    {
        Ready,
        Broken
    }

    /// <summary>Signature of <see cref="ConnectionStateChange"/> handlers.</summary>
    public delegate void ConnectionStateChangeEventHandler(object sender, EventArgs args);

    /// <summary>Raised after <see cref="DbConnectionState"/> is set to a different value.</summary>
    public event ConnectionStateChangeEventHandler ConnectionStateChange;

    // Backing store for DbConnectionState; starts out as Ready.
    ConnectionState currentState = ConnectionState.Ready;

    /// <summary>The task queue shared between threads.</summary>
    public TaskQueue TaskQueue { get; set; }

    /// <summary>
    /// Current connection state. Assigning the value it already holds is a no-op;
    /// assigning a different value stores it and raises <see cref="ConnectionStateChange"/>.
    /// </summary>
    public ConnectionState DbConnectionState
    {
        get { return currentState; }
        set
        {
            if (currentState == value)
            {
                return;
            }

            currentState = value;
            RaiseConnectionStateChangeEvent();
        }
    }

    /// <summary>Notifies subscribers, if there are any, that the state changed.</summary>
    void RaiseConnectionStateChangeEvent()
    {
        if (ConnectionStateChange == null)
        {
            return;
        }

        ConnectionStateChange(this, new EventArgs());
    }
}
/// <summary>
/// Producer side of the test: adds a task to the shared queue each time
/// <see cref="AddTask"/> is called.
/// </summary>
class QueueManager
{
    // Queue that AddTask feeds.
    TaskQueue queue;

    /// <summary>
    /// Stores the queue to feed and prints the id of the constructing thread.
    /// </summary>
    /// <param name="taskQueue">Queue that receives the new tasks.</param>
    public QueueManager(TaskQueue taskQueue)
    {
        queue = taskQueue;

        int creatorThreadId = System.Threading.Thread.CurrentThread.ManagedThreadId;
        Console.WriteLine("Thread id (StatusChecker): " + creatorThreadId);
    }

    /// <summary>
    /// This method is called by the timer delegate. Prints the calling thread id,
    /// enqueues a fixed task name while holding a lock on the queue, then prints
    /// the resulting task count.
    /// </summary>
    /// <param name="stateInfo">Timer state object; not used.</param>
    public void AddTask(Object stateInfo)
    {
        int callerThreadId = System.Threading.Thread.CurrentThread.ManagedThreadId;
        Console.WriteLine("Thread id (Timer elapsed): " + callerThreadId);

        lock (queue)
        {
            queue.Enqueue("New task name");
        }

        int taskCount = queue.Count;
        Console.WriteLine("Task count: " + taskCount.ToString());
    }
}
/// <summary>
/// String queue that refuses duplicates and raises <see cref="Filled"/> each time
/// an item is actually added through <see cref="Enqueue(string)"/>.
/// </summary>
/// <remarks>
/// <see cref="Enqueue(string)"/> hides the base method with <c>new</c>, so the
/// duplicate check and the event apply only to calls made through a
/// <see cref="TaskQueue"/>-typed reference.
/// </remarks>
class TaskQueue : Queue<string>
{
    /// <summary>Signature of <see cref="Filled"/> handlers.</summary>
    public delegate void TaskQueueFilledEventHandler(object sender, EventArgs args);

    /// <summary>Raised after an item has been added to the queue.</summary>
    public event TaskQueueFilledEventHandler Filled;

    // Guards the duplicate check together with the insertion.
    readonly Object queueLock = new Object();

    /// <summary>Creates an empty queue and prints the id of the constructing thread.</summary>
    public TaskQueue()
    {
        Console.WriteLine("Thread id (TaskQueue):" + System.Threading.Thread.CurrentThread.ManagedThreadId);
    }

    /// <summary>
    /// Adds <paramref name="item"/> unless an equal item is already queued.
    /// </summary>
    /// <param name="item">Task name to add.</param>
    public new void Enqueue(string item)
    {
        bool added;

        // The check and the insert must happen under the same lock; otherwise two
        // threads can both see "not contained" and enqueue the same item twice.
        lock (queueLock) // thread safety
        {
            added = !Contains(item);
            if (added)
            {
                base.Enqueue(item);
            }
        }

        // if task is already queued, skip adding
        if (!added)
        {
            Console.WriteLine("Item already queued: " + item);
            return;
        }

        Console.WriteLine("Adding taks: " + item);

        // Raised outside the lock so handlers cannot block other producers.
        RaiseEvent();
    }

    /// <summary>Invokes the <see cref="Filled"/> subscribers, if any.</summary>
    void RaiseEvent()
    {
        // Snapshot the delegate: a handler removed on another thread between the
        // null check and the call would otherwise cause a NullReferenceException.
        TaskQueueFilledEventHandler handler = Filled;
        if (handler != null)
        {
            handler(this, new EventArgs());
        }
    }
}
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement