Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /*
- Bounded Multi-Producer/Multi-Consumer FIFO Queue
- Copyright (C) 2011 Christopher Michael Thomasson
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
- You should have received a copy of the GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
- _______________________________________________________________________*/
- //#define RL_DEBUGBREAK_ON_ASSERT
- //#define RL_MSVC_OUTPUT
- //#define RL_FORCE_SEQ_CST
- //#define RL_GC
- #include <relacy/relacy_std.hpp>
- #include <cstdio>
- #define mb_relaxed std::memory_order_relaxed
- #define mb_acquire std::memory_order_acquire
- #define mb_release std::memory_order_release
- template<typename T, unsigned T_depth /* MUST BE POW OF 2 ! */>
- struct mpmc_bounded_queue
- {
- struct cell_type
- {
- std::atomic<unsigned> m_state;
- VAR_T(T) m_object;
- };
- std::atomic<unsigned> m_head;
- std::atomic<unsigned> m_tail;
- cell_type m_buffer[T_depth];
- mpmc_bounded_queue() : m_head(0), m_tail(0)
- {
- // initialize version numbers.
- for (unsigned i = 0; i < T_depth; ++i)
- {
- m_buffer[i].m_state.store(i, mb_relaxed);
- }
- }
- void push(T const& obj)
- {
- // obtain our head version and cell.
- unsigned idx = m_head.fetch_add(1, mb_relaxed);
- cell_type& cell = m_buffer[idx & (T_depth - 1U)];
- // wait for it...
- rl::linear_backoff backoff;
- while (cell.m_state.load(mb_relaxed) != idx)
- backoff.yield($);
- std::atomic_thread_fence(mb_acquire);
- // GOT IT! Okay, write to the object.
- VAR(cell.m_object) = obj;
- // We are done; allow a consumer to consume.
- std::atomic_thread_fence(mb_release);
- cell.m_state.store(idx + 1, mb_relaxed);
- }
- void pop(T& obj)
- {
- // obtain our tail version and cell.
- unsigned idx = m_tail.fetch_add(1, mb_relaxed);
- cell_type& cell = m_buffer[idx & (T_depth - 1U)];
- // wait for it...
- rl::linear_backoff backoff;
- while (cell.m_state.load(mb_relaxed) != idx + 1)
- backoff.yield($);
- std::atomic_thread_fence(mb_acquire);
- // GOT IT! Okay, read from the object.
- obj = VAR(cell.m_object);
- // We are done; allow a producer to produce.
- std::atomic_thread_fence(mb_release);
- cell.m_state.store(idx + T_depth, mb_relaxed);
- }
- };
- #define PRODUCERS 3
- #define CONSUMERS 2
- #define THREADS (PRODUCERS + CONSUMERS)
- #define ITERS 15
- #define BUFFER_SIZE (8 - CONSUMERS)
- struct mpmc_bounded_queue_test
- : rl::test_suite<mpmc_bounded_queue_test, THREADS>
- {
- mpmc_bounded_queue<unsigned, BUFFER_SIZE + CONSUMERS> g_queue;
- unsigned g_test_term_producers; // test termination only!
- unsigned g_test_term_consumers; // test termination only!
- void before()
- {
- g_test_term_producers = PRODUCERS;
- g_test_term_consumers = (PRODUCERS * ITERS) + CONSUMERS;
- }
- void after()
- {
- RL_ASSERT(! g_test_term_consumers &&
- ! g_test_term_producers);
- }
- void thread_producer(unsigned int tidx)
- {
- for (unsigned i = 0; i < ITERS; ++i)
- {
- g_queue.push(((tidx + 1) << 8U) + i);
- }
- if (! --g_test_term_producers)
- {
- for (unsigned i = 0; i < CONSUMERS; ++i)
- {
- g_queue.push(666666);
- }
- }
- }
- void thread_consumer(unsigned int tidx)
- {
- for (;;)
- {
- unsigned v;
- g_queue.pop(v);
- --g_test_term_consumers;
- //printf("thread_consumer(%u)-popped(%u)\n", tidx, v);
- if (v == 666666)
- {
- break;
- }
- }
- }
- void thread(unsigned int tidx)
- {
- if (tidx < PRODUCERS)
- {
- thread_producer(tidx);
- }
- else
- {
- thread_consumer(tidx);
- }
- }
- };
- int main()
- {
- {
- rl::test_params p;
- p.iteration_count = 30000000;
- //p.execution_depth_limit = 10000;
- //p.search_type = rl::sched_bound;
- //p.search_type = rl::fair_full_search_scheduler_type;
- //p.search_type = rl::fair_context_bound_scheduler_type;
- rl::simulate<mpmc_bounded_queue_test>(p);
- }
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement