Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /*
- Bounded Multi-Producer/Multi-Consumer FIFO Queue
- Distributed Waitset Algorithm
- By Christopher Michael Thomasson,
- Based Off Of, Dmitry Vyukov's Excellent Algorithm:
- http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
- You should have received a copy of the GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
- _______________________________________________________________________*/
- //#define RL_DEBUGBREAK_ON_ASSERT
- //#define RL_MSVC_OUTPUT
- //#define RL_FORCE_SEQ_CST
- //#define RL_GC
- #include <relacy/relacy_std.hpp>
- #include <cstdio>
// Short aliases for the standard memory orderings.
#define mb_relaxed  std::memory_order_relaxed
#define mb_consume  std::memory_order_consume
#define mb_acquire  std::memory_order_acquire
#define mb_release  std::memory_order_release
#define mb_acq_rel  std::memory_order_acq_rel
#define mb_seq_cst  std::memory_order_seq_cst

// Stand-alone thread fences, one per ordering alias above.
#define mb_relaxed_fence()  std::atomic_thread_fence(mb_relaxed)
#define mb_consume_fence()  std::atomic_thread_fence(mb_consume)
#define mb_acquire_fence()  std::atomic_thread_fence(mb_acquire)
#define mb_release_fence()  std::atomic_thread_fence(mb_release)
#define mb_acq_rel_fence()  std::atomic_thread_fence(mb_acq_rel)
#define mb_seq_cst_fence()  std::atomic_thread_fence(mb_seq_cst)
- class waitset
- {
- std::mutex m_mutex;
- std::condition_variable m_cond;
- std::atomic<bool> m_waitbit;
- VAR_T(unsigned) m_waiters;
- public:
- waitset()
- : m_waitbit(false),
- m_waiters(0)
- {
- }
- ~waitset()
- {
- bool waitbit = m_waitbit.load(mb_relaxed);
- unsigned waiters = VAR(m_waiters);
- RL_ASSERT(! waitbit && ! waiters);
- }
- private:
- void prv_signal(bool waitbit, bool broadcast)
- {
- if (! waitbit) return;
- m_mutex.lock($);
- unsigned waiters = VAR(m_waiters);
- if (waiters < 2 || broadcast)
- {
- m_waitbit.store(false, mb_relaxed);
- }
- m_mutex.unlock($);
- if (waiters)
- {
- if (! broadcast)
- {
- m_cond.notify_one($);
- }
- else
- {
- m_cond.notify_all($);
- }
- }
- }
- public:
- unsigned wait_begin()
- {
- m_mutex.lock($);
- m_waitbit.store(true, mb_relaxed);
- mb_seq_cst_fence();
- return 0;
- }
- bool wait_try_begin(unsigned& key)
- {
- if (! m_mutex.try_lock($)) return false;
- m_waitbit.store(true, mb_relaxed);
- mb_seq_cst_fence();
- return true;
- }
- void wait_cancel(unsigned key)
- {
- unsigned waiters = VAR(m_waiters);
- if (! waiters)
- {
- m_waitbit.store(false, mb_relaxed);
- }
- m_mutex.unlock($);
- }
- void wait_commit(unsigned key)
- {
- ++VAR(m_waiters);
- m_cond.wait(m_mutex, $);
- if (! --VAR(m_waiters))
- {
- m_waitbit.store(false, mb_relaxed);
- }
- m_mutex.unlock($);
- }
- public:
- void signal()
- {
- mb_seq_cst_fence();
- bool waitbit = m_waitbit.load(std::memory_order_relaxed);
- prv_signal(waitbit, false);
- }
- void broadcast()
- {
- mb_seq_cst_fence();
- bool waitbit = m_waitbit.load(std::memory_order_relaxed);
- prv_signal(waitbit, true);
- }
- };
- // T_depth && T_wdepth MUST be a power of 2!
- template<typename T, unsigned T_depth, unsigned T_wdepth>
- struct mpmc_bounded_queue
- {
- struct cell_type
- {
- std::atomic<unsigned> m_state;
- VAR_T(T) m_object;
- };
- std::atomic<unsigned> m_head;
- std::atomic<unsigned> m_tail;
- waitset m_waitset[T_wdepth];
- cell_type m_buffer[T_depth];
- mpmc_bounded_queue() : m_head(0), m_tail(0)
- {
- // initialize version numbers.
- for (unsigned i = 0; i < T_depth; ++i)
- {
- m_buffer[i].m_state.store(i, mb_relaxed);
- }
- }
- void push(T const& obj)
- {
- // obtain our head version and cell.
- unsigned idx = m_head.fetch_add(1, mb_relaxed);
- cell_type& cell = m_buffer[idx & (T_depth - 1U)];
- waitset& wset = m_waitset[idx & (T_wdepth - 1U)];
- // wait for it...
- while (cell.m_state.load(mb_relaxed) != idx)
- {
- unsigned key = wset.wait_begin();
- if (cell.m_state.load(mb_relaxed) == idx)
- {
- wset.wait_cancel(key);
- break;
- }
- wset.wait_commit(key);
- }
- mb_acquire_fence();
- // GOT IT! Okay, write to the object.
- VAR(cell.m_object) = obj;
- // We are done; allow a consumer to consume.
- mb_release_fence();
- cell.m_state.store(idx + 1, mb_relaxed);
- wset.broadcast();
- }
- void pop(T& obj)
- {
- // obtain our tail version and cell.
- unsigned idx = m_tail.fetch_add(1, mb_relaxed);
- cell_type& cell = m_buffer[idx & (T_depth - 1U)];
- waitset& wset = m_waitset[idx & (T_wdepth - 1U)];
- // wait for it...
- while (cell.m_state.load(mb_relaxed) != idx + 1U)
- {
- unsigned key = wset.wait_begin();
- if (cell.m_state.load(mb_relaxed) == idx + 1U)
- {
- wset.wait_cancel(key);
- break;
- }
- wset.wait_commit(key);
- }
- mb_acquire_fence();
- // GOT IT! Okay, read from the object.
- obj = VAR(cell.m_object);
- // We are done; allow a producer to produce.
- mb_release_fence();
- cell.m_state.store(idx + T_depth, mb_relaxed);
- wset.broadcast();
- }
- };
// Test configuration: thread counts and per-producer workload.
#define PRODUCERS 2
#define CONSUMERS 3
#define THREADS (PRODUCERS + CONSUMERS)
#define ITERS 7

// Queue geometry used by the test.
#define BUFFER_SIZE (8)
#define WAITSET_SIZE (4)
- struct mpmc_bounded_queue_test
- : rl::test_suite<mpmc_bounded_queue_test, THREADS>
- {
- mpmc_bounded_queue<unsigned, BUFFER_SIZE, WAITSET_SIZE> g_queue;
- unsigned g_test_term_producers; // test termination only!
- unsigned g_test_term_consumers; // test termination only!
- void before()
- {
- g_test_term_producers = PRODUCERS;
- g_test_term_consumers = (PRODUCERS * ITERS) + CONSUMERS;
- }
- void after()
- {
- RL_ASSERT(! g_test_term_consumers &&
- ! g_test_term_producers);
- }
- void thread_producer(unsigned int tidx)
- {
- for (unsigned i = 0; i < ITERS; ++i)
- {
- g_queue.push(((tidx + 1) << 8U) + i);
- }
- if (! --g_test_term_producers)
- {
- for (unsigned i = 0; i < CONSUMERS; ++i)
- {
- g_queue.push(666666);
- }
- }
- }
- void thread_consumer(unsigned int tidx)
- {
- for (;;)
- {
- unsigned v;
- g_queue.pop(v);
- --g_test_term_consumers;
- if (v == 666666)
- {
- break;
- }
- }
- }
- void thread(unsigned int tidx)
- {
- if (tidx < PRODUCERS)
- {
- thread_producer(tidx);
- }
- else
- {
- thread_consumer(tidx);
- }
- }
- };
- int main()
- {
- {
- rl::test_params p;
- // p.iteration_count = 1000000;
- //p.execution_depth_limit = 33333;
- //p.search_type = rl::sched_bound;
- //p.search_type = rl::fair_full_search_scheduler_type;
- p.search_type = rl::fair_context_bound_scheduler_type;
- rl::simulate<mpmc_bounded_queue_test>(p);
- }
- std::puts("\nTest Complete!\n");
- std::getchar();
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement