/* Bounded Multi-Producer/Multi-Consumer FIFO Queue
   Distributed Waitset Algorithm

   By Christopher Michael Thomasson, based on
   Dmitry Vyukov's excellent algorithm:

   http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue


   This program is free software: you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation, either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program. If not, see <http://www.gnu.org/licenses/>.
_______________________________________________________________________*/




//#define RL_DEBUGBREAK_ON_ASSERT
//#define RL_MSVC_OUTPUT
//#define RL_FORCE_SEQ_CST
//#define RL_GC




#include <relacy/relacy_std.hpp>
#include <cstdio>


#define mb_relaxed std::memory_order_relaxed
#define mb_consume std::memory_order_consume
#define mb_acquire std::memory_order_acquire
#define mb_release std::memory_order_release
#define mb_acq_rel std::memory_order_acq_rel
#define mb_seq_cst std::memory_order_seq_cst

#define mb_relaxed_fence() std::atomic_thread_fence(mb_relaxed)
#define mb_consume_fence() std::atomic_thread_fence(mb_consume)
#define mb_acquire_fence() std::atomic_thread_fence(mb_acquire)
#define mb_release_fence() std::atomic_thread_fence(mb_release)
#define mb_acq_rel_fence() std::atomic_thread_fence(mb_acq_rel)
#define mb_seq_cst_fence() std::atomic_thread_fence(mb_seq_cst)


// A mutex/condvar pair fronted by a wait bit, so that signalers can skip
// the lock entirely when nobody is waiting.
class waitset
{
    std::mutex m_mutex;
    std::condition_variable m_cond;
    std::atomic<bool> m_waitbit;
    VAR_T(unsigned) m_waiters;

public:
    waitset()
    :   m_waitbit(false),
        m_waiters(0)
    {
    }

    ~waitset()
    {
        bool waitbit = m_waitbit.load(mb_relaxed);
        unsigned waiters = VAR(m_waiters);
        RL_ASSERT(! waitbit && ! waiters);
    }

private:
    void prv_signal(bool waitbit, bool broadcast)
    {
        // fast path: no wait bit set means no waiters, nothing to do.
        if (! waitbit) return;

        m_mutex.lock($);
        unsigned waiters = VAR(m_waiters);

        // clear the wait bit if this wakeup will drain the waiters.
        if (waiters < 2 || broadcast)
        {
            m_waitbit.store(false, mb_relaxed);
        }

        m_mutex.unlock($);

        if (waiters)
        {
            if (! broadcast)
            {
                m_cond.notify_one($);
            }

            else
            {
                m_cond.notify_all($);
            }
        }
    }

public:
    // begin a wait episode: lock, raise the wait bit, then let the caller
    // re-check its predicate before committing. the seq_cst fence pairs
    // with the one in signal()/broadcast().
    unsigned wait_begin()
    {
        m_mutex.lock($);
        m_waitbit.store(true, mb_relaxed);
        mb_seq_cst_fence();
        return 0;
    }

    bool wait_try_begin(unsigned& key)
    {
        if (! m_mutex.try_lock($)) return false;
        m_waitbit.store(true, mb_relaxed);
        mb_seq_cst_fence();
        return true;
    }

    // abort the episode: the predicate became true before we slept.
    void wait_cancel(unsigned key)
    {
        unsigned waiters = VAR(m_waiters);

        if (! waiters)
        {
            m_waitbit.store(false, mb_relaxed);
        }

        m_mutex.unlock($);
    }

    // commit the episode: actually sleep. spurious wakeups are handled
    // by the caller's outer predicate loop.
    void wait_commit(unsigned key)
    {
        ++VAR(m_waiters);

        m_cond.wait(m_mutex, $);

        if (! --VAR(m_waiters))
        {
            m_waitbit.store(false, mb_relaxed);
        }

        m_mutex.unlock($);
    }

public:
    void signal()
    {
        mb_seq_cst_fence();
        bool waitbit = m_waitbit.load(mb_relaxed);
        prv_signal(waitbit, false);
    }

    void broadcast()
    {
        mb_seq_cst_fence();
        bool waitbit = m_waitbit.load(mb_relaxed);
        prv_signal(waitbit, true);
    }
};


// T_depth and T_wdepth MUST be powers of 2!
template<typename T, unsigned T_depth, unsigned T_wdepth>
struct mpmc_bounded_queue
{
    struct cell_type
    {
        std::atomic<unsigned> m_state;
        VAR_T(T) m_object;
    };

    std::atomic<unsigned> m_head;
    std::atomic<unsigned> m_tail;
    waitset m_waitset[T_wdepth];
    cell_type m_buffer[T_depth];

    mpmc_bounded_queue()
    :   m_head(0),
        m_tail(0)
    {
        // initialize version numbers.
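        //
        // each cell's m_state is a version counter: a producer that
        // obtained ticket `idx` from m_head may write cell
        // `idx % T_depth` once m_state == idx, and publishes by storing
        // idx + 1; a consumer with ticket `idx` from m_tail may read
        // once m_state == idx + 1, and recycles the cell for the next
        // lap by storing idx + T_depth.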
        for (unsigned i = 0; i < T_depth; ++i)
        {
            m_buffer[i].m_state.store(i, mb_relaxed);
        }
    }

    void push(T const& obj)
    {
        // obtain our head version and cell.
        unsigned idx = m_head.fetch_add(1, mb_relaxed);
        cell_type& cell = m_buffer[idx & (T_depth - 1U)];
        waitset& wset = m_waitset[idx & (T_wdepth - 1U)];

        // wait for it...
        while (cell.m_state.load(mb_relaxed) != idx)
        {
            unsigned key = wset.wait_begin();

            if (cell.m_state.load(mb_relaxed) == idx)
            {
                wset.wait_cancel(key);
                break;
            }

            wset.wait_commit(key);
        }

        mb_acquire_fence();

        // GOT IT! Okay, write to the object.
        VAR(cell.m_object) = obj;

        // We are done; allow a consumer to consume.
        mb_release_fence();
        cell.m_state.store(idx + 1, mb_relaxed);

        wset.broadcast();
    }

    void pop(T& obj)
    {
        // obtain our tail version and cell.
        unsigned idx = m_tail.fetch_add(1, mb_relaxed);
        cell_type& cell = m_buffer[idx & (T_depth - 1U)];
        waitset& wset = m_waitset[idx & (T_wdepth - 1U)];

        // wait for it...
        while (cell.m_state.load(mb_relaxed) != idx + 1U)
        {
            unsigned key = wset.wait_begin();

            if (cell.m_state.load(mb_relaxed) == idx + 1U)
            {
                wset.wait_cancel(key);
                break;
            }

            wset.wait_commit(key);
        }

        mb_acquire_fence();

        // GOT IT! Okay, read from the object.
        obj = VAR(cell.m_object);

        // We are done; allow a producer to produce.
        mb_release_fence();
        cell.m_state.store(idx + T_depth, mb_relaxed);

        wset.broadcast();
    }
};




#define PRODUCERS 2
#define CONSUMERS 3
#define THREADS (PRODUCERS + CONSUMERS)
#define ITERS 7

#define BUFFER_SIZE (8)
#define WAITSET_SIZE (4)


struct mpmc_bounded_queue_test
:   rl::test_suite<mpmc_bounded_queue_test, THREADS>
{
    mpmc_bounded_queue<unsigned, BUFFER_SIZE, WAITSET_SIZE> g_queue;

    unsigned g_test_term_producers; // test termination only!
    unsigned g_test_term_consumers; // test termination only!

    void before()
    {
        g_test_term_producers = PRODUCERS;
        g_test_term_consumers = (PRODUCERS * ITERS) + CONSUMERS;
    }

    void after()
    {
        RL_ASSERT(! g_test_term_consumers && ! g_test_term_producers);
    }

    void thread_producer(unsigned int tidx)
    {
        for (unsigned i = 0; i < ITERS; ++i)
        {
            g_queue.push(((tidx + 1) << 8U) + i);
        }

        // the last producer to finish pushes one sentinel per consumer.
        if (! --g_test_term_producers)
        {
            for (unsigned i = 0; i < CONSUMERS; ++i)
            {
                g_queue.push(666666);
            }
        }
    }

    void thread_consumer(unsigned int tidx)
    {
        for (;;)
        {
            unsigned v;

            g_queue.pop(v);

            --g_test_term_consumers;

            if (v == 666666)
            {
                break;
            }
        }
    }

    void thread(unsigned int tidx)
    {
        if (tidx < PRODUCERS)
        {
            thread_producer(tidx);
        }

        else
        {
            thread_consumer(tidx);
        }
    }
};




int main()
{
    {
        rl::test_params p;

        // p.iteration_count = 1000000;
        //p.execution_depth_limit = 33333;
        //p.search_type = rl::sched_bound;
        //p.search_type = rl::fair_full_search_scheduler_type;
        p.search_type = rl::fair_context_bound_scheduler_type;

        rl::simulate<mpmc_bounded_queue_test>(p);
    }

    std::puts("\nTest Complete!\n");

    std::getchar();

    return 0;
}
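

/* Usage sketch outside of Relacy (an untested illustration, not part of
   the verified program above): replace the VAR_T()/VAR() wrappers with
   plain members, drop the `$` debug-info arguments, and include <atomic>,
   <mutex>, <condition_variable> and <thread> in place of
   relacy/relacy_std.hpp. Producers and consumers then simply call
   push()/pop(), which block until their cell is ready:

       mpmc_bounded_queue<unsigned, 8, 4> q;

       std::thread producer([&] { q.push(42); });
       std::thread consumer([&] { unsigned v; q.pop(v); });

       producer.join();
       consumer.join();
*/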