namespace syncronization {
class ecount {
private long m_count = 0;
private long m_waiters = 0;
private static long InterlockedOr(ref long dest, long value) {
long cmp, cmptmp = dest;
do {
cmp = cmptmp;
cmptmp = System.Threading.Interlocked.CompareExchange(ref dest, cmp | value, cmp);
} while (cmp != cmptmp);
return cmp;
}
private void prv_signal(long cmptmp, bool broadcast) {
if ((cmptmp & 1) != 0) {
lock (this) {
long cmp;
do {
cmp = cmptmp;
long newval = cmp + 2;
if (m_waiters < 2 || broadcast) {
newval &= ~1;
}
cmptmp = System.Threading.Interlocked.CompareExchange(ref m_count, newval, cmp);
} while (cmp != cmptmp);
if (m_waiters > 0) {
if (m_waiters == 1 || ! broadcast) {
--m_waiters;
System.Threading.Monitor.Pulse(this);
} else {
m_waiters = 0;
System.Threading.Monitor.PulseAll(this);
}
}
}
}
}
public long get() {
return InterlockedOr(ref m_count, 1);
}
public void signal() {
long cmp = System.Threading.Thread.VolatileRead(ref m_count);
System.Threading.Thread.MemoryBarrier();
prv_signal(cmp, false);
}
public void signal_relaxed() {
long cmp = System.Threading.Thread.VolatileRead(ref m_count);
prv_signal(cmp, false);
}
public void broadcast() {
long cmp = System.Threading.Thread.VolatileRead(ref m_count);
System.Threading.Thread.MemoryBarrier();
prv_signal(cmp, true);
}
public void broadcast_relaxed() {
long cmp = System.Threading.Thread.VolatileRead(ref m_count);
prv_signal(cmp, true);
}
public void wait(long cmp1) {
lock (this) {
long cmp2 = System.Threading.Thread.VolatileRead(ref m_count) & ~1;
if ((cmp1 & ~1) == cmp2) {
++m_waiters;
System.Threading.Monitor.Wait(this);
}
}
}
};
class mpmcq<T> {
public class node {
public volatile node m_next = null;
public T m_state = default(T);
public node() {}
public node(T state) {
m_state = state;
}
};
volatile node m_head;
volatile node m_tail;
public mpmcq(node n) {
n.m_next = null;
m_head = m_tail = n;
}
public mpmcq() : this(new node()) {}
public void push(T state) {
node n = new node(state);
n.m_next = null;
node prev = System.Threading.Interlocked.Exchange(ref m_head, n);
prev.m_next = n;
}
public bool pop(out T state) {
node cmp, cmptmp = m_tail;
do {
cmp = cmptmp;
node next = cmp.m_next;
if (next == null) {
state = default(T);
return false;
}
state = next.m_state;
cmptmp = System.Threading.Interlocked.CompareExchange(ref m_tail, next, cmp);
} while (cmp != cmptmp);
return true;
}
};
class producer {
ecount m_ecount;
mpmcq<string> m_queue;
int m_max;
int m_id;
public producer(int id, ecount ecount, mpmcq<string> queue, int max) {
m_ecount = ecount;
m_queue = queue;
m_max = max;
m_id = id;
}
public void run() {
System.Console.WriteLine("Producer " + m_id + ", Running!");
for (int i = 0; i < m_max; ++i) {
m_queue.push("Producer " + m_id + ", Object " + i);
m_ecount.signal();
System.Threading.Thread.Sleep(1);
}
System.Console.WriteLine("Producer " + m_id + ", Finished!");
}
};
class consumer {
ecount m_ecount;
mpmcq<string> m_queue;
int m_id;
public static long g_mcount = 0;
public static long g_ccount = 0;
public consumer(int id, ecount ecount, mpmcq<string> queue) {
m_ecount = ecount;
m_queue = queue;
m_id = id;
}
public void run() {
System.Console.WriteLine("Consumer " + m_id + ", Running!");
string s;
do {
while (! m_queue.pop(out s)) {
long key = m_ecount.get();
if (m_queue.pop(out s)) break;
m_ecount.wait(key);
}
System.Console.WriteLine("Consumer " + m_id + ", Received Message: " + s);
} while (System.Threading.Interlocked.Decrement(ref g_mcount) > 0);
for (int i = 1; i < g_ccount; ++i) {
m_queue.push("Consumer " + m_id + ", SHUTDOWN " + i);
}
m_ecount.broadcast();
System.Console.WriteLine("Consumer " + m_id + ", Finished!");
}
};
class application {
ecount m_ecount = new ecount();
mpmcq<string> m_queue = new mpmcq<string>();
int m_pcount, m_ccount, m_mcount;
producer[] m_producers;
consumer[] m_consumers;
System.Threading.Thread[] m_threads;
void prv_get_params() {
get_producer_count:
try {
System.Console.Write("Number of producers: ");
m_pcount = System.Int32.Parse(System.Console.ReadLine());
if (m_pcount < 1) {
System.Console.WriteLine("Sorry, you need at least one producer!");
System.Media.SystemSounds.Exclamation.Play();
goto get_producer_count;
}
get_consumer_count:
System.Console.Write("Number of consumers: ");
m_ccount = System.Int32.Parse(System.Console.ReadLine());
if (m_ccount < 1) {
System.Console.WriteLine("Sorry, you need at least one consumer!");
System.Media.SystemSounds.Exclamation.Play();
goto get_consumer_count;
}
get_message_count:
System.Console.Write("Number of messages per-producer: ");
m_mcount = System.Int32.Parse(System.Console.ReadLine());
if (m_mcount < 1) {
System.Console.WriteLine("Sorry, you need at least one message per-producer!");
System.Media.SystemSounds.Exclamation.Play();
goto get_message_count;
}
} catch {
System.Console.WriteLine("Bad input; try again!");
System.Media.SystemSounds.Exclamation.Play();
goto get_producer_count;
}
consumer.g_mcount = m_pcount * m_mcount;
consumer.g_ccount = m_ccount;
System.Console.Clear();
}
void prv_create_threads() {
m_producers = new producer[m_pcount];
m_consumers = new consumer[m_ccount];
m_threads = new System.Threading.Thread[m_pcount + m_ccount];
for (int i = 0; i < m_pcount; ++i) {
m_producers[i] = new producer(i, m_ecount, m_queue, m_mcount);
}
for (int i = 0; i < m_ccount; ++i) {
m_consumers[i] = new consumer(i, m_ecount, m_queue);
}
for (int i = 0; i < m_ccount; ++i) {
m_threads[i] = new System.Threading.Thread(m_consumers[i].run);
}
for (int i = 0; i < m_pcount; ++i) {
m_threads[i + m_ccount] = new System.Threading.Thread(m_producers[i].run);
}
}
void prv_start_threads() {
for (int i = 0; i < m_pcount + m_ccount; ++i) m_threads[i].Start();
}
void prv_join_threads() {
for (int i = 0; i < m_pcount + m_ccount; ++i) m_threads[i].Join();
}
public application() {
prv_get_params();
prv_create_threads();
}
public void go() {
prv_start_threads();
prv_join_threads();
}
static void Main(string[] args) {
application app = new application();
app.go();
System.Console.WriteLine("\n\nPress <ENTER> to exit...");
System.Console.ReadLine();
}
};
}