Chris_M_Thomasson

MPMC Bounded Queue

Oct 31st, 2013
/*
Bounded Multi-Producer/Multi-Consumer FIFO Queue

Distributed Waitset Algorithm

By Christopher Michael Thomasson


Based on Dmitry Vyukov's excellent algorithm:

http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue


This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.


This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.


You should have received a copy of the GNU General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.
_______________________________________________________________________*/
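
/*
Overview: this file models Dmitry Vyukov's bounded MPMC ring buffer under
the Relacy race detector. The twist named in the title is the distributed
waitset: instead of spinning on a cell whose version counter is not ready
yet, a thread parks on one of T_wdepth mutex/condvar waitsets, selected by
masking the cell's ticket index. Spreading blocked threads over several
waitsets is meant to reduce contention on any single condition variable
when many producers and consumers block at once.
*/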


//#define RL_DEBUGBREAK_ON_ASSERT
//#define RL_MSVC_OUTPUT
//#define RL_FORCE_SEQ_CST
//#define RL_GC


#include <relacy/relacy_std.hpp>
#include <cstdio>


#define mb_relaxed std::memory_order_relaxed
#define mb_consume std::memory_order_consume
#define mb_acquire std::memory_order_acquire
#define mb_release std::memory_order_release
#define mb_acq_rel std::memory_order_acq_rel
#define mb_seq_cst std::memory_order_seq_cst


#define mb_relaxed_fence() std::atomic_thread_fence(mb_relaxed)
#define mb_consume_fence() std::atomic_thread_fence(mb_consume)
#define mb_acquire_fence() std::atomic_thread_fence(mb_acquire)
#define mb_release_fence() std::atomic_thread_fence(mb_release)
#define mb_acq_rel_fence() std::atomic_thread_fence(mb_acq_rel)
#define mb_seq_cst_fence() std::atomic_thread_fence(mb_seq_cst)
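
/*
The mb_* macros are plain shorthand: the first group aliases the
std::memory_order enumerators, and the *_fence() group wraps standalone
std::atomic_thread_fence calls. The code below pairs relaxed atomic
operations with explicit fences rather than attaching an ordering to each
individual load and store.
*/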


class waitset
{
    std::mutex m_mutex;
    std::condition_variable m_cond;
    std::atomic<bool> m_waitbit;
    VAR_T(unsigned) m_waiters;



public:
    waitset()
    :   m_waitbit(false),
        m_waiters(0)
    {

    }


    ~waitset()
    {
        bool waitbit = m_waitbit.load(mb_relaxed);

        unsigned waiters = VAR(m_waiters);

        RL_ASSERT(! waitbit && ! waiters);
    }



private:
    void prv_signal(bool waitbit, bool broadcast)
    {
        // nothing to do if no waiter ever raised the wait bit.
        if (! waitbit) return;

        m_mutex.lock($);

        unsigned waiters = VAR(m_waiters);

        // clear the wait bit when the last waiter (or, on a broadcast,
        // every waiter) is about to be released.
        if (waiters < 2 || broadcast)
        {
            m_waitbit.store(false, mb_relaxed);
        }

        m_mutex.unlock($);

        if (waiters)
        {
            if (! broadcast)
            {
                m_cond.notify_one($);
            }

            else
            {
                m_cond.notify_all($);
            }
        }
    }



public:
    unsigned wait_begin()
    {
        m_mutex.lock($);

        m_waitbit.store(true, mb_relaxed);

        mb_seq_cst_fence();

        return 0;
    }


    bool wait_try_begin(unsigned& key)
    {
        if (! m_mutex.try_lock($)) return false;

        key = 0; // match wait_begin(); the key is currently unused.

        m_waitbit.store(true, mb_relaxed);

        mb_seq_cst_fence();

        return true;
    }


    void wait_cancel(unsigned key)
    {
        unsigned waiters = VAR(m_waiters);

        if (! waiters)
        {
            m_waitbit.store(false, mb_relaxed);
        }

        m_mutex.unlock($);
    }


    void wait_commit(unsigned key)
    {
        ++VAR(m_waiters);

        m_cond.wait(m_mutex, $);

        if (! --VAR(m_waiters))
        {
            m_waitbit.store(false, mb_relaxed);
        }

        m_mutex.unlock($);
    }



public:
    void signal()
    {
        mb_seq_cst_fence();

        bool waitbit = m_waitbit.load(mb_relaxed);

        prv_signal(waitbit, false);
    }


    void broadcast()
    {
        mb_seq_cst_fence();

        bool waitbit = m_waitbit.load(mb_relaxed);

        prv_signal(waitbit, true);
    }
};
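
/*
Usage pattern, as a minimal sketch (this helper is not part of the original
paste): a waiter publishes the wait bit, re-checks its predicate, and either
cancels or commits; a signaler publishes its own state first, and the two
seq_cst fences guarantee that at least one side observes the other.

    template<typename Predicate>
    void wait_until(waitset& wset, Predicate ready)
    {
        while (! ready())
        {
            unsigned key = wset.wait_begin();  // lock + set wait bit + fence

            if (ready())
            {
                wset.wait_cancel(key);         // state changed; do not sleep
                break;
            }

            wset.wait_commit(key);             // sleep on the condvar
        }
    }
*/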




// T_depth and T_wdepth MUST each be a power of 2!
template<typename T, unsigned T_depth, unsigned T_wdepth>
struct mpmc_bounded_queue
{
    struct cell_type
    {
        // version counter: equals the producer ticket while the cell is
        // free, and ticket + 1 once an object has been stored in it.
        std::atomic<unsigned> m_state;
        VAR_T(T) m_object;
    };


    std::atomic<unsigned> m_head;
    std::atomic<unsigned> m_tail;
    waitset m_waitset[T_wdepth];
    cell_type m_buffer[T_depth];


    mpmc_bounded_queue() : m_head(0), m_tail(0)
    {
        // initialize version numbers.
        for (unsigned i = 0; i < T_depth; ++i)
        {
            m_buffer[i].m_state.store(i, mb_relaxed);
        }
    }


    void push(T const& obj)
    {
        // obtain our head version and cell.
        unsigned idx = m_head.fetch_add(1, mb_relaxed);
        cell_type& cell = m_buffer[idx & (T_depth - 1U)];
        waitset& wset = m_waitset[idx & (T_wdepth - 1U)];

        // wait for it...
        while (cell.m_state.load(mb_relaxed) != idx)
        {
            unsigned key = wset.wait_begin();

            if (cell.m_state.load(mb_relaxed) == idx)
            {
                wset.wait_cancel(key);
                break;
            }

            wset.wait_commit(key);
        }

        mb_acquire_fence();

        // GOT IT! Okay, write to the object.
        VAR(cell.m_object) = obj;

        // We are done; allow a consumer to consume.
        mb_release_fence();
        cell.m_state.store(idx + 1, mb_relaxed);

        wset.broadcast();
    }


    void pop(T& obj)
    {
        // obtain our tail version and cell.
        unsigned idx = m_tail.fetch_add(1, mb_relaxed);
        cell_type& cell = m_buffer[idx & (T_depth - 1U)];
        waitset& wset = m_waitset[idx & (T_wdepth - 1U)];

        // wait for it...
        while (cell.m_state.load(mb_relaxed) != idx + 1U)
        {
            unsigned key = wset.wait_begin();

            if (cell.m_state.load(mb_relaxed) == idx + 1U)
            {
                wset.wait_cancel(key);
                break;
            }

            wset.wait_commit(key);
        }

        mb_acquire_fence();

        // GOT IT! Okay, read from the object.
        obj = VAR(cell.m_object);

        // We are done; allow a producer to produce.
        mb_release_fence();
        cell.m_state.store(idx + T_depth, mb_relaxed);

        wset.broadcast();
    }
};
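
/*
The masks idx & (T_depth - 1U) and idx & (T_wdepth - 1U) only wrap correctly
when both template parameters are powers of two, hence the warning above.
A build-time check, as a sketch (not in the original paste), could be placed
inside the struct:

    static_assert((T_depth & (T_depth - 1U)) == 0, "T_depth: power of 2");
    static_assert((T_wdepth & (T_wdepth - 1U)) == 0, "T_wdepth: power of 2");
*/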




#define PRODUCERS 2
#define CONSUMERS 3
#define THREADS (PRODUCERS + CONSUMERS)
#define ITERS 7
#define BUFFER_SIZE (8)
#define WAITSET_SIZE (4)

struct mpmc_bounded_queue_test
:   rl::test_suite<mpmc_bounded_queue_test, THREADS>
{
    mpmc_bounded_queue<unsigned, BUFFER_SIZE, WAITSET_SIZE> g_queue;
    unsigned g_test_term_producers; // test termination only!
    unsigned g_test_term_consumers; // test termination only!


    void before()
    {
        g_test_term_producers = PRODUCERS;
        g_test_term_consumers = (PRODUCERS * ITERS) + CONSUMERS;
    }


    void after()
    {
        RL_ASSERT(! g_test_term_consumers &&
                  ! g_test_term_producers);
    }


    void thread_producer(unsigned int tidx)
    {
        for (unsigned i = 0; i < ITERS; ++i)
        {
            g_queue.push(((tidx + 1) << 8U) + i);
        }

        // the last producer to finish pushes one sentinel per consumer.
        if (! --g_test_term_producers)
        {
            for (unsigned i = 0; i < CONSUMERS; ++i)
            {
                g_queue.push(666666);
            }
        }
    }


    void thread_consumer(unsigned int tidx)
    {
        for (;;)
        {
            unsigned v;
            g_queue.pop(v);
            --g_test_term_consumers;

            // a sentinel value tells this consumer to stop.
            if (v == 666666)
            {
                break;
            }
        }
    }


    void thread(unsigned int tidx)
    {
        if (tidx < PRODUCERS)
        {
            thread_producer(tidx);
        }

        else
        {
            thread_consumer(tidx);
        }
    }
};
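
/*
Termination accounting: each of the PRODUCERS producers pushes ITERS values,
and the last producer to finish adds one 666666 sentinel per consumer, so
the consumers perform PRODUCERS * ITERS + CONSUMERS pops in total. before()
seeds g_test_term_consumers with exactly that count, and after() asserts
that both counters reached zero.
*/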




int main()
{
    {
        rl::test_params p;

        //p.iteration_count = 1000000;
        //p.execution_depth_limit = 33333;
        //p.search_type = rl::sched_bound;
        //p.search_type = rl::fair_full_search_scheduler_type;
        p.search_type = rl::fair_context_bound_scheduler_type;

        rl::simulate<mpmc_bounded_queue_test>(p);
    }

    std::puts("\nTest Complete!\n");

    std::getchar();

    return 0;
}
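
/*
Build sketch (not part of the original paste; the path is hypothetical and
exact flags depend on your compiler and Relacy version). Relacy is a
header-only simulator that substitutes its own std:: threading and atomic
primitives, so this file is a model for the checker rather than a
standalone runtime queue:

    g++ -I /path/to/relacy_parent_dir main.cpp -o mpmc_queue_test
*/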