Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <relacy/relacy_std.hpp>
- #include <cstdio>
- #include <cstddef>
- #if ! defined (NDEBUG)
- # define DBG_PRINTF(mp_exp) std::printf mp_exp
- #else
- # define DBG_PRINTF(mp_exp) ((void)0)
- #endif
- class eventcount {
- public:
- typedef unsigned long key_type;
- private:
- mutable rl::atomic<key_type> m_count;
- rl::mutex m_mutex;
- rl::condition_variable m_cond;
- void prv_signal(key_type key) {
- if (key & 1) {
- m_mutex.lock($);
- while (! m_count($).compare_exchange_weak(key, (key + 2) & ~1,
- rl::memory_order_seq_cst));
- m_mutex.unlock($);
- m_cond.notify_all($);
- }
- }
- public:
- eventcount() {
- m_count($).store(0, rl::memory_order_relaxed);
- }
- public:
- key_type get() const {
- return m_count($).fetch_or(1, rl::memory_order_acquire);
- }
- void signal() {
- prv_signal(m_count($).fetch_add(0, rl::memory_order_seq_cst));
- }
- void signal_relaxed() {
- prv_signal(m_count($).load(rl::memory_order_relaxed));
- }
- void wait(key_type cmp) {
- m_mutex.lock($);
- if ((m_count($).load(rl::memory_order_seq_cst) & ~1) == (cmp & ~1)) {
- m_cond.wait(m_mutex, $);
- }
- m_mutex.unlock($);
- }
- };
- template<typename T, std::size_t T_size>
- class mpsc_boundq {
- rl::atomic<std::size_t> m_push;
- rl::atomic<std::size_t> m_push_idx;
- rl::atomic<bool> m_commit[T_size];
- rl::var<std::size_t> m_pop_idx;
- rl::var<T> m_buffer[T_size];
- eventcount m_pop_ecount;
- eventcount m_push_ecount;
- private:
- bool prv_try_push_strong() {
- std::size_t push_ = m_push($).load(rl::memory_order_relaxed);
- while (! m_push($).compare_exchange_weak(push_,
- (push_) ? push_ - 1 : 0, rl::memory_order_seq_cst));
- return (push_) ? true : false;
- }
- public:
- mpsc_boundq() {
- m_push($).store(T_size, rl::memory_order_relaxed);
- m_push_idx($).store(T_size, rl::memory_order_relaxed);
- m_pop_idx($) = 0;
- for (std::size_t i = 0; i < T_size; ++i) {
- m_buffer[i]($) = T();
- m_commit[i]($).store(false, rl::memory_order_relaxed);
- }
- }
- ~mpsc_boundq() {
- RL_ASSERT(m_push($).load(rl::memory_order_relaxed) == T_size);
- for (std::size_t i = 0; i < T_size; ++i) {
- RL_ASSERT(! m_commit[i]($).load(rl::memory_order_relaxed));
- }
- }
- public:
- void push(T const& state) {
- while (! prv_try_push_strong()) {
- eventcount::key_type const eckey = m_push_ecount.get();
- if (prv_try_push_strong()) break;
- m_push_ecount.wait(eckey);
- }
- std::size_t const i = m_push_idx($).fetch_add(1,
- rl::memory_order_seq_cst) % T_size;
- m_buffer[i]($) = state;
- m_commit[i]($).store(true, rl::memory_order_seq_cst);
- /* if following is: `m_pop_ecount.signal_relaxed();' == BOOM!!!!
- ;^D RELACY!!!
- ______________________________________________________________________*/
- m_pop_ecount.signal();
- // m_pop_ecount.signal_relaxed();
- }
- void pop(T& state) {
- std::size_t const i = m_pop_idx($) % T_size;
- ++m_pop_idx($);
- while (! m_commit[i]($).load(rl::memory_order_seq_cst)) {
- eventcount::key_type const eckey = m_pop_ecount.get();
- if (m_commit[i]($).load(rl::memory_order_seq_cst)) break;
- m_pop_ecount.wait(eckey);
- }
- state = m_buffer[i]($);
- m_commit[i]($).store(false, rl::memory_order_seq_cst);
- m_push($).fetch_add(1, rl::memory_order_seq_cst);
- m_push_ecount.signal_relaxed();
- }
- };
- #define CONSUMERS 1
- #define PRODUCERS 4
- #define THREADS (CONSUMERS + PRODUCERS)
- #define ITERS 6
- #define BUFFER (PRODUCERS * ITERS)
- struct buffer_test : rl::test_suite<buffer_test, THREADS> {
- mpsc_boundq<unsigned, BUFFER / 2> m_buffer;
- void thread(unsigned tidx) {
- if (tidx < CONSUMERS) {
- for (unsigned i = 0; i < BUFFER; ++i) {
- m_buffer.pop(tidx);
- DBG_PRINTF(("consumer loop: %u\n", tidx));
- }
- } else {
- for (unsigned i = 0; i < ITERS * CONSUMERS; ++i) {
- m_buffer.push(i);
- DBG_PRINTF(("producer(%u) loop: %u\n", tidx, i));
- }
- }
- }
- };
- int main() {
- rl::test_params params;
- params.iteration_count = 9999999;
- rl::simulate<buffer_test>(params);
- std::puts("\n\n\n_____________________________\nCompleted!");
- std::getchar();
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement