Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <cstdio>
- #include <deque>
- #include <condition_variable>
- #include <mutex>
- #include <memory>
- #include <thread>
- #include <atomic>
- #include <algorithm>
- #include <cassert>
// Short aliases for the standard memory orderings used throughout this file.
// Typed constants (rather than macros) obey scoping rules and cannot silently
// rewrite unrelated identifiers.
constexpr std::memory_order mb_relaxed = std::memory_order_relaxed;
constexpr std::memory_order mb_consume = std::memory_order_consume;
constexpr std::memory_order mb_acquire = std::memory_order_acquire;
constexpr std::memory_order mb_release = std::memory_order_release;
constexpr std::memory_order mb_acq_rel = std::memory_order_acq_rel;
constexpr std::memory_order mb_seq_cst = std::memory_order_seq_cst;

// Issues a stand-alone thread fence with the requested ordering.
inline void mb_fence(std::memory_order mb) noexcept
{
    std::atomic_thread_fence(mb);
}
// Sanity counter: incremented for every user_data allocated and decremented
// for every one deleted. main() asserts it is back at zero once all threads
// have been joined.
static std::atomic<unsigned long> g_alloc_count{0};
// simple single producer/multi-consumer queue
//
// Intrusive link: anything stored in the queue derives from `node` and is
// chained through `m_next`.
struct node
{
    node* m_next; // next node in the chain, or nullptr at the end

    // A freshly constructed node is not linked to anything.
    node() : m_next{nullptr} {}
};
- struct spmc_queue
- {
- std::atomic<node*> m_head;
- spmc_queue() : m_head(nullptr) {}
- // push a single node
- void push(node* const n)
- {
- node* head = m_head.load(mb_relaxed);
- for (;;)
- {
- n->m_next = head;
- mb_fence(mb_release);
- if (m_head.compare_exchange_weak(head, n, mb_relaxed))
- {
- break;
- }
- }
- }
- // try to flush all of our nodes
- node* flush(node* const nhead = nullptr)
- {
- if (nhead) mb_fence(mb_release);
- node* n = m_head.exchange(nhead, mb_relaxed);
- if (n)
- {
- mb_fence(mb_acquire);
- }
- return n;
- }
- };
- // Spin-Wait, Blocking Adapt Function
- node* spmc_queue_spin_lock_flush(
- spmc_queue& queue
- ) {
- node* n = nullptr;
- for (;;)
- {
- n = queue.flush();
- if (n) break;
- std::this_thread::yield();
- }
- return n;
- }
// Test dimensions. Typed constants instead of macros: a macro named `N`
// would textually replace every later identifier spelled `N`.
constexpr unsigned int CONSUMERS = 7;  // number of consumer threads
constexpr unsigned int N = 10000000;   // number of work items produced
- struct user_data : public node
- {
- int m_foo;
- user_data(int foo) : m_foo(foo) {}
- };
- void producer_thread(
- unsigned int id,
- std::mutex& std_out_mutex,
- spmc_queue& queue
- ) {
- {
- std::unique_lock<std::mutex> lock(std_out_mutex);
- std::printf("producer(%u)::queue(%p) - Entry\n", id, (void*)&queue);
- }
- for (unsigned int i = 0; i < N; ++i)
- {
- user_data* ud = new user_data(i + 1); // allocate memory
- g_alloc_count.fetch_add(1, mb_relaxed);
- queue.push(ud);
- if (!((i + 1) % 1003))
- {
- std::unique_lock<std::mutex> lock(std_out_mutex);
- std::printf("producer(%u)::queue(%p) - produced(%u)\n", id, (void*)&queue, i + 1);
- }
- }
- for (unsigned int i = 0; i < CONSUMERS; ++i)
- {
- user_data* ud = new user_data(0); // allocate memory
- g_alloc_count.fetch_add(1, mb_relaxed);
- queue.push(ud);
- }
- {
- std::unique_lock<std::mutex> lock(std_out_mutex);
- std::printf("producer(%u)::queue(%p) - Exit\n", id, (void*)&queue);
- }
- }
- void consumer_thread(
- unsigned int id,
- std::mutex& std_out_mutex,
- spmc_queue& queue
- ) {
- {
- std::unique_lock<std::mutex> lock(std_out_mutex);
- std::printf("consumer(%u)::queue(%p) - Entry\n", id, (void*)&queue);
- }
- {
- for (unsigned long i = 0;; ++i)
- {
- // Wait for something...
- user_data* ud = (user_data*)spmc_queue_spin_lock_flush(queue);
- assert(ud); // make sure we have something!
- int counter = 0;
- while (ud)
- {
- node* ud_next = ud->m_next;
- unsigned int foo = ud->m_foo;
- delete ud; // reclaim memory
- g_alloc_count.fetch_sub(1, mb_relaxed);
- if (foo == 0)
- {
- // We have recieved a "stop" signal
- counter++;
- }
- if (!((i + 1) % 1003))
- {
- std::unique_lock<std::mutex> lock(std_out_mutex);
- std::printf("consumer(%u)::queue(%p) - consumed(foo:%u)\n",
- id, (void*)&queue, foo);
- }
- ud = (user_data*)ud_next;
- }
- std::this_thread::yield(); // just for spice...
- while (counter > 1)
- {
- // Replay all of the excess stop signals
- user_data* ud = new user_data(0); // allocate memory
- g_alloc_count.fetch_add(1, mb_relaxed);
- queue.push(ud);
- --counter;
- {
- std::unique_lock<std::mutex> lock(std_out_mutex);
- std::printf("consumer(%u)::queue(%p) - replay(%u) *****************\n",
- id, (void*)&queue, counter);
- }
- }
- if (counter == 1)
- {
- // We are fin!
- break;
- }
- }
- }
- {
- std::unique_lock<std::mutex> lock(std_out_mutex);
- std::printf("consumer(%u)::queue(%p) - Exit\n", id, (void*)&queue);
- }
- }
- int main(void)
- {
- {
- spmc_queue queue;
- std::thread consumers[CONSUMERS];
- std::mutex std_out_mutex;
- for (unsigned int i = 0; i < CONSUMERS; ++i)
- {
- consumers[i] = std::thread(
- consumer_thread,
- i + 1,
- std::ref(std_out_mutex),
- std::ref(queue)
- );
- }
- std::thread producer(
- producer_thread,
- 0,
- std::ref(std_out_mutex),
- std::ref(queue)
- );
- producer.join();
- for (unsigned int i = 0; i < CONSUMERS; ++i)
- {
- consumers[i].join();
- }
- }
- std::printf("g_alloc_count:(%lu)\n", g_alloc_count.load(mb_relaxed));
- assert(g_alloc_count.load(mb_relaxed) == 0);
- std::printf("\nComplete, hit <ENTER> to exit...\n");
- std::fflush(stdout);
- std::getchar();
- return 0;
- }
Add Comment
Please, Sign In to add comment