Advertisement
Guest User

Chris M Thomasson

a guest
Jun 25th, 2009
760
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 3.79 KB | None | 0 0
  1. #define RL_GC
  2. #include <relacy/relacy_std.hpp>
  3. #include <cstdio>
  4.  
  5.  
  6.  
  7. class eventcount {
  8. public:
  9.   typedef unsigned long key_type;
  10.  
  11.  
  12. private:
  13.   mutable rl::atomic<key_type> m_count;
  14.   rl::mutex m_mutex;
  15.   rl::condition_variable m_cond;
  16.  
  17.  
  18.   void prv_signal(key_type key) {
  19.     if (key & 1) {
  20.       m_mutex.lock($);
  21.       while (! m_count($).compare_exchange_weak(key, (key + 2) & ~1,
  22.         rl::memory_order_seq_cst));
  23.       m_mutex.unlock($);
  24.       m_cond.notify_all($);
  25.     }
  26.   }
  27.  
  28.  
  29. public:
  30.   eventcount() {
  31.     m_count($).store(0, rl::memory_order_relaxed);
  32.   }
  33.  
  34.  
  35. public:
  36.   key_type get() const {
  37.     return m_count($).fetch_or(1, rl::memory_order_acquire);
  38.   }
  39.  
  40.  
  41.   void signal() {
  42.     prv_signal(m_count($).fetch_add(0, rl::memory_order_seq_cst));
  43.   }
  44.  
  45.  
  46.   void signal_relaxed() {
  47.     prv_signal(m_count($).load(rl::memory_order_relaxed));
  48.   }
  49.  
  50.  
  51.   void wait(key_type cmp) {
  52.     m_mutex.lock($);
  53.     if ((m_count($).load(rl::memory_order_seq_cst) & ~1) == (cmp & ~1)) {
  54.       m_cond.wait(m_mutex, $);
  55.     }
  56.     m_mutex.unlock($);
  57.   }
  58. };
  59.  
  60.  
  61.  
  62.  
  63. template<typename T>
  64. class mpmcq {
  65.   struct node {
  66.     rl::atomic<node*> m_next;
  67.     T volatile m_state;
  68.   };
  69.  
  70.  
  71.   rl::atomic<node*> m_head;
  72.   rl::atomic<node*> m_tail;
  73.  
  74.  
  75. public:
  76.   mpmcq() {
  77.     node* n = RL_NEW(node);
  78.     n->m_next($).store(NULL, rl::memory_order_relaxed);
  79.     m_head($).store(n, rl::memory_order_relaxed);
  80.     m_tail($).store(n, rl::memory_order_relaxed);
  81.   }
  82.  
  83.  
  84.   ~mpmcq() {
  85.     RL_ASSERT(m_head($).load(rl::memory_order_relaxed) ==
  86.               m_tail($).load(rl::memory_order_relaxed));
  87.   }
  88.  
  89.  
  90. public:
  91.   void push(T& state) {
  92.     node* n = RL_NEW(node);
  93.     n->m_next($).store(NULL, rl::memory_order_relaxed);
  94.     n->m_state = state;
  95.     node* p = m_head($).exchange(n, rl::memory_order_seq_cst);
  96.     p->m_next($).store(n, rl::memory_order_seq_cst);
  97.   }
  98.  
  99.  
  100.   bool pop(T& state) {
  101.     node* n;
  102.     node* t = m_tail($).load(rl::memory_order_seq_cst);
  103.     do {
  104.       n = t->m_next($).load(rl::memory_order_seq_cst);
  105.       if (! n) return false;
  106.       state = n->m_state;
  107.     } while (! m_tail($).compare_exchange_weak(t, n, rl::memory_order_seq_cst));
  108.     return true;
  109.   }
  110. };
  111.  
  112.  
  113.  
  114.  
  115.  
  116. #define PRODUCERS 2
  117. #define CONSUMERS 2
  118. #define THREADS (PRODUCERS + CONSUMERS)
  119. #define ITERS 6
  120.  
  121.  
  122. struct mpmcq_test : rl::test_suite<mpmcq_test, THREADS> {
  123.   eventcount m_ecount;
  124.   mpmcq<int> m_queue;
  125.   std::atomic<int> m_count;
  126.  
  127.  
  128.   void before() {
  129.     m_count($).store(PRODUCERS * ITERS, rl::memory_order_relaxed);
  130.   }
  131.  
  132.  
  133.   void invariant() {
  134.     RL_ASSERT(m_count($).load(rl::memory_order_relaxed) > -1);
  135.   }
  136.  
  137.  
  138.   void after() {
  139.     RL_ASSERT(! m_count($).load(rl::memory_order_relaxed));
  140.   }
  141.  
  142.  
  143.   void thread(unsigned tidx) {
  144.     int i;
  145.     if (tidx < PRODUCERS) {
  146.       for (i = 0; i < ITERS; ++i) {
  147.         m_queue.push(i);
  148.         m_ecount.signal();
  149.       }
  150.     } else {
  151.       do {
  152.         while (! m_queue.pop(i)) {
  153.           eventcount::key_type key = m_ecount.get();
  154.           if (m_queue.pop(i)) break;
  155.           m_ecount.wait(key);
  156.         }
  157.       } while (i != -666 &&
  158.                m_count($).fetch_add(-1, rl::memory_order_relaxed) != 1);
  159.       if (i != -666) {
  160.         for (i = 1; i < CONSUMERS; ++i) {
  161.           int x = -666;
  162.           m_queue.push(x);
  163.         }
  164.         m_ecount.signal();
  165.       }
  166.     }
  167.   }
  168. };
  169.  
  170.  
  171.  
  172.  
  173. int main() {
  174.   rl::test_params params;
  175.   params.iteration_count = 99999999;
  176.   //params.search_type = rl::fair_full_search_scheduler_type;
  177.   //params.search_type = rl::fair_context_bound_scheduler_type;
  178.   rl::simulate<mpmcq_test>(params);
  179.   std::puts("\n\n\n____________________________________\n"
  180.             "DONE!!!!\npress <ENTER> to exit...");
  181.   std::fflush(stdin);
  182.   std::fflush(stdout);
  183.   std::getchar();
  184.   return 0;
  185. }
  186.  
  187.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement