Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <relacy/relacy_std.hpp>
- #include <cstdio>
- #if ! defined (NDEBUG)
- # define DBG_PRINTF(mp_exp) std::printf mp_exp
- #else
- # define DBG_PRINTF(mp_exp) ((void)0)
- #endif
- class semaphore
- {
- rl::atomic<long> m_count;
- rl::sem_t m_waitset;
- public:
- semaphore(long count = 0)
- : m_count(count)
- {
- RL_ASSERT(count > -1);
- rl::sem_init(&m_waitset, 0, 0, $);
- }
- ~semaphore()
- {
- rl::sem_destroy(&m_waitset, $);
- }
- public:
- void post()
- {
- if (m_count($).fetch_add(1) < 0)
- {
- rl::sem_post(&m_waitset, $);
- }
- }
- void wait()
- {
- if (m_count($).fetch_add(-1) < 1)
- {
- while (rl::sem_wait(&m_waitset, $));
- }
- }
- };
- template<typename T, std::size_t T_depth>
- class mpmcq
- {
- rl::atomic<T*> m_slots[T_depth];
- rl::atomic<std::size_t> m_push_idx;
- rl::atomic<std::size_t> m_pop_idx;
- semaphore m_push_sem;
- semaphore m_pop_sem;
- public:
- mpmcq()
- : m_push_idx(T_depth),
- m_pop_idx(0),
- m_push_sem(T_depth)
- {
- for (std::size_t i = 0; i < T_depth; ++i)
- {
- m_slots[i]($).store(NULL);
- }
- }
- public:
- void push(T* ptr)
- {
- m_push_sem.wait();
- std::size_t idx = m_push_idx($).fetch_add(1) & (T_depth - 1);
- T* cmp = NULL;
- rl::backoff backoff;
- while (m_slots[idx]($).load())
- {
- backoff.yield($);
- }
- RL_ASSERT(! m_slots[idx]($).load());
- m_slots[idx]($).store(ptr);
- m_pop_sem.post();
- }
- T* pop()
- {
- m_pop_sem.wait();
- std::size_t idx = m_pop_idx($).fetch_add(1) & (T_depth - 1);
- T* ptr;
- rl::backoff backoff;
- while (! (ptr = m_slots[idx]($).load()))
- {
- backoff.yield($);
- }
- m_slots[idx]($).store(NULL);
- m_push_sem.post();
- return ptr;
- }
- };
- #define PRODUCERS 5
- #define CONSUMERS 5
- #define THREADS (PRODUCERS + CONSUMERS)
- #define ITERS 13
- #define DEPTH 8
- struct mpmcq_test : rl::test_suite<mpmcq_test, THREADS>
- {
- struct node
- {
- unsigned m_tidx;
- };
- mpmcq<node, DEPTH> m_queue;
- void thread(unsigned tidx)
- {
- node* n;
- unsigned i;
- if (tidx < PRODUCERS)
- {
- for (i = 0; i < ITERS; ++i)
- {
- n = RL_NEW node;
- n->m_tidx = tidx;
- m_queue.push(n);
- DBG_PRINTF(("thread %u pushed %p\n",
- tidx, (void*)n));
- }
- }
- else
- {
- for (i = 0; i < ITERS; ++i)
- {
- n = m_queue.pop();
- unsigned tidx_ = n->m_tidx;
- RL_ASSERT(tidx != tidx_);
- DBG_PRINTF(("thread %u popped %p from thread %u\n",
- tidx, (void*)n, tidx_));
- RL_DELETE(n);
- }
- }
- }
- };
- int main()
- {
- rl::test_params params;
- params.iteration_count = 99999999;
- //params.search_type = rl::fair_full_search_scheduler_type;
- //params.search_type = rl::fair_context_bound_scheduler_type;
- rl::simulate<mpmcq_test>(params);
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement