Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#pragma once
#include <cstddef>
#include <cstdint>
#include <cstdio>
// Forward slash: backslashes in #include paths are only conditionally
// supported and break non-Windows builds.
#include "relacy/relacy.hpp"
- template <typename T, size_t size = 32, size_t cache_line_size = 64> class LockFreeMPMCQueue
- {
- public:
- explicit LockFreeMPMCQueue()
- {
- m_data = new rl::var<T>[size];
- for (size_t i = 0; i < size; ++i)
- {
- m_data[i]($) = size_t(-1);
- }
- m_head_1($) = 0;
- m_head_2($) = 0;
- m_tail_1($) = 0;
- m_tail_2($) = 0;
- }
- bool try_enqueue( const T& value )
- {
- const std::uint64_t head = m_head_2($).load( rl::mo_relaxed );
- std::uint64_t tail = m_tail_1($).load( rl::mo_relaxed );
- const std::uint64_t count = tail - head;
- // count could be greater than size if between the reading of head, and the reading of tail, both head
- // and tail have been advanced
- if( count >= size )
- {
- return false;
- }
- if( !m_tail_1($).compare_exchange_strong( tail, tail + 1, rl::mo_relaxed ) )
- {
- return false;
- }
- m_data[tail % size]($) = value;
- while( m_tail_2($).load( rl::mo_relaxed ) != tail )
- {
- rl::yield(1, $);
- }
- // Release - read/write before can't be reordered with writes after
- // Make sure the write of the value to m_data is
- // not reordered past the write to m_tail_2
- rl::atomic_thread_fence( rl::mo_release, $ );
- m_tail_2($).store( tail + 1, rl::mo_relaxed );
- return true;
- }
- bool try_dequeue( T& out )
- {
- const std::uint64_t tail = m_tail_2($).load( rl::mo_relaxed );
- std::uint64_t head = m_head_1($).load( rl::mo_relaxed );
- if( head >= tail )
- {
- return false;
- }
- if( !m_head_1($).compare_exchange_strong( head, head + 1, rl::mo_relaxed ) )
- {
- return false;
- }
- // Acquire - read/write after can't be reordered with reads before
- // Make sure this read of m_data[head] is not reordered with the load
- // of m_tail_2
- rl::atomic_thread_fence( rl::mo_acquire, $ );
- out = m_data[head % size]($);
- while( m_head_2($).load( rl::mo_relaxed ) != head )
- {
- rl::yield(1, $);
- }
- // Release - read/write before can't be reordered with writes after
- // Make sure the read of value from m_data is
- // not reordered past the write to m_head_2
- rl::atomic_thread_fence( rl::mo_release, $ );
- m_head_2($).store( head + 1, rl::mo_relaxed );
- return true;
- }
- size_t capacity() const { return size; }
- private:
- rl::var<T>* m_data;
- // Make sure each index is on its own cache line
- char _pad1[cache_line_size - 8];
- rl::atomic<std::uint64_t> m_head_1;
- char _pad2[cache_line_size - 8];
- rl::atomic<std::uint64_t> m_head_2;
- char _pad3[cache_line_size - 8];
- rl::atomic<std::uint64_t> m_tail_1;
- char _pad4[cache_line_size - 8];
- rl::atomic<std::uint64_t> m_tail_2;
- };
// Test configuration: num_values distinct values are pushed through the
// queue, split evenly across num_threads producer threads (and drained by
// an equal number of consumer threads -- see queue_test below).
const size_t num_values = 32;
const size_t num_threads = 2;
const size_t num_values_per_thread = num_values / num_threads;
- struct queue_test : rl::test_suite<queue_test, num_threads + num_threads>
- {
- LockFreeMPMCQueue<size_t> queue;
- bool data[num_values];
- void before()
- {
- for (size_t i = 0; i < num_values; ++i)
- {
- data[i] = false;
- }
- }
- void after()
- {
- for (size_t i = 0; i < num_values; ++i)
- {
- RL_ASSERT(data[i] == true);
- }
- }
- void thread(unsigned index)
- {
- // writers
- if (index <= 1)
- {
- const size_t offset = index * num_values_per_thread;
- for (size_t i = 0; i < num_values_per_thread; ++i)
- {
- while (!queue.try_enqueue(offset + i)) { rl::yield(1, $); }
- }
- }
- // readers
- else
- {
- for (size_t i = 0; i < num_values_per_thread; ++i)
- {
- size_t value;
- while (!queue.try_dequeue(value)) { rl::yield(1, $); }
- RL_ASSERT(data[value] == false);
- data[value] = true;
- }
- }
- }
- };
- int main( int argc, char* argv[] )
- {
- rl::simulate<queue_test>();
- char c;
- scanf( "%c", &c );
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement