SHARE
TWEET

Chris M Thomasson

a guest Apr 9th, 2009 583 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  // Relacy's header stays first, as in the original include order.
  #include <relacy/relacy_std.hpp>
  #include <cstddef>
  #include <cstdio>
  #include <cstdlib>
  4.  
  5.  
  6.  
  7.  
  8.  
  // Debug-only printf wrapper.
  //
  // Call it with DOUBLE parentheses so that the whole printf argument list is
  // passed as a single macro argument (no variadic macros needed):
  //
  //     DBG_PRINTF(("value = %u\n", value));
  //
  // When NDEBUG is defined the macro expands to a no-op expression and the
  // arguments are not evaluated.
  #ifndef NDEBUG
  #   define DBG_PRINTF(mp_exp) std::printf mp_exp
  #else
  #   define DBG_PRINTF(mp_exp) ((void)0)
  #endif
  14.  
  15.  
  16.  
  17.  
  18. class eventcount {
  19. public:
  20.   typedef unsigned long key_type;
  21.  
  22.  
  23. private:
  24.   mutable rl::atomic<key_type> m_count;
  25.   rl::mutex m_mutex;
  26.   rl::condition_variable m_cond;
  27.  
  28.  
  29.   void prv_signal(key_type key) {
  30.     if (key & 1) {
  31.       m_mutex.lock($);
  32.       while (! m_count($).compare_exchange_weak(key, (key + 2) & ~1,
  33.         rl::memory_order_seq_cst));
  34.       m_mutex.unlock($);
  35.       m_cond.notify_all($);
  36.     }
  37.   }
  38.  
  39.  
  40. public:
  41.   eventcount() {
  42.     m_count($).store(0, rl::memory_order_relaxed);
  43.   }
  44.  
  45.  
  46. public:
  47.   key_type get() const {
  48.     return m_count($).fetch_or(1, rl::memory_order_acquire);
  49.   }
  50.  
  51.  
  52.   void signal() {
  53.     prv_signal(m_count($).fetch_add(0, rl::memory_order_seq_cst));
  54.   }
  55.  
  56.  
  57.   void signal_relaxed() {
  58.     prv_signal(m_count($).load(rl::memory_order_relaxed));
  59.   }
  60.  
  61.  
  62.   void wait(key_type cmp) {
  63.     m_mutex.lock($);
  64.     if ((m_count($).load(rl::memory_order_seq_cst) & ~1) == (cmp & ~1)) {
  65.       m_cond.wait(m_mutex, $);
  66.     }
  67.     m_mutex.unlock($);
  68.   }
  69. };
  70.  
  71.  
  72.  
  73.  
  74. template<typename T, std::size_t T_size>
  75. class mpsc_boundq {
  76.   rl::atomic<std::size_t> m_push;
  77.   rl::atomic<std::size_t> m_push_idx;
  78.   rl::atomic<bool> m_commit[T_size];
  79.   rl::var<std::size_t> m_pop_idx;
  80.   rl::var<T> m_buffer[T_size];
  81.   eventcount m_pop_ecount;
  82.   eventcount m_push_ecount;
  83.  
  84.  
  85.  
  86. private:
  87.   bool prv_try_push_strong() {
  88.     std::size_t push_ = m_push($).load(rl::memory_order_relaxed);
  89.  
  90.     while (! m_push($).compare_exchange_weak(push_,
  91.       (push_) ? push_ - 1 : 0, rl::memory_order_seq_cst));
  92.  
  93.     return (push_) ? true : false;
  94.   }
  95.  
  96.  
  97.  
  98. public:
  99.   mpsc_boundq() {
  100.     m_push($).store(T_size, rl::memory_order_relaxed);
  101.     m_push_idx($).store(T_size, rl::memory_order_relaxed);
  102.     m_pop_idx($) = 0;
  103.  
  104.     for (std::size_t i = 0; i < T_size; ++i) {
  105.       m_buffer[i]($) = T();
  106.       m_commit[i]($).store(false, rl::memory_order_relaxed);
  107.     }
  108.   }
  109.  
  110.  
  111.   ~mpsc_boundq() {
  112.     RL_ASSERT(m_push($).load(rl::memory_order_relaxed) == T_size);
  113.  
  114.     for (std::size_t i = 0; i < T_size; ++i) {
  115.       RL_ASSERT(! m_commit[i]($).load(rl::memory_order_relaxed));
  116.     }
  117.   }
  118.  
  119.  
  120.  
  121. public:
  122.   void push(T const& state) {
  123.     while (! prv_try_push_strong()) {
  124.       eventcount::key_type const eckey = m_push_ecount.get();
  125.       if (prv_try_push_strong()) break;
  126.       m_push_ecount.wait(eckey);
  127.     }
  128.  
  129.     std::size_t const i = m_push_idx($).fetch_add(1,
  130.       rl::memory_order_seq_cst) % T_size;
  131.  
  132.     m_buffer[i]($) = state;
  133.     m_commit[i]($).store(true, rl::memory_order_seq_cst);
  134.  
  135.  
  136.  
  137. /* if following is: `m_pop_ecount.signal_relaxed();' == BOOM!!!!
  138.      ;^D RELACY!!!
  139. ______________________________________________________________________*/
  140.     m_pop_ecount.signal();
  141.  
  142.     // m_pop_ecount.signal_relaxed();
  143.  
  144.   }
  145.  
  146.  
  147.   void pop(T& state) {
  148.     std::size_t const i = m_pop_idx($) % T_size;
  149.  
  150.     ++m_pop_idx($);
  151.  
  152.     while (! m_commit[i]($).load(rl::memory_order_seq_cst)) {
  153.       eventcount::key_type const eckey = m_pop_ecount.get();
  154.       if (m_commit[i]($).load(rl::memory_order_seq_cst)) break;
  155.       m_pop_ecount.wait(eckey);
  156.     }
  157.  
  158.     state = m_buffer[i]($);
  159.     m_commit[i]($).store(false, rl::memory_order_seq_cst);
  160.     m_push($).fetch_add(1, rl::memory_order_seq_cst);
  161.     m_push_ecount.signal_relaxed();
  162.   }
  163. };
  164.  
  165.  
  166.  
  167.  
  168.  
  169.  
  170.  
  171.  
  // Test configuration for the Relacy simulation.
  //
  //   CONSUMERS - number of consumer threads (thread indices below this value)
  //   PRODUCERS - number of producer threads
  //   THREADS   - total thread count handed to rl::test_suite
  //   ITERS     - base number of items pushed per producer
  //   BUFFER    - number of items each consumer pops; the queue itself is
  //               created with BUFFER / 2 slots
  #define CONSUMERS 1
  #define PRODUCERS 4
  #define THREADS (CONSUMERS + PRODUCERS)
  #define ITERS 6
  #define BUFFER (PRODUCERS * ITERS)
  177.  
  178.  
  179.  
  180.  
  181. struct buffer_test : rl::test_suite<buffer_test, THREADS> {
  182.   mpsc_boundq<unsigned, BUFFER / 2> m_buffer;
  183.  
  184.  
  185.   void thread(unsigned tidx) {
  186.     if (tidx < CONSUMERS) {
  187.       for (unsigned i = 0; i < BUFFER; ++i) {
  188.         m_buffer.pop(tidx);
  189.         DBG_PRINTF(("consumer loop: %u\n", tidx));
  190.       }
  191.  
  192.     } else {
  193.       for (unsigned i = 0; i < ITERS * CONSUMERS; ++i) {
  194.         m_buffer.push(i);
  195.         DBG_PRINTF(("producer(%u) loop: %u\n", tidx, i));
  196.       }
  197.     }
  198.   }
  199. };
  200.  
  201.  
  202.  
  203.  
  204. int main() {
  205.   rl::test_params params;
  206.   params.iteration_count = 9999999;
  207.   rl::simulate<buffer_test>(params);
  208.  
  209.   std::puts("\n\n\n_____________________________\nCompleted!");
  210.   std::getchar();
  211.   return 0;
  212. }
RAW Paste Data
Top