Advertisement
Guest User

ProducerConsumerQueue

a guest
Oct 9th, 2010
411
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C# 4.29 KB | None | 0 0
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Threading;
  4.  
  5. namespace ProducerConsumerQueueTest
  6. {
  7.     public sealed class EventArgs<T> : EventArgs
  8.     {
  9.         public EventArgs(T item)
  10.         {
  11.             Item = item;
  12.         }
  13.  
  14.         public T Item { get; private set; }
  15.     }
  16.  
  17.     public sealed class ProducerConsumerQueue<T> : IDisposable
  18.     {
  19.         public event EventHandler<EventArgs<T>> Dequeued;
  20.         public event EventHandler<EventArgs<string>> Logged;
  21.  
  22.         readonly BlockingCollection<T> _items;
  23.  
  24.         readonly Action<T> _onDequeued;             // Making these member fields
  25.         readonly AsyncCallback _onDequeuedCallback; // so that new delegate objects
  26.                                                     // aren't created on every Begin-/
  27.                                                     // EndInvoke call (below) --
  28.                                                     // really a small optimization,
  29.                                                     // but presumably performance is
  30.                                                     // a priority for this class.
  31.  
  32.         readonly Action<string> _onLogged;          // Same reasoning as above.
  33.         readonly AsyncCallback _onLoggedCallback;
  34.  
  35.         readonly Thread[] _threads;
  36.         readonly CancellationTokenSource _cancellationTokenSource;
  37.  
  38.         public ProducerConsumerQueue(int threadCount)
  39.         {
  40.             if (threadCount < 1)
  41.             {
  42.                 throw new ArgumentOutOfRangeException("threadCount");
  43.             }
  44.  
  45.             _items = new BlockingCollection<T>();
  46.  
  47.             _onDequeued = OnDequeuedInner;
  48.             _onDequeuedCallback = _onDequeued.EndInvoke;
  49.             _onLogged = OnLoggedInner;
  50.             _onLoggedCallback = _onLogged.EndInvoke;
  51.  
  52.             _threads = new Thread[threadCount];
  53.             _cancellationTokenSource = new CancellationTokenSource();
  54.  
  55.             for (int i = 0; i < threadCount; ++i)
  56.             {
  57.                 _threads[i] = new Thread(Work);
  58.                 _threads[i].IsBackground = true;
  59.                 _threads[i].Start();
  60.             }
  61.         }
  62.  
  63.         public void Enqueue(T item)
  64.         {
  65.             if (!_cancellationTokenSource.IsCancellationRequested)
  66.             {
  67.                 _items.Add(item);
  68.             }
  69.         }
  70.  
  71.         public void Dispose()
  72.         {
  73.             _cancellationTokenSource.Cancel();
  74.  
  75.             for (int i = 0; i < _threads.Length; ++i)
  76.             {
  77.                 _threads[i].Join();
  78.             }
  79.  
  80.             _cancellationTokenSource.Dispose();
  81.         }
  82.  
  83.         void Work()
  84.         {
  85.             while (!_cancellationTokenSource.IsCancellationRequested)
  86.             {
  87.                 try
  88.                 {
  89.                     T item = _items.Take(_cancellationTokenSource.Token);
  90.                     OnDequeued(item);
  91.                 }
  92.                 catch (OperationCanceledException ex)
  93.                 {
  94.                     OnLogged("ProducerConsumerQueue was disposed.");
  95.                 }
  96.             }
  97.         }
  98.  
  99.         void OnDequeued(T item)
  100.         {
  101.             // It's important to raise events on separate threads, the reason being
  102.             // that if external code attaches a buggy event handler that could
  103.             // throw an exception or (worse) deadlock, it would otherwise break
  104.             // the queue by either terminating or freezing one of the consumer
  105.             // threads.
  106.             _onDequeued.BeginInvoke(item, _onDequeuedCallback, null);
  107.         }
  108.  
  109.         void OnDequeuedInner(T item)
  110.         {
  111.             EventHandler<EventArgs<T>> handler = Dequeued;
  112.  
  113.             if (handler != null)
  114.             {
  115.                 handler(this, new EventArgs<T>(item));
  116.             }
  117.         }
  118.  
  119.         void OnLogged(string message)
  120.         {
  121.             // Again, it's important to raise events on separate threads (see
  122.             // explanation above).
  123.             _onLogged.BeginInvoke(message, _onLoggedCallback, null);
  124.         }
  125.  
  126.         void OnLoggedInner(string message)
  127.         {
  128.             EventHandler<EventArgs<string>> handler = Logged;
  129.  
  130.             if (handler != null)
  131.             {
  132.                 handler(this, new EventArgs<string>(message));
  133.             }
  134.         }
  135.     }
  136. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement