#include #include #include #if ! defined (NDEBUG) # define DBG_PRINTF(mp_exp) std::printf mp_exp #else # define DBG_PRINTF(mp_exp) ((void)0) #endif class eventcount { public: typedef unsigned long key_type; private: mutable rl::atomic m_count; rl::mutex m_mutex; rl::condition_variable m_cond; void prv_signal(key_type key) { if (key & 1) { m_mutex.lock($); while (! m_count($).compare_exchange_weak(key, (key + 2) & ~1, rl::memory_order_seq_cst)); m_mutex.unlock($); m_cond.notify_all($); } } public: eventcount() { m_count($).store(0, rl::memory_order_relaxed); } public: key_type get() const { return m_count($).fetch_or(1, rl::memory_order_acquire); } void signal() { prv_signal(m_count($).fetch_add(0, rl::memory_order_seq_cst)); } void signal_relaxed() { prv_signal(m_count($).load(rl::memory_order_relaxed)); } void wait(key_type cmp) { m_mutex.lock($); if ((m_count($).load(rl::memory_order_seq_cst) & ~1) == (cmp & ~1)) { m_cond.wait(m_mutex, $); } m_mutex.unlock($); } }; class rwmutex { enum constant { READ_ACCESS = 0x10000, WRITE_ACCESS = 1 }; rl::atomic m_next; rl::atomic m_current; eventcount m_waitset; private: bool prv_check_read(long ticket) { return (ticket == (m_current($).load(rl::memory_order_seq_cst) % READ_ACCESS)); } bool prv_check_write(long ticket) { return (ticket == m_current($).load(rl::memory_order_seq_cst)); } public: rwmutex() { m_next($).store(0, rl::memory_order_relaxed); m_current($).store(0, rl::memory_order_relaxed); } public: void rdlock() { long ticket = m_next($).fetch_add(READ_ACCESS, rl::memory_order_seq_cst) % READ_ACCESS; while (! prv_check_read(ticket)) { eventcount::key_type eckey = m_waitset.get(); if (prv_check_read(ticket)) return; m_waitset.wait(eckey); } } void rdunlock() { m_current($).fetch_add(READ_ACCESS, rl::memory_order_seq_cst); //m_waitset.signal_relaxed(); m_waitset.signal(); } public: void wrlock() { long ticket = m_next($).fetch_add(WRITE_ACCESS, rl::memory_order_seq_cst); while (! prv_check_write(ticket)) { eventcount::key_type eckey = m_waitset.get(); if (prv_check_write(ticket)) return; m_waitset.wait(eckey); } } void wrunlock() { m_current($).fetch_add(WRITE_ACCESS, rl::memory_order_seq_cst); //m_waitset.signal_relaxed(); m_waitset.signal(); } }; #define WRITERS 2 #define READERS 4 #define THREADS (WRITERS + READERS) #define ITERS 6 struct rwmutex_test : rl::test_suite { rwmutex g_rwmutex; rl::var g_state; rl::var g_rdtowr; void before() { g_state($) = 0; g_rdtowr($) = 0; } void after() { RL_ASSERT(g_state($) + g_rdtowr($) == (WRITERS * ITERS) * 2); } void invariant() { RL_ASSERT(g_state($) <= (WRITERS * ITERS) * 2); RL_ASSERT(g_rdtowr($) <= (READERS * ITERS) * 2); } void thread(unsigned tidx) { rl::backoff b; for (unsigned i = 0; i < ITERS; ++i) { if (tidx < WRITERS) { g_rwmutex.wrlock(); ++g_state($); b.yield($); RL_ASSERT(g_state($) % 2); ++g_state($); g_rwmutex.wrunlock(); } else { g_rwmutex.rdlock(); RL_ASSERT(! (g_state($) % 2)); RL_ASSERT(! (g_rdtowr($) % 2)); g_rwmutex.rdunlock(); } } } }; int main() { rl::test_params params; params.iteration_count = 9999999; rl::simulate(params); std::puts("\n\n\n____________________________________\n" "DONE!!!!\npress to exit..."); std::fflush(stdin); std::fflush(stdout); std::getchar(); return 0; }