Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- namespace syncronization {
/// <summary>
/// Eventcount: lets a thread that found no work block until another thread signals,
/// without taking a lock on the signalling fast path.
/// </summary>
/// <remarks>
/// Bit 0 of m_count is a "someone may be about to wait" flag set by get(); the
/// remaining bits are a generation counter that prv_signal advances by 2.
/// Intended waiter protocol: key = get(); re-check the condition; wait(key).
/// wait() may return without the condition being true, so callers must loop.
/// </remarks>
class ecount {
    // Private monitor object, so code outside this class can never contend on,
    // pulse, or wait on the monitor this class relies on.
    private readonly object m_gate = new object();
    // Generation counter (bits 1 and up) plus waiter flag (bit 0).
    private long m_count = 0;
    // Number of threads accounted as blocked in wait(); only touched under m_gate.
    private long m_waiters = 0;

    /// <summary>Atomically ORs <paramref name="value"/> into <paramref name="dest"/>.</summary>
    /// <returns>The value <paramref name="dest"/> held immediately before the OR.</returns>
    private static long InterlockedOr(ref long dest, long value) {
        long cmp, cmptmp = dest;
        do {
            cmp = cmptmp;
            // A failed exchange returns the current value, which seeds the retry.
            cmptmp = System.Threading.Interlocked.CompareExchange(ref dest, cmp | value, cmp);
        } while (cmp != cmptmp);
        return cmp;
    }

    /// <summary>
    /// Advances the generation and wakes one waiter, or all of them when
    /// <paramref name="broadcast"/> is true. Does nothing if the waiter flag
    /// was clear in <paramref name="cmptmp"/>.
    /// </summary>
    /// <param name="cmptmp">The caller's most recent observation of m_count.</param>
    /// <param name="broadcast">True to wake every waiter, false to wake one.</param>
    private void prv_signal(long cmptmp, bool broadcast) {
        // Fast path: nobody announced an intent to wait, so there is nothing to wake.
        if ((cmptmp & 1) == 0) {
            return;
        }
        lock (m_gate) {
            long cmp;
            do {
                cmp = cmptmp;
                long newval = cmp + 2;
                // Clear the waiter flag when this signal leaves no accounted waiter behind.
                if (m_waiters < 2 || broadcast) {
                    newval &= ~1;
                }
                cmptmp = System.Threading.Interlocked.CompareExchange(ref m_count, newval, cmp);
            } while (cmp != cmptmp);
            if (m_waiters > 0) {
                if (m_waiters == 1 || ! broadcast) {
                    --m_waiters;
                    System.Threading.Monitor.Pulse(m_gate);
                } else {
                    m_waiters = 0;
                    System.Threading.Monitor.PulseAll(m_gate);
                }
            }
        }
    }

    /// <summary>Sets the waiter flag and returns the key to pass to wait().</summary>
    /// <returns>The value of m_count before the flag was set.</returns>
    public long get() {
        return InterlockedOr(ref m_count, 1);
    }

    /// <summary>Wakes at most one waiter.</summary>
    public void signal() {
        // The full fence must come BEFORE the read: it keeps the caller's preceding
        // writes (the state being published) from being reordered after the load of
        // m_count. Otherwise a waiter that set the flag and then re-checked that
        // state could be missed.
        System.Threading.Thread.MemoryBarrier();
        long cmp = System.Threading.Thread.VolatileRead(ref m_count);
        prv_signal(cmp, false);
    }

    /// <summary>
    /// Same as signal() but without the leading full fence; the caller is
    /// responsible for any ordering it needs.
    /// </summary>
    public void signal_relaxed() {
        long cmp = System.Threading.Thread.VolatileRead(ref m_count);
        prv_signal(cmp, false);
    }

    /// <summary>Wakes every waiter.</summary>
    public void broadcast() {
        // Fence before the read, for the same reason as in signal().
        System.Threading.Thread.MemoryBarrier();
        long cmp = System.Threading.Thread.VolatileRead(ref m_count);
        prv_signal(cmp, true);
    }

    /// <summary>
    /// Same as broadcast() but without the leading full fence; the caller is
    /// responsible for any ordering it needs.
    /// </summary>
    public void broadcast_relaxed() {
        long cmp = System.Threading.Thread.VolatileRead(ref m_count);
        prv_signal(cmp, true);
    }

    /// <summary>
    /// Blocks until pulsed, unless the generation has already moved on since
    /// <paramref name="cmp1"/> was obtained, in which case it returns at once.
    /// </summary>
    /// <param name="cmp1">Key previously returned by get().</param>
    public void wait(long cmp1) {
        lock (m_gate) {
            // Compare generations only; the waiter flag (bit 0) is masked off on both sides.
            long cmp2 = System.Threading.Thread.VolatileRead(ref m_count) & ~1;
            if ((cmp1 & ~1) == cmp2) {
                ++m_waiters;
                System.Threading.Monitor.Wait(m_gate);
            }
        }
    }
};
/// <summary>
/// Unbounded linked-list FIFO queue. push() swaps the newest-node reference with
/// Interlocked.Exchange; pop() advances the oldest-node reference with
/// Interlocked.CompareExchange. No locks are taken.
/// </summary>
/// <remarks>
/// Field naming is inverted relative to common usage: m_head is the end items are
/// appended to, and m_tail is an already-consumed placeholder node whose successor
/// carries the next item to pop.
/// </remarks>
class mpmcq<T> {
    /// <summary>Singly linked queue cell.</summary>
    public class node {
        // Successor link; null while this is the newest cell.
        public volatile node m_next = null;
        // Payload carried by this cell.
        public T m_state = default(T);

        public node() {}

        public node(T state) {
            m_state = state;
        }
    };

    // Newest cell (append end).
    volatile node m_head;
    // Placeholder cell in front of the oldest unconsumed item (remove end).
    volatile node m_tail;

    /// <summary>Creates an empty queue that uses <paramref name="n"/> as its initial placeholder.</summary>
    public mpmcq(node n) {
        n.m_next = null;
        m_tail = n;
        m_head = n;
    }

    /// <summary>Creates an empty queue with a freshly allocated placeholder.</summary>
    public mpmcq() : this(new node()) {}

    /// <summary>Appends <paramref name="state"/> to the queue.</summary>
    public void push(T state) {
        node fresh = new node(state);
        fresh.m_next = null;
        // Claim the append position first, then link the previous newest cell to us.
        node previous = System.Threading.Interlocked.Exchange(ref m_head, fresh);
        previous.m_next = fresh;
    }

    /// <summary>Removes the oldest item, if any.</summary>
    /// <param name="state">Receives the item, or default(T) when nothing was available.</param>
    /// <returns>True if an item was removed; false if no linked successor was visible.</returns>
    public bool pop(out T state) {
        node observed = m_tail;
        while (true) {
            node successor = observed.m_next;
            if (successor == null) {
                state = default(T);
                return false;
            }
            // Read the payload before trying to claim it; it is discarded if the claim fails.
            state = successor.m_state;
            node witnessed = System.Threading.Interlocked.CompareExchange(ref m_tail, successor, observed);
            if (witnessed == observed) {
                return true;
            }
            // Another consumer advanced the placeholder; retry from what it left.
            observed = witnessed;
        }
    }
};
/// <summary>
/// Worker that pushes a fixed number of text messages onto a shared queue,
/// signalling the shared eventcount after each one.
/// </summary>
class producer {
    ecount m_ecount;
    mpmcq<string> m_queue;
    int m_max;
    int m_id;

    /// <param name="id">Number used to label this producer's output and messages.</param>
    /// <param name="ecount">Eventcount signalled after every push.</param>
    /// <param name="queue">Queue the messages are pushed onto.</param>
    /// <param name="max">How many messages run() pushes.</param>
    public producer(int id, ecount ecount, mpmcq<string> queue, int max) {
        m_id = id;
        m_max = max;
        m_queue = queue;
        m_ecount = ecount;
    }

    /// <summary>Thread entry point: pushes the messages, sleeping 1 ms after each.</summary>
    public void run() {
        // Common prefix of every line and message this producer emits.
        string label = "Producer " + m_id;
        System.Console.WriteLine(label + ", Running!");
        int sent = 0;
        while (sent < m_max) {
            m_queue.push(label + ", Object " + sent);
            m_ecount.signal();
            System.Threading.Thread.Sleep(1);
            ++sent;
        }
        System.Console.WriteLine(label + ", Finished!");
    }
};
/// <summary>
/// Worker that pops messages from a shared queue, blocking on the shared
/// eventcount while the queue is empty, until the global message budget runs out.
/// </summary>
class consumer {
    ecount m_ecount;
    mpmcq<string> m_queue;
    int m_id;
    // Messages still to be consumed across all consumers; decremented once per received message.
    public static long g_mcount = 0;
    // Total number of consumers; sizes the shutdown fan-out in run().
    public static long g_ccount = 0;

    /// <param name="id">Number used to label this consumer's output and messages.</param>
    /// <param name="ecount">Eventcount waited on while the queue is empty.</param>
    /// <param name="queue">Queue the messages are popped from.</param>
    public consumer(int id, ecount ecount, mpmcq<string> queue) {
        m_id = id;
        m_queue = queue;
        m_ecount = ecount;
    }

    // Returns the next message, blocking on the eventcount while none is available.
    string prv_next_message() {
        string msg;
        while (! m_queue.pop(out msg)) {
            // Announce intent to wait, then re-check the queue so that a push
            // landing in between is not missed.
            long key = m_ecount.get();
            if (m_queue.pop(out msg)) {
                break;
            }
            m_ecount.wait(key);
        }
        return msg;
    }

    /// <summary>
    /// Thread entry point: receives messages until the shared counter drops to
    /// zero or below, then posts shutdown messages for the other consumers and
    /// wakes them.
    /// </summary>
    public void run() {
        string label = "Consumer " + m_id;
        System.Console.WriteLine(label + ", Running!");
        for (;;) {
            System.Console.WriteLine(label + ", Received Message: " + prv_next_message());
            if (System.Threading.Interlocked.Decrement(ref g_mcount) <= 0) {
                break;
            }
        }
        // Every consumer that leaves the loop posts g_ccount - 1 shutdown messages,
        // not only the one that consumed the last real message.
        for (int i = 1; i < g_ccount; ++i) {
            m_queue.push(label + ", SHUTDOWN " + i);
        }
        m_ecount.broadcast();
        System.Console.WriteLine(label + ", Finished!");
    }
};
/// <summary>
/// Console driver: asks for the producer, consumer and per-producer message
/// counts, builds one thread per worker, runs them all and waits for completion.
/// </summary>
class application {
    ecount m_ecount = new ecount();
    mpmcq<string> m_queue = new mpmcq<string>();
    int m_pcount, m_ccount, m_mcount;
    producer[] m_producers;
    consumer[] m_consumers;
    // Consumer threads occupy indices [0, m_ccount); producer threads follow.
    System.Threading.Thread[] m_threads;

    // Prompts until the user enters an integer >= 1 and returns it.
    // Throws EndOfStreamException if standard input ends first.
    static int prv_read_count(string prompt, string too_small) {
        for (;;) {
            System.Console.Write(prompt);
            string line = System.Console.ReadLine();
            if (line == null) {
                // ReadLine returns null once input is exhausted; re-prompting
                // would then spin forever.
                throw new System.IO.EndOfStreamException(
                    "Standard input ended before all parameters were entered.");
            }
            int value;
            if (! System.Int32.TryParse(line, out value)) {
                System.Console.WriteLine("Bad input; try again!");
            } else if (value < 1) {
                System.Console.WriteLine(too_small);
            } else {
                return value;
            }
            System.Media.SystemSounds.Exclamation.Play();
        }
    }

    // Reads the three run parameters and publishes the totals the consumers share.
    void prv_get_params() {
        m_pcount = prv_read_count(
            "Number of producers: ",
            "Sorry, you need at least one producer!");
        m_ccount = prv_read_count(
            "Number of consumers: ",
            "Sorry, you need at least one consumer!");
        m_mcount = prv_read_count(
            "Number of messages per-producer: ",
            "Sorry, you need at least one message per-producer!");
        // Widen before multiplying so a large product cannot overflow int.
        consumer.g_mcount = (long) m_pcount * m_mcount;
        consumer.g_ccount = m_ccount;
        System.Console.Clear();
    }

    // Creates the worker objects and one (not yet started) thread per worker.
    void prv_create_threads() {
        m_producers = new producer[m_pcount];
        m_consumers = new consumer[m_ccount];
        m_threads = new System.Threading.Thread[m_pcount + m_ccount];
        for (int i = 0; i < m_ccount; ++i) {
            m_consumers[i] = new consumer(i, m_ecount, m_queue);
            m_threads[i] = new System.Threading.Thread(m_consumers[i].run);
        }
        for (int i = 0; i < m_pcount; ++i) {
            m_producers[i] = new producer(i, m_ecount, m_queue, m_mcount);
            m_threads[i + m_ccount] = new System.Threading.Thread(m_producers[i].run);
        }
    }

    // Starts every thread, consumers first (array order).
    void prv_start_threads() {
        foreach (System.Threading.Thread t in m_threads) {
            t.Start();
        }
    }

    // Blocks until every thread has finished.
    void prv_join_threads() {
        foreach (System.Threading.Thread t in m_threads) {
            t.Join();
        }
    }

    /// <summary>Prompts for the parameters and prepares the threads.</summary>
    public application() {
        prv_get_params();
        prv_create_threads();
    }

    /// <summary>Runs all workers to completion.</summary>
    public void go() {
        prv_start_threads();
        prv_join_threads();
    }

    static void Main(string[] args) {
        application app = new application();
        app.go();
        System.Console.WriteLine("\n\nPress <ENTER> to exit...");
        System.Console.ReadLine();
    }
};
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement