SHARE
TWEET

Chris M Thomasson

a guest Sep 19th, 2009 162 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. #include <relacy/relacy_std.hpp>
  2. #include <cstdio>
  3.  
  4.  
  5.  
  6.  
  7. #if ! defined (NDEBUG)
  8. #  define DBG_PRINTF(mp_exp) std::printf mp_exp
  9. #else
  10. #  define DBG_PRINTF(mp_exp) ((void)0)
  11. #endif
  12.  
  13.  
  14.  
  15.  
  16. class semaphore
  17. {
  18.     rl::atomic<long> m_count;
  19.     rl::sem_t m_waitset;
  20.  
  21.  
  22. public:
  23.     semaphore(long count = 0)
  24.     :   m_count(count)
  25.     {
  26.         RL_ASSERT(count > -1);
  27.         rl::sem_init(&m_waitset, 0, 0, $);
  28.     }
  29.  
  30.  
  31.     ~semaphore()
  32.     {
  33.         rl::sem_destroy(&m_waitset, $);
  34.     }
  35.  
  36.  
  37. public:
  38.     void post()
  39.     {
  40.         if (m_count($).fetch_add(1) < 0)
  41.         {
  42.             rl::sem_post(&m_waitset, $);
  43.         }
  44.     }
  45.  
  46.  
  47.     void wait()
  48.     {
  49.         if (m_count($).fetch_add(-1) < 1)
  50.         {
  51.             while (rl::sem_wait(&m_waitset, $));
  52.         }
  53.     }
  54. };
  55.  
  56.  
  57.  
  58.  
  59. template<typename T, std::size_t T_depth>
  60. class mpmcq
  61. {
  62.     rl::atomic<T*> m_slots[T_depth];
  63.     rl::atomic<std::size_t> m_push_idx;
  64.     rl::atomic<std::size_t> m_pop_idx;
  65.     semaphore m_push_sem;
  66.     semaphore m_pop_sem;
  67.  
  68.  
  69. public:
  70.     mpmcq()
  71.     :   m_push_idx(T_depth),
  72.         m_pop_idx(0),
  73.         m_push_sem(T_depth)
  74.     {
  75.         for (std::size_t i = 0; i < T_depth; ++i)
  76.         {
  77.             m_slots[i]($).store(NULL);
  78.         }
  79.     }
  80.  
  81.  
  82. public:
  83.     void push(T* ptr)
  84.     {
  85.         m_push_sem.wait();
  86.  
  87.         std::size_t idx = m_push_idx($).fetch_add(1) & (T_depth - 1);
  88.  
  89.         T* cmp = NULL;
  90.         rl::backoff backoff;
  91.  
  92.         while (m_slots[idx]($).load())
  93.         {
  94.             backoff.yield($);
  95.         }
  96.  
  97.         RL_ASSERT(! m_slots[idx]($).load());
  98.  
  99.         m_slots[idx]($).store(ptr);
  100.  
  101.         m_pop_sem.post();
  102.     }
  103.  
  104.  
  105.     T* pop()
  106.     {
  107.         m_pop_sem.wait();
  108.  
  109.         std::size_t idx = m_pop_idx($).fetch_add(1) & (T_depth - 1);
  110.  
  111.         T* ptr;
  112.         rl::backoff backoff;
  113.  
  114.         while (! (ptr = m_slots[idx]($).load()))
  115.         {
  116.             backoff.yield($);
  117.         }
  118.  
  119.         m_slots[idx]($).store(NULL);
  120.  
  121.         m_push_sem.post();
  122.  
  123.         return ptr;
  124.     }
  125. };
  126.  
  127.  
  128.  
  129.  
  130. #define PRODUCERS 5
  131. #define CONSUMERS 5
  132. #define THREADS (PRODUCERS + CONSUMERS)
  133. #define ITERS 13
  134. #define DEPTH 8
  135.  
  136.  
  137. struct mpmcq_test : rl::test_suite<mpmcq_test, THREADS>
  138. {
  139.     struct node
  140.     {
  141.         unsigned m_tidx;
  142.     };
  143.  
  144.     mpmcq<node, DEPTH> m_queue;
  145.    
  146.     void thread(unsigned tidx)
  147.     {
  148.         node* n;
  149.         unsigned i;
  150.  
  151.         if (tidx < PRODUCERS)
  152.         {
  153.             for (i = 0; i < ITERS; ++i)
  154.             {
  155.                 n = RL_NEW node;
  156.  
  157.                 n->m_tidx = tidx;
  158.  
  159.                 m_queue.push(n);
  160.  
  161.                 DBG_PRINTF(("thread %u pushed %p\n",
  162.                     tidx, (void*)n));
  163.             }
  164.         }
  165.  
  166.         else
  167.         {
  168.             for (i = 0; i < ITERS; ++i)
  169.             {
  170.                 n = m_queue.pop();
  171.  
  172.                 unsigned tidx_ = n->m_tidx;
  173.  
  174.                 RL_ASSERT(tidx != tidx_);
  175.  
  176.                 DBG_PRINTF(("thread %u popped %p from thread %u\n",
  177.                     tidx, (void*)n, tidx_));
  178.  
  179.                 RL_DELETE(n);
  180.             }            
  181.         }
  182.     }
  183. };
  184.  
  185.  
  186.  
  187.  
  188. int main()
  189. {  
  190.     rl::test_params params;
  191.     params.iteration_count = 99999999;
  192.     //params.search_type = rl::fair_full_search_scheduler_type;
  193.     //params.search_type = rl::fair_context_bound_scheduler_type;
  194.     rl::simulate<mpmcq_test>(params);
  195.  
  196.     return 0;
  197. }
RAW Paste Data
Top