namespace syncronization
{
    /// <summary>
    /// Eventcount used to block consumers while a lock-free queue is empty.
    /// Bit 0 of <c>m_count</c> is set by <see cref="get"/> to announce that a
    /// thread is about to wait; the remaining bits form a counter that the
    /// signalling side advances in steps of 2.
    /// </summary>
    class ecount
    {
        // Private monitor object: every lock/Wait/Pulse in this class uses it, so
        // no outside code can interfere by locking on the ecount instance itself.
        private readonly object m_gate = new object();
        private long m_count = 0;

        // Number of threads currently parked in wait(); only touched under m_gate.
        private long m_waiters = 0;

        /// <summary>
        /// Atomically ORs <paramref name="value"/> into <paramref name="dest"/>
        /// with a compare-exchange retry loop.
        /// </summary>
        /// <returns>The value <paramref name="dest"/> held before the OR.</returns>
        private static long InterlockedOr(ref long dest, long value)
        {
            long cmp, cmptmp = dest;
            do
            {
                cmp = cmptmp;
                cmptmp = System.Threading.Interlocked.CompareExchange(ref dest, cmp | value, cmp);
            }
            while (cmp != cmptmp);
            return cmp;
        }

        /// <summary>
        /// Advances the counter and wakes waiters, but only when the observed
        /// count has bit 0 set (i.e. some thread called <see cref="get"/>).
        /// </summary>
        /// <param name="cmptmp">The count value observed by the caller.</param>
        /// <param name="broadcast">True to wake every waiter, false to wake one.</param>
        private void prv_signal(long cmptmp, bool broadcast)
        {
            if ((cmptmp & 1) == 0)
            {
                return; // nobody announced an intention to wait
            }

            lock (m_gate)
            {
                long cmp;
                do
                {
                    cmp = cmptmp;
                    long newval = cmp + 2;
                    // Clear the waiters bit unless other waiters remain after
                    // this single-wakeup signal.
                    if (m_waiters < 2 || broadcast)
                    {
                        newval &= ~1;
                    }
                    cmptmp = System.Threading.Interlocked.CompareExchange(ref m_count, newval, cmp);
                }
                while (cmp != cmptmp);

                if (m_waiters > 0)
                {
                    if (m_waiters == 1 || !broadcast)
                    {
                        --m_waiters;
                        System.Threading.Monitor.Pulse(m_gate);
                    }
                    else
                    {
                        m_waiters = 0;
                        System.Threading.Monitor.PulseAll(m_gate);
                    }
                }
            }
        }

        /// <summary>
        /// Sets the waiters bit and returns the previous count as a wait key to
        /// pass to <see cref="wait"/>.
        /// </summary>
        public long get()
        {
            return InterlockedOr(ref m_count, 1);
        }

        /// <summary>Wakes at most one waiter; issues a full memory barrier after reading the count.</summary>
        public void signal()
        {
            long cmp = System.Threading.Thread.VolatileRead(ref m_count);
            System.Threading.Thread.MemoryBarrier();
            prv_signal(cmp, false);
        }

        /// <summary>Same as <see cref="signal"/> without the explicit memory barrier.</summary>
        public void signal_relaxed()
        {
            long cmp = System.Threading.Thread.VolatileRead(ref m_count);
            prv_signal(cmp, false);
        }

        /// <summary>Wakes all waiters; issues a full memory barrier after reading the count.</summary>
        public void broadcast()
        {
            long cmp = System.Threading.Thread.VolatileRead(ref m_count);
            System.Threading.Thread.MemoryBarrier();
            prv_signal(cmp, true);
        }

        /// <summary>Same as <see cref="broadcast"/> without the explicit memory barrier.</summary>
        public void broadcast_relaxed()
        {
            long cmp = System.Threading.Thread.VolatileRead(ref m_count);
            prv_signal(cmp, true);
        }

        /// <summary>
        /// Blocks the caller if the counter (ignoring bit 0) still equals the
        /// key obtained from <see cref="get"/>; returns immediately otherwise.
        /// </summary>
        /// <param name="cmp1">Wait key previously returned by <see cref="get"/>.</param>
        public void wait(long cmp1)
        {
            lock (m_gate)
            {
                long cmp2 = System.Threading.Thread.VolatileRead(ref m_count) & ~1;
                if ((cmp1 & ~1) == cmp2)
                {
                    ++m_waiters;
                    System.Threading.Monitor.Wait(m_gate);
                }
            }
        }
    }

    /// <summary>
    /// Linked-list queue whose push and pop use interlocked operations instead
    /// of locks. The list always contains one leading placeholder node.
    /// </summary>
    /// <typeparam name="T">Type of the values stored in the queue.</typeparam>
    class mpmcq<T>
    {
        /// <summary>Singly linked queue node carrying one value.</summary>
        public class node
        {
            public volatile node m_next = null;
            public T m_state = default(T);

            public node() {}

            public node(T state)
            {
                m_state = state;
            }
        }

        // push() appends at m_head; pop() removes from m_tail.
        volatile node m_head;
        volatile node m_tail;

        /// <summary>Creates a queue that uses <paramref name="n"/> as its placeholder node.</summary>
        public mpmcq(node n)
        {
            n.m_next = null;
            m_head = m_tail = n;
        }

        /// <summary>Creates an empty queue with a fresh placeholder node.</summary>
        public mpmcq() : this(new node()) {}

        /// <summary>Appends <paramref name="state"/> to the queue.</summary>
        public void push(T state)
        {
            node n = new node(state);
            n.m_next = null;
            // Publish the new node as the append point first, then link it
            // behind the node it replaced.
            node prev = System.Threading.Interlocked.Exchange(ref m_head, n);
            prev.m_next = n;
        }

        /// <summary>Removes the oldest linked value, if any.</summary>
        /// <param name="state">Receives the value, or <c>default(T)</c> when nothing was available.</param>
        /// <returns>True when a value was removed; false when no linked node was found.</returns>
        public bool pop(out T state)
        {
            node cmp, cmptmp = m_tail;
            do
            {
                cmp = cmptmp;
                node next = cmp.m_next;
                if (next == null)
                {
                    state = default(T);
                    return false;
                }
                // Read the value before the CAS; it is only kept if the CAS wins.
                state = next.m_state;
                cmptmp = System.Threading.Interlocked.CompareExchange(ref m_tail, next, cmp);
            }
            while (cmp != cmptmp);
            return true;
        }
    }

    /// <summary>Thread body that pushes a fixed number of messages and signals after each one.</summary>
    class producer
    {
        ecount m_ecount;
        mpmcq<string> m_queue;
        int m_max;
        int m_id;

        public producer(int id, ecount ecount, mpmcq<string> queue, int max)
        {
            m_ecount = ecount;
            m_queue = queue;
            m_max = max;
            m_id = id;
        }

        /// <summary>Pushes <c>m_max</c> messages, signalling and sleeping 1 ms after each.</summary>
        public void run()
        {
            System.Console.WriteLine("Producer " + m_id + ", Running!");
            for (int i = 0; i < m_max; ++i)
            {
                m_queue.push("Producer " + m_id + ", Object " + i);
                m_ecount.signal();
                System.Threading.Thread.Sleep(1);
            }
            System.Console.WriteLine("Producer " + m_id + ", Finished!");
        }
    }

    /// <summary>
    /// Thread body that pops and prints messages until the shared message
    /// budget <see cref="g_mcount"/> is exhausted.
    /// </summary>
    class consumer
    {
        ecount m_ecount;
        mpmcq<string> m_queue;
        int m_id;

        // Messages still to be consumed across all consumers.
        public static long g_mcount = 0;

        // Total number of consumer threads.
        public static long g_ccount = 0;

        public consumer(int id, ecount ecount, mpmcq<string> queue)
        {
            m_ecount = ecount;
            m_queue = queue;
            m_id = id;
        }

        /// <summary>
        /// Consumes messages; when this consumer's decrement brings the budget
        /// to zero or below, it posts shutdown messages for the other consumers
        /// and broadcasts so that blocked ones wake up.
        /// </summary>
        public void run()
        {
            System.Console.WriteLine("Consumer " + m_id + ", Running!");
            string s;
            do
            {
                while (!m_queue.pop(out s))
                {
                    // Take a wait key, re-check the queue, and only then block,
                    // so a push between the two pops is not missed.
                    long key = m_ecount.get();
                    if (m_queue.pop(out s))
                    {
                        break;
                    }
                    m_ecount.wait(key);
                }
                System.Console.WriteLine("Consumer " + m_id + ", Received Message: " + s);
            }
            while (System.Threading.Interlocked.Decrement(ref g_mcount) > 0);

            for (int i = 1; i < g_ccount; ++i)
            {
                m_queue.push("Consumer " + m_id + ", SHUTDOWN " + i);
            }
            m_ecount.broadcast();
            System.Console.WriteLine("Consumer " + m_id + ", Finished!");
        }
    }

    /// <summary>Console driver: reads the run parameters, then starts and joins all worker threads.</summary>
    class application
    {
        ecount m_ecount = new ecount();
        mpmcq<string> m_queue = new mpmcq<string>();
        int m_pcount, m_ccount, m_mcount;
        producer[] m_producers;
        consumer[] m_consumers;
        System.Threading.Thread[] m_threads;

        /// <summary>
        /// Prompts until a whole number of at least 1 is entered, repeating the
        /// same prompt after a too-small value.
        /// </summary>
        /// <param name="prompt">Text written before reading a line.</param>
        /// <param name="complaint">Text written when the number is below 1.</param>
        /// <param name="value">Receives the accepted number.</param>
        /// <returns>False when the entered text is not a valid 32-bit integer.</returns>
        /// <exception cref="System.IO.EndOfStreamException">Standard input ended.</exception>
        static bool prv_try_read_count(string prompt, string complaint, out int value)
        {
            while (true)
            {
                System.Console.Write(prompt);
                string line = System.Console.ReadLine();
                if (line == null)
                {
                    // End of input can never become valid; re-prompting would spin forever.
                    throw new System.IO.EndOfStreamException("Standard input ended before all parameters were read.");
                }
                if (!System.Int32.TryParse(line, out value))
                {
                    return false;
                }
                if (value >= 1)
                {
                    return true;
                }
                System.Console.WriteLine(complaint);
                System.Media.SystemSounds.Exclamation.Play();
            }
        }

        /// <summary>
        /// Reads producer, consumer and per-producer message counts from the
        /// console. Unparsable input restarts from the first prompt.
        /// </summary>
        void prv_get_params()
        {
            while (!(prv_try_read_count("Number of producers: ", "Sorry, you need at least one producer!", out m_pcount)
                  && prv_try_read_count("Number of consumers: ", "Sorry, you need at least one consumer!", out m_ccount)
                  && prv_try_read_count("Number of messages per-producer: ", "Sorry, you need at least one message per-producer!", out m_mcount)))
            {
                System.Console.WriteLine("Bad input; try again!");
                System.Media.SystemSounds.Exclamation.Play();
            }

            // Widen before multiplying so large inputs do not overflow 32-bit int.
            consumer.g_mcount = (long)m_pcount * m_mcount;
            consumer.g_ccount = m_ccount;
            System.Console.Clear();
        }

        /// <summary>Creates the worker objects and their threads; consumer threads occupy the first slots.</summary>
        void prv_create_threads()
        {
            m_producers = new producer[m_pcount];
            m_consumers = new consumer[m_ccount];
            m_threads = new System.Threading.Thread[m_pcount + m_ccount];

            for (int i = 0; i < m_pcount; ++i)
            {
                m_producers[i] = new producer(i, m_ecount, m_queue, m_mcount);
            }
            for (int i = 0; i < m_ccount; ++i)
            {
                m_consumers[i] = new consumer(i, m_ecount, m_queue);
            }
            for (int i = 0; i < m_ccount; ++i)
            {
                m_threads[i] = new System.Threading.Thread(m_consumers[i].run);
            }
            for (int i = 0; i < m_pcount; ++i)
            {
                m_threads[i + m_ccount] = new System.Threading.Thread(m_producers[i].run);
            }
        }

        void prv_start_threads()
        {
            for (int i = 0; i < m_pcount + m_ccount; ++i)
            {
                m_threads[i].Start();
            }
        }

        void prv_join_threads()
        {
            for (int i = 0; i < m_pcount + m_ccount; ++i)
            {
                m_threads[i].Join();
            }
        }

        /// <summary>Reads the parameters from the console and prepares all threads.</summary>
        public application()
        {
            prv_get_params();
            prv_create_threads();
        }

        /// <summary>Starts every thread and waits for all of them to finish.</summary>
        public void go()
        {
            prv_start_threads();
            prv_join_threads();
        }

        static void Main(string[] args)
        {
            application app = new application();
            app.go();
            System.Console.WriteLine("\n\nPress <ENTER> to exit...");
            System.Console.ReadLine();
        }
    }
}