Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
// Chris M. Thomasson's Poor Man's RCU... Example 123...
- #include <iostream>
- #include <atomic>
- #include <thread>
- #include <cstdlib>
- #include <cstdint>
- #include <climits>
- #include <functional>
// Masks
//
// Bit layout used by ct_proxy::m_current and ct_proxy::collector::m_count:
//   bits 0..3  : collector index (ct_proxy_mask)
//   bit  4     : "quiescence in progress" flag (ct_proxy_quiescent)
//   bits 5..31 : reference count, moved in steps of ct_ref_inc
static constexpr std::uint32_t ct_proxy_mask{0xFU};
static constexpr std::uint32_t ct_proxy_quiescent{0x10U};
static constexpr std::uint32_t ct_ref_inc{0x20U};
// Everything above the collector index: 0xFFFFFFF0.
static constexpr std::uint32_t ct_ref_mask{static_cast<std::uint32_t>(~ct_proxy_mask)};
// One outstanding reference plus the quiescent flag: 0x30.
static constexpr std::uint32_t ct_ref_complete{ct_ref_inc + ct_proxy_quiescent};

// Iteration settings
static constexpr unsigned long ct_reader_iters_n{1000000UL};
static constexpr unsigned long ct_writer_iters_n{100000UL};

// Thread counts
static constexpr unsigned long ct_reader_threads_n{42UL};
static constexpr unsigned long ct_writer_threads_n{7UL};

// Some debug/sanity check things...
// Need to make this conditional in compilation with some macros...
// Bumped by ct_node's constructor / destructor respectively.
static std::atomic<std::uint32_t> g_debug_node_allocations{0};
static std::atomic<std::uint32_t> g_debug_node_deallocations{0};
- // Need to align and pad data structures! To do...
- struct ct_node
- {
- std::atomic<ct_node*> m_next;
- ct_node* m_defer_next;
- ct_node() : m_next(nullptr), m_defer_next(nullptr)
- {
- g_debug_node_allocations.fetch_add(1, std::memory_order_relaxed);
- }
- ~ct_node()
- {
- g_debug_node_deallocations.fetch_add(1, std::memory_order_relaxed);
- }
- };
- // The proxy collector itself... :^)
- template<std::size_t T_defer_limit>
- class ct_proxy
- {
- static void prv_destroy(ct_node* n)
- {
- while (n)
- {
- ct_node* next = n->m_defer_next;
- delete n;
- n = next;
- }
- }
- public:
- class collector
- {
- friend class ct_proxy;
- private:
- std::atomic<ct_node*> m_defer;
- std::atomic<std::uint32_t> m_defer_count;
- std::atomic<std::uint32_t> m_count;
- public:
- collector()
- : m_defer(nullptr),
- m_defer_count(0),
- m_count(0)
- {
- }
- ~collector()
- {
- prv_destroy(m_defer.load(std::memory_order_relaxed));
- }
- };
- private:
- std::atomic<std::uint32_t> m_current;
- std::atomic<bool> m_quiesce;
- ct_node* m_defer;
- collector m_collectors[2];
- public:
- ct_proxy()
- : m_current(0),
- m_quiesce(false),
- m_defer(nullptr)
- {
- }
- ~ct_proxy()
- {
- prv_destroy(m_defer);
- }
- private:
- void prv_quiesce_begin()
- {
- // Try to begin the quiescence process.
- if (! m_quiesce.exchange(true, std::memory_order_acquire))
- {
- // advance the current collector and grab the old one.
- std::uint32_t old = m_current.load(std::memory_order_relaxed) & ct_proxy_mask;
- old = m_current.exchange((old + 1) & 1, std::memory_order_acq_rel);
- collector& c = m_collectors[old & ct_proxy_mask];
- // decode reference count.
- std::uint32_t refs = old & ct_ref_mask;
- // increment and generate an odd reference count.
- std::uint32_t old_refs = c.m_count.fetch_add(refs + ct_proxy_quiescent, std::memory_order_release);
- if (old_refs == refs)
- {
- // odd reference count and drop-to-zero condition detected!
- prv_quiesce_complete(c);
- }
- }
- }
- void prv_quiesce_complete(collector& c)
- {
- // the collector `c' is now in a quiescent state! :^)
- std::atomic_thread_fence(std::memory_order_acquire);
- // maintain the back link and obtain "fresh" objects from
- // this collection.
- ct_node* n = m_defer;
- m_defer = c.m_defer.load(std::memory_order_relaxed);
- c.m_defer.store(0, std::memory_order_relaxed);
- // reset the reference count.
- c.m_count.store(0, std::memory_order_relaxed);
- c.m_defer_count.store(0, std::memory_order_relaxed);
- // release the quiesce lock.
- m_quiesce.store(false, std::memory_order_release);
- // destroy nodes.
- prv_destroy(n);
- }
- public:
- collector& acquire()
- {
- // increment the master count _and_ obtain current collector.
- std::uint32_t current =
- m_current.fetch_add(ct_ref_inc, std::memory_order_acquire);
- // decode the collector index.
- return m_collectors[current & ct_proxy_mask];
- }
- void release(collector& c)
- {
- // decrement the collector.
- std::uint32_t count =
- c.m_count.fetch_sub(ct_ref_inc, std::memory_order_release);
- // check for the completion of the quiescence process.
- if ((count & ct_ref_mask) == ct_ref_complete)
- {
- // odd reference count and drop-to-zero condition detected!
- prv_quiesce_complete(c);
- }
- }
- collector& sync(collector& c)
- {
- // check if the `c' is in the middle of a quiescence process.
- if (c.m_count.load(std::memory_order_relaxed) & ct_proxy_quiescent)
- {
- // drop `c' and get the next collector.
- release(c);
- return acquire();
- }
- return c;
- }
- void collect()
- {
- prv_quiesce_begin();
- }
- void collect(collector& c, ct_node* n)
- {
- if (! n) return;
- // link node into the defer list.
- ct_node* prev = c.m_defer.exchange(n, std::memory_order_relaxed);
- n->m_defer_next = prev;
- // bump the defer count and begin quiescence process if over
- // the limit.
- std::uint32_t count =
- c.m_defer_count.fetch_add(1, std::memory_order_relaxed) + 1;
- if (count >= (T_defer_limit / 2))
- {
- prv_quiesce_begin();
- }
- }
- };
- typedef ct_proxy<10> ct_proxy_collector;
- // you're basic lock-free stack...
- // well, minus ABA counter and DWCAS of course! ;^)
- class ct_stack
- {
- std::atomic<ct_node*> m_head;
- public:
- ct_stack() : m_head(nullptr)
- {
- }
- public:
- void push(ct_node* n)
- {
- ct_node* head = m_head.load(std::memory_order_relaxed);
- do
- {
- n->m_next.store(head, std::memory_order_relaxed);
- }
- while (! m_head.compare_exchange_weak(
- head,
- n,
- std::memory_order_release));
- }
- ct_node* flush()
- {
- return m_head.exchange(nullptr, std::memory_order_acquire);
- }
- ct_node* get_head()
- {
- return m_head.load(std::memory_order_acquire);
- }
- ct_node* pop()
- {
- ct_node* head = m_head.load(std::memory_order_acquire);
- ct_node* xchg;
- do
- {
- if (!head) return nullptr;
- xchg = head->m_next.load(std::memory_order_relaxed);
- }
- while (! m_head.compare_exchange_weak(
- head,
- xchg,
- std::memory_order_acquire));
- return head;
- }
- };
- // The shared state
- struct ct_shared
- {
- ct_proxy<10> m_proxy_gc;
- ct_stack m_stack;
- };
- // Reader threads
- // Iterates through the lock free stack
- void ct_thread_reader(ct_shared& shared)
- {
- // iterate the lockfree stack
- for (unsigned long i = 0; i < ct_reader_iters_n; ++i)
- {
- ct_proxy_collector::collector& c = shared.m_proxy_gc.acquire();
- ct_node* n = shared.m_stack.get_head();
- while (n)
- {
- // need to add in some processing...
- //std::this_thread::yield();
- n = n->m_next.load(std::memory_order_relaxed);
- }
- shared.m_proxy_gc.release(c);
- }
- }
- // Writer threads
- // Mutates the lock free stack
- void ct_thread_writer(ct_shared& shared)
- {
- for (unsigned long wloop = 0; wloop < 42; ++wloop)
- {
- for (unsigned long i = 0; i < ct_writer_iters_n; ++i)
- {
- shared.m_stack.push(new ct_node());
- }
- std::this_thread::yield();
- ct_proxy_collector::collector& c = shared.m_proxy_gc.acquire();
- for (unsigned long i = 0; i < ct_writer_iters_n; ++i)
- {
- shared.m_proxy_gc.collect(c, shared.m_stack.pop());
- }
- shared.m_proxy_gc.release(c);
- std::this_thread::yield();
- if ((wloop % 3) == 0)
- {
- shared.m_proxy_gc.collect();
- }
- }
- }
- int main()
- {
- std::cout << "Chris M. Thomassons Proxy Collector Port ver .0.0.1...\n";
- std::cout << "_______________________________________\n\n";
- {
- ct_shared shared;
- std::thread readers[ct_reader_threads_n];
- std::thread writers[ct_writer_threads_n];
- std::cout << "Booting threads...\n";
- for (unsigned long i = 0; i < ct_writer_threads_n; ++i)
- {
- writers[i] = std::thread(ct_thread_writer, std::ref(shared));
- }
- for (unsigned long i = 0; i < ct_reader_threads_n; ++i)
- {
- readers[i] = std::thread(ct_thread_reader, std::ref(shared));
- }
- std::cout << "Threads running...\n";
- for (unsigned long i = 0; i < ct_reader_threads_n; ++i)
- {
- readers[i].join();
- }
- for (unsigned long i = 0; i < ct_writer_threads_n; ++i)
- {
- writers[i].join();
- }
- }
- std::cout << "Threads completed!\n\n";
- // Sanity check!
- {
- std::uint32_t node_allocations = g_debug_node_allocations.load(std::memory_order_relaxed);
- std::uint32_t node_deallocations = g_debug_node_deallocations.load(std::memory_order_relaxed);
- std::cout << "node_allocations = " << node_allocations << "\n";
- std::cout << "node_deallocations = " << node_deallocations << "\n";
- if (node_allocations != node_deallocations)
- {
- std::cout << "OH SHIT! NODE LEAK!!! SHIT! = " << node_allocations - node_deallocations << "\n\n";
- }
- }
- std::cout << "\n\nTest Completed!\n\n";
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement