/* Simple Proxy Collector (DWCAS) - Relacy Version http://www.1024cores.net/home/relacy-race-detector Copyright 3/27/2018 ___________________________________________________*/ #include #include #include /* Membar Abstraction ___________________________________________________*/ #define ct_mb_relaxed std::memory_order_relaxed #define ct_mb_acquire std::memory_order_acquire #define ct_mb_release std::memory_order_release #define ct_mb_seq_cst std::memory_order_seq_cst #define ct_mb_fence(mb_membar) std::atomic_thread_fence(mb_membar) /* Simple Proxy Collector C++11 ___________________________________________________*/ namespace ct { namespace proxy { // User object base struct object { virtual ~object() {} }; // Proxy node struct node { std::atomic count; VAR_T(node*) next; VAR_T(object*) obj; node(std::intptr_t count_, node* next_, object* obj_) : count(count_), next(next_), obj(obj_) { } ~node() { } }; // DWCAS target struct anchor { std::intptr_t count; node* head; anchor() { } anchor(std::intptr_t count_, node* head_) { count = count_; head = head_; } bool operator == (anchor const& right) const { return count == right.count && head == right.head; } anchor operator + (anchor const& right) const { anchor res; res.count = count + right.count; res.head = right.head; return res; } anchor operator - (anchor const& right) const { anchor res; res.count = count - right.count; res.head = right.head; return res; } }; std::ostream& operator << (std::ostream& s, anchor const& right) { return s << "{" << right.count << "," << right.head << "}"; } // Collector Logic struct gc { std::atomic head; gc() : head(anchor{ 0, new node(0, nullptr, nullptr) }) {} ~gc() { anchor cmp = head.load(ct_mb_relaxed); RL_ASSERT(cmp.count > -1); RL_ASSERT(VAR(cmp.head->next) == nullptr); prv_dtor(cmp.head); } // Drops a reference count on a node bool prv_release(node* n) { std::intptr_t count = n->count.fetch_sub(2, ct_mb_relaxed); if (count == 3) return true; return false; } // Deletes a node void prv_dtor(node* n) { object* obj = VAR(n->obj); if (obj != nullptr) delete obj; delete n; } // Dumps backlinked nodes void prv_dump(node* n) { node* cur = VAR(n->next); VAR(n->next) = nullptr; // Release and collect in cur LIFO while (prv_release(cur)) { ct_mb_fence(ct_mb_acquire); node* next = VAR(cur->next); VAR(cur->next) = n; n = cur; cur = next; } // Destroy cur LIFO while (n) { node* next = VAR(n->next); prv_dtor(n); n = next; } } // Collects a node void prv_collect(node* n) { anchor xchg = { 0, n }; ct_mb_fence(ct_mb_release); // Swap in new node anchor cmp = head.exchange(xchg, ct_mb_relaxed); // Backlink ct_mb_fence(ct_mb_acquire); VAR(cmp.head->next) = xchg.head; ct_mb_fence(ct_mb_release); // Release and mark as collected std::intptr_t count = cmp.head->count.fetch_add(cmp.count + 1, ct_mb_relaxed); if (count + cmp.count == 0) { prv_dump(cmp.head); } } // Acquires current node node* acquire() { anchor cmp = head.load(ct_mb_relaxed); anchor xchg = { cmp.count + 2, cmp.head }; while (! head.compare_exchange_weak(cmp, xchg, ct_mb_relaxed)) { xchg = { cmp.count + 2, cmp.head }; } ct_mb_fence(ct_mb_acquire); return cmp.head; } // Release a node void release(node* n) { ct_mb_fence(ct_mb_release); if (prv_release(n)) { ct_mb_fence(ct_mb_acquire); prv_dump(n); } } // Collects an object void collect(object* obj) { prv_collect(new node(2, nullptr, obj)); } }; }} // Test object struct foo : public ct::proxy::object { std::atomic next; foo(foo* next_ = nullptr) : next(next_) { } virtual ~foo() { //std::cout << "foo::~foo()\n"; } }; // An atomic LIFO of test objects struct foo_lifo { std::atomic head; foo_lifo(foo* next_ = nullptr) : head(next_) { } void push(foo* n) { foo* cmp = head.load(ct_mb_relaxed); do { n->next.store(cmp, ct_mb_relaxed); ct_mb_fence(ct_mb_release); } while (! head.compare_exchange_weak(cmp, n, ct_mb_relaxed)); } foo* flush() { foo* cmp = head.exchange(nullptr, ct_mb_relaxed); if (cmp) ct_mb_fence(ct_mb_acquire); return cmp; } }; // Program Logic #define READERS 5 #define WRITERS 3 #define ITERS 5 #define THREADS (READERS + WRITERS) struct ct_proxy_test : rl::test_suite { ct::proxy::gc g_gc; foo_lifo g_lifo; void before() { //std::printf("before()\n"); } void after() { //std::printf("after()\n"); } // Reader threads... void thread_reader(unsigned int tidx) { //std::printf("thread_reader::%u\n", tidx); ct::proxy::node* pcn = g_gc.acquire(); { foo* cur = g_lifo.head.load(ct_mb_relaxed); ct_mb_fence(ct_mb_acquire); while (cur) { foo* next = cur->next.load(ct_mb_relaxed); rl::yield(1, $); cur = next; } } g_gc.release(pcn); } // Writer threads... void thread_writer(unsigned int tidx) { //std::printf("thread_writer::%u\n", tidx); for (unsigned int i = 0; i < ITERS; ++i) { // Add some nodes, and peform some collections... g_lifo.push(new foo()); g_lifo.push(new foo()); if (! (i % 2)) g_gc.collect(nullptr); rl::yield(1, $); g_lifo.push(new foo()); if (! (i % 3)) g_lifo.push(new foo()); // Flush foo* cur = g_lifo.flush(); // Collect the foo's... ;^) while (cur) { foo* next = cur->next.load(ct_mb_relaxed); rl::yield(1, $); g_gc.collect(cur); cur = next; } } } // Entry... void thread(unsigned int tidx) { if (tidx < READERS) { thread_reader(tidx); } else { thread_writer(tidx); } } }; int main() { { rl::test_params p; p.iteration_count = 12345600; // for existing proxy gc code //p.execution_depth_limit = 3333; //p.search_type = rl::random_scheduler_type; //p.search_type = rl::sched_bound; //p.search_type = rl::fair_full_search_scheduler_type; //p.search_type = rl::fair_context_bound_scheduler_type; rl::simulate(p); } std::puts("\nTest Complete!\n"); std::fflush(stdout); std::getchar(); return 0; }