Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /* Simple Proxy Collector (DWCAS) - C++11 Version
- http://www.1024cores.net/home/relacy-race-detector
- Copyright 3/27/2018
- ___________________________________________________*/
- #if defined (_MSC_VER)
- # define _ENABLE_ATOMIC_ALIGNMENT_FIX // for dwcas
- #endif
- #include <atomic>
- #include <mutex>
- #include <cstdio>
- #include <cstdlib>
- #include <cstdint>
- #include <cassert>
- static std::atomic<unsigned int> g_allocs(0);
- /* Utilities
- ___________________________________________________*/
- void ct_cout(
- const char* msg
- ) {
- static std::mutex g_cout_lock;
- g_cout_lock.lock();
- std::printf(msg);
- std::fflush(stdout);
- g_cout_lock.unlock();
- }
- /* Member Fence Abstraction
- ___________________________________________________*/
- #define ct_mb_relaxed std::memory_order_relaxed
- #define ct_mb_acquire std::memory_order_acquire
- #define ct_mb_release std::memory_order_release
- #define ct_mb_acq_rel std::memory_order_acq_rel
- #define ct_mb_seq_cst std::memory_order_seq_cst
- #define ct_mb_fence(mb_membar) std::atomic_thread_fence(mb_membar)
- namespace ct {
- namespace proxy {
- // User object base
- struct object
- {
- virtual ~object() {}
- };
- // Proxy node
- struct node
- {
- std::atomic<std::intptr_t> count;
- node* next;
- object* obj;
- node(node const& n)
- : count(n.count.load(ct_mb_relaxed)), next(n.next), obj(n.obj)
- {
- }
- node& operator =(node const& n)
- {
- count.store(n.count.load(ct_mb_relaxed));
- next = n.next;
- obj = n.obj;
- return *this;
- }
- node(std::intptr_t count_, node* next_, object* obj_)
- : count(count_), next(next_), obj(obj_)
- {
- g_allocs.fetch_add(1, ct_mb_relaxed);
- }
- ~node()
- {
- g_allocs.fetch_sub(1, ct_mb_relaxed);
- }
- };
- // DWCAS target
- struct anchor
- {
- std::intptr_t count;
- node* head;
- };
- // Proxy Collector
- struct gc
- {
- std::atomic<anchor> head;
- gc() : head(anchor { 0, new node(0, nullptr, nullptr) }) {}
- ~gc()
- {
- anchor cmp = head.load(ct_mb_relaxed);
- assert(cmp.count > -1);
- assert(cmp.head->next == nullptr);
- prv_dtor(cmp.head);
- }
- // Release a reference
- bool prv_release(node* n)
- {
- std::intptr_t count = n->count.fetch_sub(2, ct_mb_relaxed);
- if (count == 3) return true;
- return false;
- }
- // Destroy a node
- void prv_dtor(node* n)
- {
- object* obj = n->obj;
- if (obj != nullptr) delete obj;
- delete n;
- }
- // Dump backlinks
- void prv_dump(node* n)
- {
- node* cur = n->next;
- n->next = nullptr;
- // Release
- while (prv_release(cur))
- {
- ct_mb_fence(ct_mb_acquire);
- node* next = cur->next;
- cur->next = n;
- n = cur;
- cur = next;
- }
- // Destroy
- while (n)
- {
- node* next = n->next;
- prv_dtor(n);
- n = next;
- }
- }
- // Collect a node
- void prv_collect(node* n)
- {
- anchor xchg = { 0, n };
- ct_mb_fence(ct_mb_release);
- anchor cmp = head.exchange(xchg, ct_mb_relaxed);
- ct_mb_fence(ct_mb_acquire);
- cmp.head->next = xchg.head;
- ct_mb_fence(ct_mb_release);
- std::intptr_t count = cmp.head->count.fetch_add(cmp.count + 1, ct_mb_relaxed);
- if (count + cmp.count == 0)
- {
- prv_dump(cmp.head);
- }
- }
- // Acquire a node
- node* acquire()
- {
- anchor cmp = head.load(ct_mb_relaxed);
- anchor xchg = { cmp.count + 2, cmp.head };
- while (! head.compare_exchange_weak(cmp, xchg, ct_mb_relaxed))
- {
- xchg = { cmp.count + 2, cmp.head };
- }
- ct_mb_fence(ct_mb_acquire);
- return cmp.head;
- }
- // Release a node
- void release(node* n)
- {
- ct_mb_fence(ct_mb_release);
- if (prv_release(n))
- {
- ct_mb_fence(ct_mb_acquire);
- prv_dump(n);
- }
- }
- // Collect an object
- void collect(object* obj)
- {
- prv_collect(new node(2, nullptr, obj));
- }
- };
- }}
- // Test object
- struct foo : public ct::proxy::object
- {
- std::atomic<foo*> next;
- foo(foo* next_ = nullptr) : next(next_)
- {
- g_allocs.fetch_add(1, ct_mb_relaxed);
- }
- virtual ~foo()
- {
- g_allocs.fetch_sub(1, ct_mb_relaxed);
- }
- };
- // An atomic LIFO of test objects
- struct foo_lifo
- {
- std::atomic<foo*> head;
- foo_lifo(foo* next_ = nullptr) : head(next_) { }
- void push(foo* n)
- {
- foo* cmp = head.load(ct_mb_relaxed);
- do {
- n->next.store(cmp, ct_mb_relaxed);
- ct_mb_fence(ct_mb_release);
- } while (! head.compare_exchange_weak(cmp, n, ct_mb_relaxed));
- }
- // Flush all items, and return LIFO list or nullptr.
- foo* flush()
- {
- foo* cmp = head.exchange(nullptr, ct_mb_relaxed);
- if (cmp) ct_mb_fence(ct_mb_acquire);
- return cmp;
- }
- };
- /* Test program
- ____________________________________________ */
- #include <thread>
- #include <functional>
- #define READERS 7
- #define WRITERS 5
- #define ITERS 1000000
- #define THREADS (READERS + WRITERS)
- // Our shared state
- struct ct_test_state
- {
- ct::proxy::gc gc;
- foo_lifo lifo;
- ct_test_state() : gc(), lifo() {}
- };
- // Read the tstate.lifo shared data-structure.
- void ct_reader_thread(
- ct_test_state& tstate
- ) {
- ct_cout("ct_reader_thread: Enter\n");
- // Read...
- for (unsigned int i = 0; i < ITERS * 2; ++i)
- {
- // Acquire proxy
- ct::proxy::node* pcn = tstate.gc.acquire();
- // Iterate list
- {
- foo* cur = tstate.lifo.head.load(ct_mb_relaxed);
- ct_mb_fence(ct_mb_acquire);
- while (cur)
- {
- foo* next = cur->next.load(ct_mb_relaxed);
- std::this_thread::yield();
- cur = next;
- }
- }
- // Release proxy
- tstate.gc.release(pcn);
- if (! (i % (ITERS / 64)))
- {
- ct_cout("ct_reader_thread: Processing...\n");
- }
- }
- ct_cout("ct_reader_thread: Exit\n");
- }
- // Mutate the tstate.lifo shared data-structure.
- void ct_writer_thread(
- ct_test_state& tstate
- ) {
- ct_cout("ct_writer_thread: Enter\n");
- // Create and collect some nodes
- for (unsigned int i = 0; i < ITERS; ++i)
- {
- // Create
- tstate.lifo.push(new foo(nullptr));
- std::this_thread::yield();
- tstate.lifo.push(new foo(nullptr));
- std::this_thread::yield();
- tstate.lifo.push(new foo(nullptr));
- std::this_thread::yield();
- // Collect
- foo* cur = tstate.lifo.flush();
- while (cur)
- {
- std::this_thread::yield();
- foo* next = cur->next.load(ct_mb_relaxed);
- tstate.gc.collect(cur);
- std::this_thread::yield();
- cur = next;
- }
- if (!(i % (ITERS / 64)))
- {
- ct_cout("ct_writer_thread: Processing...\n");
- }
- }
- ct_cout("ct_writer_thread: Exit\n");
- }
- int main()
- {
- {
- ct_test_state tstate;
- std::thread threads[THREADS];
- // Reader threads
- for (unsigned int i = 0; i < READERS; ++i)
- {
- threads[i] = std::thread(ct_reader_thread, std::ref(tstate));
- }
- // Writer threads
- for (unsigned int i = READERS; i < THREADS; ++i)
- {
- threads[i] = std::thread(ct_writer_thread, std::ref(tstate));
- }
- // Join threads
- for (unsigned int i = 0; i < THREADS; ++i)
- {
- threads[i].join();
- }
- }
- // Check for memory leaks
- if (g_allocs.load(ct_mb_relaxed) != 0)
- {
- std::printf("MEMORY LEAK!\n");
- assert(false);
- }
- std::printf("\n\nFin!\n");
- std::fflush(stdout);
- std::getchar();
- return EXIT_SUCCESS;
- }
Advertisement
Add Comment
Please, Sign In to add comment