Chris_M_Thomasson

Single Producer/Multiple Consumer Lock-Free Queue

Mar 20th, 2017
207
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 5.94 KB | None | 0 0
  1. #include <cstdio>
  2. #include <deque>
  3. #include <condition_variable>
  4. #include <mutex>
  5. #include <memory>
  6. #include <thread>
  7. #include <atomic>
  8. #include <algorithm>
  9. #include <cassert>
  10.  
  11.  
  12.  
  13. #define mb_relaxed std::memory_order_relaxed
  14. #define mb_consume std::memory_order_consume
  15. #define mb_acquire std::memory_order_acquire
  16. #define mb_release std::memory_order_release
  17. #define mb_acq_rel std::memory_order_acq_rel
  18. #define mb_seq_cst std::memory_order_seq_cst
  19.  
  20. #define mb_fence(mb) std::atomic_thread_fence(mb)
  21.  
  22.  
  23. // Just a check...
  24. static std::atomic<unsigned long> g_alloc_count(0);
  25.  
  26.  
  27. // simple single producer/multi-consumer queue
  28. struct node
  29. {
  30.     node* m_next;
  31.     node() : m_next(nullptr) {} // tidy...
  32. };
  33.  
  34.  
  35. struct spmc_queue
  36. {
  37.     std::atomic<node*> m_head;
  38.  
  39.     spmc_queue() : m_head(nullptr) {}
  40.  
  41.     // push a single node
  42.     void push(node* const n)
  43.     {
  44.         node* head = m_head.load(mb_relaxed);
  45.  
  46.         for (;;)
  47.         {
  48.             n->m_next = head;
  49.             mb_fence(mb_release);
  50.  
  51.             if (m_head.compare_exchange_weak(head, n, mb_relaxed))
  52.             {
  53.                 break;
  54.             }
  55.         }
  56.     }
  57.  
  58.     // try to flush all of our nodes
  59.     node* flush(node* const nhead = nullptr)
  60.     {
  61.         if (nhead) mb_fence(mb_release);
  62.  
  63.         node* n = m_head.exchange(nhead, mb_relaxed);
  64.  
  65.         if (n)
  66.         {
  67.             mb_fence(mb_acquire);
  68.         }
  69.  
  70.         return n;
  71.     }
  72. };
  73.  
  74. // Spin-Wait, Blocking Adapt Function
  75. node* spmc_queue_spin_lock_flush(
  76.     spmc_queue& queue
  77. ) {
  78.     node* n = nullptr;
  79.  
  80.     for (;;)
  81.     {
  82.         n = queue.flush();
  83.         if (n) break;
  84.         std::this_thread::yield();
  85.     }
  86.  
  87.     return n;
  88. }
  89.  
  90.  
  91. #define CONSUMERS 7
  92. #define N 10000000
  93.  
  94.  
  95. struct user_data : public node
  96. {
  97.     int m_foo;
  98.  
  99.     user_data(int foo) : m_foo(foo) {}
  100. };
  101.  
  102.  
  103. void producer_thread(
  104.     unsigned int id,
  105.     std::mutex& std_out_mutex,
  106.     spmc_queue& queue
  107. ) {
  108.     {
  109.         std::unique_lock<std::mutex> lock(std_out_mutex);
  110.         std::printf("producer(%u)::queue(%p) - Entry\n", id, (void*)&queue);
  111.     }
  112.  
  113.     for (unsigned int i = 0; i < N; ++i)
  114.     {
  115.         user_data* ud = new user_data(i + 1); // allocate memory
  116.         g_alloc_count.fetch_add(1, mb_relaxed);
  117.  
  118.         queue.push(ud);
  119.  
  120.         if (!((i + 1) % 1003))
  121.         {
  122.             std::unique_lock<std::mutex> lock(std_out_mutex);
  123.             std::printf("producer(%u)::queue(%p) - produced(%u)\n", id, (void*)&queue, i + 1);
  124.         }
  125.     }
  126.  
  127.     for (unsigned int i = 0; i < CONSUMERS; ++i)
  128.     {
  129.         user_data* ud = new user_data(0); // allocate memory
  130.         g_alloc_count.fetch_add(1, mb_relaxed);
  131.  
  132.         queue.push(ud);
  133.     }
  134.  
  135.     {
  136.         std::unique_lock<std::mutex> lock(std_out_mutex);
  137.         std::printf("producer(%u)::queue(%p) - Exit\n", id, (void*)&queue);
  138.     }
  139. }
  140.  
  141.  
  142. void consumer_thread(
  143.     unsigned int id,
  144.     std::mutex& std_out_mutex,
  145.     spmc_queue& queue
  146. ) {
  147.     {
  148.         std::unique_lock<std::mutex> lock(std_out_mutex);
  149.         std::printf("consumer(%u)::queue(%p) - Entry\n", id, (void*)&queue);
  150.     }
  151.  
  152.     {
  153.         for (unsigned long i = 0;; ++i)
  154.         {
  155.             // Wait for something...
  156.             user_data* ud = (user_data*)spmc_queue_spin_lock_flush(queue);
  157.             assert(ud); // make sure we have something!
  158.  
  159.             int counter = 0;
  160.  
  161.             while (ud)
  162.             {
  163.                 node* ud_next = ud->m_next;
  164.  
  165.                 unsigned int foo = ud->m_foo;
  166.                 delete ud; // reclaim memory
  167.                 g_alloc_count.fetch_sub(1, mb_relaxed);
  168.  
  169.                 if (foo == 0)
  170.                 {
  171.                     // We have recieved a "stop" signal
  172.                     counter++;
  173.                 }
  174.  
  175.                 if (!((i + 1) % 1003))
  176.                 {
  177.                     std::unique_lock<std::mutex> lock(std_out_mutex);
  178.                     std::printf("consumer(%u)::queue(%p) - consumed(foo:%u)\n",
  179.                         id, (void*)&queue, foo);
  180.                 }
  181.  
  182.                 ud = (user_data*)ud_next;
  183.             }
  184.  
  185.             std::this_thread::yield(); // just for spice...
  186.  
  187.             while (counter > 1)
  188.             {
  189.                 // Replay all of the excess stop signals
  190.                 user_data* ud = new user_data(0); // allocate memory
  191.                 g_alloc_count.fetch_add(1, mb_relaxed);
  192.  
  193.                 queue.push(ud);
  194.                 --counter;
  195.  
  196.                 {
  197.                     std::unique_lock<std::mutex> lock(std_out_mutex);
  198.                     std::printf("consumer(%u)::queue(%p) - replay(%u) *****************\n",
  199.                         id, (void*)&queue, counter);
  200.                 }
  201.             }
  202.  
  203.             if (counter == 1)
  204.             {
  205.                 // We are fin!
  206.                 break;
  207.             }
  208.         }
  209.     }
  210.  
  211.     {
  212.         std::unique_lock<std::mutex> lock(std_out_mutex);
  213.         std::printf("consumer(%u)::queue(%p) - Exit\n", id, (void*)&queue);
  214.     }
  215. }
  216.  
  217.  
  218.  
  219. int main(void)
  220. {
  221.     {
  222.         spmc_queue queue;
  223.  
  224.         std::thread consumers[CONSUMERS];
  225.         std::mutex std_out_mutex;
  226.  
  227.         for (unsigned int i = 0; i < CONSUMERS; ++i)
  228.         {
  229.             consumers[i] = std::thread(
  230.                 consumer_thread,
  231.                 i + 1,
  232.                 std::ref(std_out_mutex),
  233.                 std::ref(queue)
  234.             );
  235.         }
  236.  
  237.         std::thread producer(
  238.             producer_thread,
  239.             0,
  240.             std::ref(std_out_mutex),
  241.             std::ref(queue)
  242.         );
  243.  
  244.         producer.join();
  245.  
  246.         for (unsigned int i = 0; i < CONSUMERS; ++i)
  247.         {
  248.             consumers[i].join();
  249.         }
  250.     }
  251.  
  252.     std::printf("g_alloc_count:(%lu)\n", g_alloc_count.load(mb_relaxed));
  253.     assert(g_alloc_count.load(mb_relaxed) == 0);
  254.  
  255.     std::printf("\nComplete, hit <ENTER> to exit...\n");
  256.     std::fflush(stdout);
  257.     std::getchar();
  258.  
  259.     return 0;
  260. }
Add Comment
Please, Sign In to add comment