Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #define RL_GC
- #include <relacy/relacy_std.hpp>
- #include <cstdio>
- class eventcount {
- public:
- typedef unsigned long key_type;
- private:
- mutable rl::atomic<key_type> m_count;
- rl::mutex m_mutex;
- rl::condition_variable m_cond;
- void prv_signal(key_type key) {
- if (key & 1) {
- m_mutex.lock($);
- while (! m_count($).compare_exchange_weak(key, (key + 2) & ~1,
- rl::memory_order_seq_cst));
- m_mutex.unlock($);
- m_cond.notify_all($);
- }
- }
- public:
- eventcount() {
- m_count($).store(0, rl::memory_order_relaxed);
- }
- public:
- key_type get() const {
- return m_count($).fetch_or(1, rl::memory_order_acquire);
- }
- void signal() {
- prv_signal(m_count($).fetch_add(0, rl::memory_order_seq_cst));
- }
- void signal_relaxed() {
- prv_signal(m_count($).load(rl::memory_order_relaxed));
- }
- void wait(key_type cmp) {
- m_mutex.lock($);
- if ((m_count($).load(rl::memory_order_seq_cst) & ~1) == (cmp & ~1)) {
- m_cond.wait(m_mutex, $);
- }
- m_mutex.unlock($);
- }
- };
- template<typename T>
- class mpmcq {
- struct node {
- rl::atomic<node*> m_next;
- T volatile m_state;
- };
- rl::atomic<node*> m_head;
- rl::atomic<node*> m_tail;
- public:
- mpmcq() {
- node* n = RL_NEW(node);
- n->m_next($).store(NULL, rl::memory_order_relaxed);
- m_head($).store(n, rl::memory_order_relaxed);
- m_tail($).store(n, rl::memory_order_relaxed);
- }
- ~mpmcq() {
- RL_ASSERT(m_head($).load(rl::memory_order_relaxed) ==
- m_tail($).load(rl::memory_order_relaxed));
- }
- public:
- void push(T& state) {
- node* n = RL_NEW(node);
- n->m_next($).store(NULL, rl::memory_order_relaxed);
- n->m_state = state;
- node* p = m_head($).exchange(n, rl::memory_order_seq_cst);
- p->m_next($).store(n, rl::memory_order_seq_cst);
- }
- bool pop(T& state) {
- node* n;
- node* t = m_tail($).load(rl::memory_order_seq_cst);
- do {
- n = t->m_next($).load(rl::memory_order_seq_cst);
- if (! n) return false;
- state = n->m_state;
- } while (! m_tail($).compare_exchange_weak(t, n, rl::memory_order_seq_cst));
- return true;
- }
- };
- #define PRODUCERS 2
- #define CONSUMERS 2
- #define THREADS (PRODUCERS + CONSUMERS)
- #define ITERS 6
- struct mpmcq_test : rl::test_suite<mpmcq_test, THREADS> {
- eventcount m_ecount;
- mpmcq<int> m_queue;
- std::atomic<int> m_count;
- void before() {
- m_count($).store(PRODUCERS * ITERS, rl::memory_order_relaxed);
- }
- void invariant() {
- RL_ASSERT(m_count($).load(rl::memory_order_relaxed) > -1);
- }
- void after() {
- RL_ASSERT(! m_count($).load(rl::memory_order_relaxed));
- }
- void thread(unsigned tidx) {
- int i;
- if (tidx < PRODUCERS) {
- for (i = 0; i < ITERS; ++i) {
- m_queue.push(i);
- m_ecount.signal();
- }
- } else {
- do {
- while (! m_queue.pop(i)) {
- eventcount::key_type key = m_ecount.get();
- if (m_queue.pop(i)) break;
- m_ecount.wait(key);
- }
- } while (i != -666 &&
- m_count($).fetch_add(-1, rl::memory_order_relaxed) != 1);
- if (i != -666) {
- for (i = 1; i < CONSUMERS; ++i) {
- int x = -666;
- m_queue.push(x);
- }
- m_ecount.signal();
- }
- }
- }
- };
- int main() {
- rl::test_params params;
- params.iteration_count = 99999999;
- //params.search_type = rl::fair_full_search_scheduler_type;
- //params.search_type = rl::fair_context_bound_scheduler_type;
- rl::simulate<mpmcq_test>(params);
- std::puts("\n\n\n____________________________________\n"
- "DONE!!!!\npress <ENTER> to exit...");
- std::fflush(stdin);
- std::fflush(stdout);
- std::getchar();
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement