Advertisement
Guest User

Chris M Thomasson

a guest
Jun 24th, 2009
837
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C# 7.75 KB | None | 0 0
  1. namespace syncronization {
  2.   class ecount {
  3.     private long m_count = 0;
  4.     private long m_waiters = 0;
  5.  
  6.  
  7.     private static long InterlockedOr(ref long dest, long value) {
  8.       long cmp, cmptmp = dest;
  9.       do {
  10.         cmp = cmptmp;
  11.         cmptmp = System.Threading.Interlocked.CompareExchange(ref dest, cmp | value, cmp);
  12.       } while (cmp != cmptmp);
  13.       return cmp;
  14.     }
  15.  
  16.  
  17.     private void prv_signal(long cmptmp, bool broadcast) {
  18.       if ((cmptmp & 1) != 0) {
  19.         lock (this) {
  20.           long cmp;
  21.           do {
  22.             cmp = cmptmp;
  23.             long newval = cmp + 2;
  24.             if (m_waiters < 2 || broadcast) {
  25.               newval &= ~1;
  26.             }
  27.             cmptmp = System.Threading.Interlocked.CompareExchange(ref m_count, newval, cmp);
  28.           } while (cmp != cmptmp);
  29.           if (m_waiters > 0) {
  30.             if (m_waiters == 1 || ! broadcast) {
  31.               --m_waiters;
  32.               System.Threading.Monitor.Pulse(this);
  33.             } else {
  34.               m_waiters = 0;
  35.               System.Threading.Monitor.PulseAll(this);
  36.             }
  37.           }
  38.         }
  39.       }
  40.     }
  41.  
  42.  
  43.     public long get() {
  44.       return InterlockedOr(ref m_count, 1);
  45.     }
  46.  
  47.  
  48.     public void signal() {
  49.       long cmp = System.Threading.Thread.VolatileRead(ref m_count);
  50.       System.Threading.Thread.MemoryBarrier();
  51.       prv_signal(cmp, false);
  52.     }
  53.  
  54.  
  55.     public void signal_relaxed() {
  56.       long cmp = System.Threading.Thread.VolatileRead(ref m_count);
  57.       prv_signal(cmp, false);
  58.     }
  59.  
  60.  
  61.     public void broadcast() {
  62.       long cmp = System.Threading.Thread.VolatileRead(ref m_count);
  63.       System.Threading.Thread.MemoryBarrier();
  64.       prv_signal(cmp, true);
  65.     }
  66.  
  67.  
  68.     public void broadcast_relaxed() {
  69.       long cmp = System.Threading.Thread.VolatileRead(ref m_count);
  70.       prv_signal(cmp, true);
  71.     }
  72.  
  73.  
  74.     public void wait(long cmp1) {
  75.       lock (this) {
  76.         long cmp2 = System.Threading.Thread.VolatileRead(ref m_count) & ~1;
  77.         if ((cmp1 & ~1) == cmp2) {
  78.           ++m_waiters;
  79.           System.Threading.Monitor.Wait(this);
  80.         }
  81.       }
  82.     }
  83.   };
  84.  
  85.  
  86.   class mpmcq<T> {
  87.     public class node {
  88.       public volatile node m_next = null;
  89.       public T m_state = default(T);
  90.  
  91.       public node() {}
  92.  
  93.       public node(T state) {
  94.         m_state = state;
  95.       }
  96.     };
  97.  
  98.  
  99.     volatile node m_head;
  100.     volatile node m_tail;
  101.  
  102.  
  103.     public mpmcq(node n) {
  104.       n.m_next = null;
  105.       m_head = m_tail = n;
  106.     }
  107.  
  108.  
  109.     public mpmcq() : this(new node()) {}
  110.  
  111.  
  112.     public void push(T state) {
  113.       node n = new node(state);
  114.       n.m_next = null;
  115.       node prev = System.Threading.Interlocked.Exchange(ref m_head, n);
  116.       prev.m_next = n;
  117.     }
  118.  
  119.  
  120.     public bool pop(out T state) {
  121.       node cmp, cmptmp = m_tail;
  122.       do {
  123.         cmp = cmptmp;
  124.         node next = cmp.m_next;
  125.         if (next == null) {
  126.           state = default(T);
  127.           return false;
  128.         }
  129.         state = next.m_state;
  130.         cmptmp = System.Threading.Interlocked.CompareExchange(ref m_tail, next, cmp);
  131.       } while (cmp != cmptmp);
  132.       return true;
  133.     }
  134.   };
  135.  
  136.  
  137.   class producer {
  138.     ecount m_ecount;
  139.     mpmcq<string> m_queue;
  140.     int m_max;
  141.     int m_id;
  142.  
  143.  
  144.     public producer(int id, ecount ecount, mpmcq<string> queue, int max) {
  145.       m_ecount = ecount;
  146.       m_queue = queue;
  147.       m_max = max;
  148.       m_id = id;
  149.     }
  150.  
  151.  
  152.     public void run() {
  153.       System.Console.WriteLine("Producer " + m_id + ", Running!");
  154.       for (int i = 0; i < m_max; ++i) {
  155.         m_queue.push("Producer " + m_id + ", Object " + i);
  156.         m_ecount.signal();
  157.         System.Threading.Thread.Sleep(1);
  158.       }
  159.       System.Console.WriteLine("Producer " + m_id + ", Finished!");
  160.     }
  161.   };
  162.  
  163.  
  164.   class consumer {
  165.     ecount m_ecount;
  166.     mpmcq<string> m_queue;
  167.     int m_id;
  168.  
  169.  
  170.     public static long g_mcount = 0;
  171.     public static long g_ccount = 0;
  172.  
  173.  
  174.     public consumer(int id, ecount ecount, mpmcq<string> queue) {
  175.       m_ecount = ecount;
  176.       m_queue = queue;
  177.       m_id = id;
  178.     }
  179.  
  180.  
  181.     public void run() {
  182.       System.Console.WriteLine("Consumer " + m_id + ", Running!");
  183.       string s;
  184.       do {
  185.         while (! m_queue.pop(out s)) {
  186.           long key = m_ecount.get();
  187.           if (m_queue.pop(out s)) break;
  188.           m_ecount.wait(key);
  189.         }
  190.         System.Console.WriteLine("Consumer " + m_id + ", Received Message: " + s);
  191.       } while (System.Threading.Interlocked.Decrement(ref g_mcount) > 0);
  192.  
  193.       for (int i = 1; i < g_ccount; ++i) {
  194.         m_queue.push("Consumer " + m_id + ", SHUTDOWN " + i);
  195.       }
  196.       m_ecount.broadcast();
  197.  
  198.       System.Console.WriteLine("Consumer " + m_id + ", Finished!");
  199.     }
  200.   };
  201.  
  202.  
  203.   class application {
  204.     ecount m_ecount = new ecount();
  205.     mpmcq<string> m_queue = new  mpmcq<string>();
  206.     int m_pcount, m_ccount, m_mcount;
  207.     producer[] m_producers;
  208.     consumer[] m_consumers;
  209.     System.Threading.Thread[] m_threads;
  210.    
  211.  
  212.     void prv_get_params() {
  213.      get_producer_count:
  214.       try {
  215.         System.Console.Write("Number of producers: ");
  216.         m_pcount = System.Int32.Parse(System.Console.ReadLine());
  217.         if (m_pcount < 1) {
  218.           System.Console.WriteLine("Sorry, you need at least one producer!");
  219.           System.Media.SystemSounds.Exclamation.Play();
  220.           goto get_producer_count;
  221.         }
  222.  
  223.      get_consumer_count:
  224.         System.Console.Write("Number of consumers: ");
  225.         m_ccount = System.Int32.Parse(System.Console.ReadLine());
  226.         if (m_ccount < 1) {
  227.           System.Console.WriteLine("Sorry, you need at least one consumer!");
  228.           System.Media.SystemSounds.Exclamation.Play();
  229.           goto get_consumer_count;
  230.         }
  231.  
  232.      get_message_count:
  233.         System.Console.Write("Number of messages per-producer: ");
  234.         m_mcount = System.Int32.Parse(System.Console.ReadLine());
  235.         if (m_mcount < 1) {
  236.           System.Console.WriteLine("Sorry, you need at least one message per-producer!");
  237.           System.Media.SystemSounds.Exclamation.Play();
  238.           goto get_message_count;
  239.         }
  240.       } catch {
  241.         System.Console.WriteLine("Bad input; try again!");
  242.         System.Media.SystemSounds.Exclamation.Play();
  243.         goto get_producer_count;
  244.       }
  245.       consumer.g_mcount = m_pcount * m_mcount;
  246.       consumer.g_ccount = m_ccount;
  247.       System.Console.Clear();
  248.     }
  249.  
  250.  
  251.     void prv_create_threads() {
  252.       m_producers = new producer[m_pcount];
  253.       m_consumers = new consumer[m_ccount];
  254.       m_threads = new System.Threading.Thread[m_pcount + m_ccount];
  255.  
  256.       for (int i = 0; i < m_pcount; ++i) {
  257.         m_producers[i] = new producer(i, m_ecount, m_queue, m_mcount);
  258.       }
  259.  
  260.       for (int i = 0; i < m_ccount; ++i) {
  261.         m_consumers[i] = new consumer(i, m_ecount, m_queue);
  262.       }
  263.  
  264.       for (int i = 0; i < m_ccount; ++i) {
  265.         m_threads[i] = new System.Threading.Thread(m_consumers[i].run);
  266.       }
  267.  
  268.       for (int i = 0; i < m_pcount; ++i) {
  269.         m_threads[i + m_ccount] = new System.Threading.Thread(m_producers[i].run);
  270.       }
  271.     }
  272.  
  273.  
  274.     void prv_start_threads() {
  275.       for (int i = 0; i < m_pcount + m_ccount; ++i) m_threads[i].Start();
  276.     }
  277.  
  278.  
  279.     void prv_join_threads() {
  280.       for (int i = 0; i < m_pcount + m_ccount; ++i) m_threads[i].Join();
  281.     }
  282.  
  283.  
  284.     public application() {
  285.       prv_get_params();
  286.       prv_create_threads();
  287.     }
  288.  
  289.  
  290.     public void go() {
  291.       prv_start_threads();
  292.       prv_join_threads();
  293.     }
  294.  
  295.  
  296.     static void Main(string[] args) {
  297.       application app = new application();
  298.       app.go();
  299.       System.Console.WriteLine("\n\nPress <ENTER> to exit...");
  300.       System.Console.ReadLine();
  301.     }
  302.   };
  303. }
  304.  
  305.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement