Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System;
- using System.Collections.Concurrent;
- using System.Threading;
namespace ProducerConsumerQueueTest
{
    /// <summary>
    /// Event payload that carries a single strongly typed item.
    /// </summary>
    /// <typeparam name="T">Type of the carried item.</typeparam>
    public sealed class EventArgs<T> : EventArgs
    {
        /// <summary>Creates a payload wrapping <paramref name="item"/>.</summary>
        /// <param name="item">The value exposed through <see cref="Item"/>.</param>
        public EventArgs(T item)
        {
            Item = item;
        }

        /// <summary>The item supplied to the constructor.</summary>
        public T Item { get; private set; }
    }

    /// <summary>
    /// A queue whose items are taken by a fixed set of background consumer
    /// threads and announced through the <see cref="Dequeued"/> event.
    /// </summary>
    /// <remarks>
    /// Both events are raised on thread-pool threads rather than on the
    /// consumer threads, so a slow, blocking or throwing handler cannot stall
    /// or terminate a consumer. Handlers must therefore be thread-safe.
    /// </remarks>
    /// <typeparam name="T">Type of the queued items.</typeparam>
    public sealed class ProducerConsumerQueue<T> : IDisposable
    {
        /// <summary>Raised (on a thread-pool thread) for every item taken from the queue.</summary>
        public event EventHandler<EventArgs<T>> Dequeued;

        /// <summary>Raised (on a thread-pool thread) with diagnostic messages.</summary>
        public event EventHandler<EventArgs<string>> Logged;

        readonly BlockingCollection<T> _items;

        // Cached so that no new delegate object is allocated for every
        // raised event.
        readonly WaitCallback _dequeuedWorkItem;
        readonly WaitCallback _loggedWorkItem;

        readonly Thread[] _threads;
        readonly CancellationTokenSource _cancellationTokenSource;

        // 0 = live, 1 = Dispose has been entered. Accessed via Interlocked.
        int _disposed;

        /// <summary>
        /// Creates the queue and immediately starts its consumer threads.
        /// </summary>
        /// <param name="threadCount">Number of background consumer threads; must be at least 1.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="threadCount"/> is less than 1.
        /// </exception>
        public ProducerConsumerQueue(int threadCount)
        {
            if (threadCount < 1)
            {
                throw new ArgumentOutOfRangeException("threadCount");
            }

            _items = new BlockingCollection<T>();
            _dequeuedWorkItem = DequeuedWorkItem;
            _loggedWorkItem = LoggedWorkItem;
            _threads = new Thread[threadCount];
            _cancellationTokenSource = new CancellationTokenSource();

            // Every field the workers read is assigned above, before any
            // thread is started.
            for (int i = 0; i < threadCount; ++i)
            {
                _threads[i] = new Thread(Work);
                _threads[i].IsBackground = true;
                _threads[i].Start();
            }
        }

        /// <summary>
        /// Adds an item for the consumer threads to pick up. Once disposal
        /// has begun the item is silently dropped.
        /// </summary>
        /// <param name="item">The item to enqueue.</param>
        public void Enqueue(T item)
        {
            if (_cancellationTokenSource.IsCancellationRequested)
            {
                return;
            }

            try
            {
                _items.Add(item);
            }
            catch (ObjectDisposedException)
            {
                // Lost a race with Dispose: the collection was torn down
                // between the check above and the Add. Drop the item, exactly
                // as for any other enqueue attempted after disposal.
            }
        }

        /// <summary>
        /// Cancels the consumer threads, waits for them to exit and releases
        /// the resources owned by the queue. Items still waiting in the queue
        /// are not delivered. Calling this method more than once is safe.
        /// </summary>
        public void Dispose()
        {
            // Only the first caller performs the teardown; Cancel() on an
            // already disposed CancellationTokenSource would throw.
            if (Interlocked.Exchange(ref _disposed, 1) != 0)
            {
                return;
            }

            _cancellationTokenSource.Cancel();

            for (int i = 0; i < _threads.Length; ++i)
            {
                _threads[i].Join();
            }

            // Safe only now: the workers no longer touch the token or the
            // collection.
            _cancellationTokenSource.Dispose();
            _items.Dispose();
        }

        /// <summary>
        /// Consumer thread body: takes items until cancellation is requested
        /// and hands each one to <see cref="OnDequeued"/>.
        /// </summary>
        void Work()
        {
            CancellationToken token = _cancellationTokenSource.Token;

            while (!token.IsCancellationRequested)
            {
                try
                {
                    T item = _items.Take(token);
                    OnDequeued(item);
                }
                catch (OperationCanceledException)
                {
                    // Take was interrupted by Dispose; the loop condition
                    // ends the thread on the next check.
                    OnLogged("ProducerConsumerQueue was disposed.");
                }
            }
        }

        /// <summary>
        /// Schedules the <see cref="Dequeued"/> event for
        /// <paramref name="item"/> on the thread pool.
        /// </summary>
        void OnDequeued(T item)
        {
            // It's important to raise events off the consumer threads: an
            // external handler that blocks or throws would otherwise freeze
            // or terminate a consumer. ThreadPool.QueueUserWorkItem is used
            // instead of Delegate.BeginInvoke, which is unsupported on
            // .NET Core / .NET 5+.
            ThreadPool.QueueUserWorkItem(_dequeuedWorkItem, item);
        }

        /// <summary>
        /// Thread-pool entry point for <see cref="Dequeued"/>; reports handler
        /// failures through <see cref="Logged"/> instead of letting them
        /// escape as unhandled thread-pool exceptions.
        /// </summary>
        void DequeuedWorkItem(object state)
        {
            try
            {
                OnDequeuedInner((T)state);
            }
            catch (Exception ex)
            {
                OnLogged("A Dequeued event handler threw an exception: " + ex);
            }
        }

        /// <summary>Invokes the <see cref="Dequeued"/> handlers, if any, on the calling thread.</summary>
        void OnDequeuedInner(T item)
        {
            // Copy to a local so a concurrent unsubscribe cannot null the
            // delegate between the check and the call.
            EventHandler<EventArgs<T>> handler = Dequeued;
            if (handler != null)
            {
                handler(this, new EventArgs<T>(item));
            }
        }

        /// <summary>
        /// Schedules the <see cref="Logged"/> event for
        /// <paramref name="message"/> on the thread pool.
        /// </summary>
        void OnLogged(string message)
        {
            // Raised off the calling thread for the same reason as Dequeued.
            ThreadPool.QueueUserWorkItem(_loggedWorkItem, message);
        }

        /// <summary>
        /// Thread-pool entry point for <see cref="Logged"/>. A failing log
        /// handler cannot be reported through the very event that failed, so
        /// the failure is written to the trace listeners instead.
        /// </summary>
        void LoggedWorkItem(object state)
        {
            try
            {
                OnLoggedInner((string)state);
            }
            catch (Exception ex)
            {
                System.Diagnostics.Trace.TraceError(
                    "A Logged event handler threw an exception: " + ex);
            }
        }

        /// <summary>Invokes the <see cref="Logged"/> handlers, if any, on the calling thread.</summary>
        void OnLoggedInner(string message)
        {
            EventHandler<EventArgs<string>> handler = Logged;
            if (handler != null)
            {
                handler(this, new EventArgs<string>(message));
            }
        }
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement