#include <relacy/relacy_std.hpp>
#include <cstdio>
#include <cstddef>
// DBG_PRINTF: printf-style tracing that is compiled out when NDEBUG is defined.
// The macro takes ONE argument, which must itself be a complete parenthesized
// printf argument list, e.g. DBG_PRINTF(("value: %u\n", v)); the inner
// parentheses become the call parentheses of std::printf.
#if ! defined (NDEBUG)
# define DBG_PRINTF(mp_exp) std::printf mp_exp
#else
// With NDEBUG the whole statement collapses to a no-op expression.
# define DBG_PRINTF(mp_exp) ((void)0)
#endif
// Eventcount built from a Relacy atomic counter, mutex and condition variable.
//
// m_count carries two pieces of information in one word:
//   - bit 0 is set by get() and cleared by prv_signal(); prv_signal() does
//     nothing unless it sees this bit set (presumably it marks that a waiter
//     may be present);
//   - the remaining bits are advanced by prv_signal(), so a key obtained from
//     get() before a signal no longer matches the counter afterwards.
//
// Usage pattern visible in this file: key = get(); re-check the condition;
// wait(key). wait() blocks at most once, so callers re-check in a loop.
class eventcount {
public:
typedef unsigned long key_type;
private:
// Mutable so that the const member get() can set bit 0.
mutable rl::atomic<key_type> m_count;
rl::mutex m_mutex;
rl::condition_variable m_cond;
// Common tail of signal()/signal_relaxed(). `key` is the counter value the
// caller observed. If bit 0 of that value is clear, nothing is done.
// Otherwise, under the mutex, the counter is replaced by (key + 2) & ~1,
// which clears bit 0 and advances the upper bits; then all waiters are
// notified after the mutex has been released.
void prv_signal(key_type key) {
if (key & 1) {
m_mutex.lock($);
// Empty-bodied retry loop: a failed compare_exchange_weak reloads `key`
// with the current counter value, and the new value is recomputed from it.
while (! m_count($).compare_exchange_weak(key, (key + 2) & ~1,
rl::memory_order_seq_cst));
m_mutex.unlock($);
m_cond.notify_all($);
}
}
public:
// Starts with a zero counter (bit 0 clear).
eventcount() {
m_count($).store(0, rl::memory_order_relaxed);
}
public:
// Sets bit 0 of the counter and returns the value the counter held BEFORE
// the update. The returned key is what wait() expects as `cmp`.
key_type get() const {
return m_count($).fetch_or(1, rl::memory_order_acquire);
}
// Reads the counter with a seq_cst read-modify-write (fetch_add of 0) and
// wakes waiters if bit 0 was set.
void signal() {
prv_signal(m_count($).fetch_add(0, rl::memory_order_seq_cst));
}
// Same as signal() but reads the counter with a plain relaxed load.
// NOTE(review): the caller is presumably expected to provide whatever
// ordering is needed before calling this -- confirm at each call site.
void signal_relaxed() {
prv_signal(m_count($).load(rl::memory_order_relaxed));
}
// Blocks on the condition variable if the counter, ignoring bit 0, still
// equals `cmp` ignoring bit 0. The comparison is made while holding the same
// mutex that prv_signal() holds while changing the counter.
// The wait is guarded by `if`, not `while`: the function may return without
// the counter having changed, so callers must re-check their own condition.
void wait(key_type cmp) {
m_mutex.lock($);
if ((m_count($).load(rl::memory_order_seq_cst) & ~1) == (cmp & ~1)) {
m_cond.wait(m_mutex, $);
}
m_mutex.unlock($);
}
};
// Bounded queue of T with T_size slots, blocking on both push() and pop().
// push() coordinates through atomics only; pop() keeps its read index in a
// non-atomic rl::var (presumably because only one consumer thread is
// expected to call pop() -- the class name suggests multi-producer /
// single-consumer).
template<typename T, std::size_t T_size>
class mpsc_boundq {
// Number of slots still available to producers: starts at T_size, is
// decremented by prv_try_push_strong() and incremented by pop().
rl::atomic<std::size_t> m_push;
// Monotonically increasing producer ticket; slot index is ticket % T_size.
rl::atomic<std::size_t> m_push_idx;
// m_commit[i] is set to true by push() after m_buffer[i] has been written
// and reset to false by pop() after m_buffer[i] has been read.
rl::atomic<bool> m_commit[T_size];
// Consumer position; slot index is m_pop_idx % T_size.
rl::var<std::size_t> m_pop_idx;
rl::var<T> m_buffer[T_size];
// Signalled by push(), waited on by pop().
eventcount m_pop_ecount;
// Signalled by pop(), waited on by push().
eventcount m_push_ecount;
private:
// Tries to reserve one slot: decrements m_push unless it is zero.
// Returns true if a slot was reserved, false if m_push was observed as zero.
// Note that in the zero case the CAS still runs and writes 0 over 0.
bool prv_try_push_strong() {
std::size_t push_ = m_push($).load(rl::memory_order_relaxed);
// Empty-bodied retry loop; a failed CAS refreshes push_.
while (! m_push($).compare_exchange_weak(push_,
(push_) ? push_ - 1 : 0, rl::memory_order_seq_cst));
return (push_) ? true : false;
}
public:
// Initializes an empty queue: all T_size slots free, no slot committed.
// m_push_idx starts at T_size, so the first ticket maps to slot 0
// (T_size % T_size), matching the initial m_pop_idx of 0.
mpsc_boundq() {
m_push($).store(T_size, rl::memory_order_relaxed);
m_push_idx($).store(T_size, rl::memory_order_relaxed);
m_pop_idx($) = 0;
for (std::size_t i = 0; i < T_size; ++i) {
m_buffer[i]($) = T();
m_commit[i]($).store(false, rl::memory_order_relaxed);
}
}
// Asserts that the queue is empty at destruction: every slot has been
// returned to m_push and no slot is still marked committed.
~mpsc_boundq() {
RL_ASSERT(m_push($).load(rl::memory_order_relaxed) == T_size);
for (std::size_t i = 0; i < T_size; ++i) {
RL_ASSERT(! m_commit[i]($).load(rl::memory_order_relaxed));
}
}
public:
// Copies `state` into the queue, blocking while no slot is free.
void push(T const& state) {
// Reserve a slot. On failure: take an eventcount key, try once more (a slot
// may have been freed in between), and only then wait on that key.
while (! prv_try_push_strong()) {
eventcount::key_type const eckey = m_push_ecount.get();
if (prv_try_push_strong()) break;
m_push_ecount.wait(eckey);
}
// Claim a slot index, fill the slot, then publish it via the commit flag.
std::size_t const i = m_push_idx($).fetch_add(1,
rl::memory_order_seq_cst) % T_size;
m_buffer[i]($) = state;
m_commit[i]($).store(true, rl::memory_order_seq_cst);
// This must be signal(), not signal_relaxed(): per the original author's
// note, replacing the call below with `m_pop_ecount.signal_relaxed();`
// makes the Relacy simulation fail ("BOOM").
m_pop_ecount.signal();
// m_pop_ecount.signal_relaxed();
}
// Removes the next element into `state`, blocking until its slot has been
// committed by a producer.
void pop(T& state) {
std::size_t const i = m_pop_idx($) % T_size;
++m_pop_idx($);
// Wait for slot i to be committed. Same key / re-check / wait sequence as
// in push(), so a commit that lands between the check and the wait is seen.
while (! m_commit[i]($).load(rl::memory_order_seq_cst)) {
eventcount::key_type const eckey = m_pop_ecount.get();
if (m_commit[i]($).load(rl::memory_order_seq_cst)) break;
m_pop_ecount.wait(eckey);
}
state = m_buffer[i]($);
// Release the slot: clear the commit flag, then hand the slot back.
m_commit[i]($).store(false, rl::memory_order_seq_cst);
m_push($).fetch_add(1, rl::memory_order_seq_cst);
// NOTE(review): unlike push(), the relaxed signal is used here. Presumably
// this is sufficient because the fetch_add above and the producer's CAS in
// prv_try_push_strong() are both read-modify-writes on m_push -- confirm.
m_push_ecount.signal_relaxed();
}
};
// Test configuration for buffer_test.
// Thread indices below CONSUMERS run the consumer loop; the rest produce.
#define CONSUMERS 1
#define PRODUCERS 4
// Total number of simulated threads.
#define THREADS (CONSUMERS + PRODUCERS)
// Each producer pushes ITERS * CONSUMERS items.
#define ITERS 6
// Number of items each consumer pops; the queue capacity is BUFFER / 2.
#define BUFFER (PRODUCERS * ITERS)
// Relacy test suite that drives mpsc_boundq with THREADS simulated threads:
// thread indices below CONSUMERS pop BUFFER items each, all other threads
// push ITERS * CONSUMERS items each.
struct buffer_test : rl::test_suite<buffer_test, THREADS> {
    // Queue under test; its capacity is half of BUFFER.
    mpsc_boundq<unsigned, BUFFER / 2> m_buffer;

    // Per-thread body invoked with the thread's index.
    void thread(unsigned tidx) {
        bool const is_producer = !(tidx < CONSUMERS);

        if (is_producer) {
            unsigned const push_count = ITERS * CONSUMERS;
            for (unsigned n = 0; n != push_count; ++n) {
                m_buffer.push(n);
                DBG_PRINTF(("producer(%u) loop: %u\n", tidx, n));
            }
            return;
        }

        // Consumer: pop into a dedicated local. The trace line reports the
        // value that was just popped.
        unsigned popped = tidx;
        for (unsigned n = 0; n != BUFFER; ++n) {
            m_buffer.pop(popped);
            DBG_PRINTF(("consumer loop: %u\n", popped));
        }
    }
};
// Runs the buffer_test suite under the Relacy simulator.
//
// Returns 0 when rl::simulate reports success and 1 otherwise, so that a
// failed simulation is visible to the caller (shell, build script, CI)
// instead of always exiting with status 0.
int main() {
    rl::test_params params;
    params.iteration_count = 9999999;

    // rl::simulate returns false when the simulation detects a failure.
    bool const passed = rl::simulate<buffer_test>(params);

    std::puts("\n\n\n_____________________________\nCompleted!");
    // Wait for a key press before exiting.
    std::getchar();

    return passed ? 0 : 1;
}