Chris_M_Thomasson

Simple Proxy Collector DWCAS - C++11

Mar 27th, 2018
304
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 8.65 KB | None | 0 0
  1. /* Simple Proxy Collector (DWCAS) - C++11 Version
  2.  
  3. http://www.1024cores.net/home/relacy-race-detector
  4.  
  5. Copyright 3/27/2018
  6. ___________________________________________________*/
  7.  
  8.  
  9. #if defined (_MSC_VER)
  10. #   define _ENABLE_ATOMIC_ALIGNMENT_FIX // for dwcas
  11. #endif
  12.  
  13. #include <atomic>
  14. #include <mutex>
  15.  
  16. #include <cstdio>
  17. #include <cstdlib>
  18. #include <cstdint>
  19. #include <cassert>
  20.  
  21.  
  22. static std::atomic<unsigned int> g_allocs(0);
  23.  
  24.  
  25. /* Utilities
  26. ___________________________________________________*/
  27. void ct_cout(
  28.     const char* msg
  29. ) {
  30.     static std::mutex g_cout_lock;
  31.     g_cout_lock.lock();
  32.     std::printf(msg);
  33.     std::fflush(stdout);
  34.     g_cout_lock.unlock();
  35. }
  36.  
  37.  
  38. /* Member Fence Abstraction
  39. ___________________________________________________*/
  40. #define ct_mb_relaxed std::memory_order_relaxed
  41. #define ct_mb_acquire std::memory_order_acquire
  42. #define ct_mb_release std::memory_order_release
  43. #define ct_mb_acq_rel std::memory_order_acq_rel
  44. #define ct_mb_seq_cst std::memory_order_seq_cst
  45. #define ct_mb_fence(mb_membar) std::atomic_thread_fence(mb_membar)
  46.  
  47.  
  48. namespace ct {
  49. namespace proxy {
  50.  
  51.     // User object base
  52.     struct object
  53.     {
  54.         virtual ~object() {}
  55.     };
  56.  
  57.  
  58.     // Proxy node
  59.     struct node
  60.     {
  61.         std::atomic<std::intptr_t> count;
  62.         node* next;
  63.         object* obj;
  64.  
  65.         node(node const& n)
  66.             : count(n.count.load(ct_mb_relaxed)), next(n.next), obj(n.obj)
  67.         {
  68.  
  69.         }
  70.  
  71.         node& operator =(node const& n)
  72.         {
  73.             count.store(n.count.load(ct_mb_relaxed));
  74.             next = n.next;
  75.             obj = n.obj;
  76.             return *this;
  77.         }
  78.  
  79.         node(std::intptr_t count_, node* next_, object* obj_)
  80.             : count(count_), next(next_), obj(obj_)
  81.         {
  82.             g_allocs.fetch_add(1, ct_mb_relaxed);
  83.         }
  84.  
  85.         ~node()
  86.         {
  87.             g_allocs.fetch_sub(1, ct_mb_relaxed);
  88.         }
  89.     };
  90.  
  91.  
  92.     // DWCAS target
  93.     struct anchor
  94.     {
  95.         std::intptr_t count;
  96.         node* head;
  97.     };
  98.  
  99.  
  100.     // Proxy Collector
  101.     struct gc
  102.     {
  103.         std::atomic<anchor> head;
  104.  
  105.         gc() : head(anchor { 0, new node(0, nullptr, nullptr) }) {}
  106.  
  107.         ~gc()
  108.         {
  109.             anchor cmp = head.load(ct_mb_relaxed);
  110.             assert(cmp.count > -1);
  111.             assert(cmp.head->next == nullptr);
  112.             prv_dtor(cmp.head);
  113.         }
  114.  
  115.         // Release a reference
  116.         bool prv_release(node* n)
  117.         {
  118.             std::intptr_t count = n->count.fetch_sub(2, ct_mb_relaxed);
  119.             if (count == 3) return true;
  120.             return false;
  121.         }
  122.  
  123.         // Destroy a node
  124.         void prv_dtor(node* n)
  125.         {
  126.             object* obj = n->obj;
  127.             if (obj != nullptr) delete obj;
  128.             delete n;
  129.         }
  130.  
  131.         // Dump backlinks
  132.         void prv_dump(node* n)
  133.         {
  134.             node* cur = n->next;
  135.             n->next = nullptr;
  136.  
  137.             // Release
  138.             while (prv_release(cur))
  139.             {
  140.                 ct_mb_fence(ct_mb_acquire);
  141.                 node* next = cur->next;
  142.                 cur->next = n;
  143.                 n = cur;
  144.                 cur = next;
  145.             }
  146.  
  147.             // Destroy
  148.             while (n)
  149.             {
  150.                 node* next = n->next;
  151.                 prv_dtor(n);
  152.                 n = next;
  153.             }
  154.         }
  155.  
  156.         // Collect a node
  157.         void prv_collect(node* n)
  158.         {
  159.             anchor xchg = { 0, n };
  160.  
  161.             ct_mb_fence(ct_mb_release);
  162.  
  163.             anchor cmp = head.exchange(xchg, ct_mb_relaxed);
  164.  
  165.             ct_mb_fence(ct_mb_acquire);
  166.             cmp.head->next = xchg.head;
  167.             ct_mb_fence(ct_mb_release);
  168.  
  169.             std::intptr_t count = cmp.head->count.fetch_add(cmp.count + 1, ct_mb_relaxed);
  170.  
  171.             if (count + cmp.count == 0)
  172.             {
  173.                 prv_dump(cmp.head);
  174.             }
  175.         }
  176.  
  177.         // Acquire a node
  178.         node* acquire()
  179.         {
  180.             anchor cmp = head.load(ct_mb_relaxed);
  181.             anchor xchg = { cmp.count + 2, cmp.head };
  182.  
  183.             while (! head.compare_exchange_weak(cmp, xchg, ct_mb_relaxed))
  184.             {
  185.                 xchg = { cmp.count + 2, cmp.head };
  186.             }
  187.  
  188.             ct_mb_fence(ct_mb_acquire);
  189.  
  190.             return cmp.head;
  191.         }
  192.  
  193.         // Release a node
  194.         void release(node* n)
  195.         {
  196.             ct_mb_fence(ct_mb_release);
  197.  
  198.             if (prv_release(n))
  199.             {
  200.                 ct_mb_fence(ct_mb_acquire);
  201.                 prv_dump(n);
  202.             }
  203.         }
  204.  
  205.         // Collect an object
  206.         void collect(object* obj)
  207.         {
  208.             prv_collect(new node(2, nullptr, obj));
  209.         }
  210.     };
  211. }}
  212.  
  213.  
  214. // Test object
  215. struct foo : public ct::proxy::object
  216. {
  217.     std::atomic<foo*> next;
  218.  
  219.     foo(foo* next_ = nullptr) : next(next_)
  220.     {
  221.         g_allocs.fetch_add(1, ct_mb_relaxed);
  222.     }
  223.  
  224.     virtual ~foo()
  225.     {
  226.         g_allocs.fetch_sub(1, ct_mb_relaxed);
  227.     }
  228. };
  229.  
  230. // An atomic LIFO of test objects
  231. struct foo_lifo
  232. {
  233.     std::atomic<foo*> head;
  234.  
  235.     foo_lifo(foo* next_ = nullptr) : head(next_) { }
  236.  
  237.     void push(foo* n)
  238.     {
  239.         foo* cmp = head.load(ct_mb_relaxed);
  240.  
  241.         do {
  242.             n->next.store(cmp, ct_mb_relaxed);
  243.             ct_mb_fence(ct_mb_release);
  244.         } while (! head.compare_exchange_weak(cmp, n, ct_mb_relaxed));
  245.     }
  246.  
  247.     // Flush all items, and return LIFO list or nullptr.
  248.     foo* flush()
  249.     {
  250.         foo* cmp = head.exchange(nullptr, ct_mb_relaxed);
  251.         if (cmp) ct_mb_fence(ct_mb_acquire);
  252.         return cmp;
  253.     }
  254. };
  255.  
  256.  
  257. /* Test program
  258. ____________________________________________ */
  259. #include <thread>
  260. #include <functional>
  261.  
  262. #define READERS 7
  263. #define WRITERS 5
  264. #define ITERS 1000000
  265. #define THREADS (READERS + WRITERS)
  266.  
  267.  
  268. // Our shared state
  269. struct ct_test_state
  270. {
  271.     ct::proxy::gc gc;
  272.     foo_lifo lifo;
  273.  
  274.     ct_test_state() : gc(), lifo() {}
  275. };
  276.  
  277.  
  278. // Read the tstate.lifo shared data-structure.
  279. void ct_reader_thread(
  280.     ct_test_state& tstate
  281. ) {
  282.     ct_cout("ct_reader_thread: Enter\n");
  283.  
  284.     // Read...
  285.     for (unsigned int i = 0; i < ITERS * 2; ++i)
  286.     {
  287.         // Acquire proxy
  288.         ct::proxy::node* pcn = tstate.gc.acquire();
  289.  
  290.         // Iterate list
  291.         {
  292.             foo* cur = tstate.lifo.head.load(ct_mb_relaxed);
  293.             ct_mb_fence(ct_mb_acquire);
  294.  
  295.             while (cur)
  296.             {
  297.                 foo* next = cur->next.load(ct_mb_relaxed);
  298.                 std::this_thread::yield();
  299.                 cur = next;
  300.             }
  301.         }
  302.  
  303.         // Release proxy
  304.         tstate.gc.release(pcn);
  305.  
  306.         if (! (i % (ITERS / 64)))
  307.         {
  308.             ct_cout("ct_reader_thread: Processing...\n");
  309.         }
  310.     }
  311.  
  312.     ct_cout("ct_reader_thread: Exit\n");
  313. }
  314.  
  315.  
  316. // Mutate the tstate.lifo shared data-structure.
  317. void ct_writer_thread(
  318.     ct_test_state& tstate
  319. ) {
  320.     ct_cout("ct_writer_thread: Enter\n");
  321.  
  322.     // Create and collect some nodes
  323.     for (unsigned int i = 0; i < ITERS; ++i)
  324.     {
  325.         // Create
  326.         tstate.lifo.push(new foo(nullptr));
  327.         std::this_thread::yield();
  328.         tstate.lifo.push(new foo(nullptr));
  329.         std::this_thread::yield();
  330.         tstate.lifo.push(new foo(nullptr));
  331.         std::this_thread::yield();
  332.  
  333.         // Collect
  334.         foo* cur = tstate.lifo.flush();
  335.  
  336.         while (cur)
  337.         {
  338.             std::this_thread::yield();
  339.             foo* next = cur->next.load(ct_mb_relaxed);
  340.             tstate.gc.collect(cur);
  341.             std::this_thread::yield();
  342.             cur = next;
  343.         }
  344.  
  345.         if (!(i % (ITERS / 64)))
  346.         {
  347.             ct_cout("ct_writer_thread: Processing...\n");
  348.         }
  349.     }
  350.  
  351.     ct_cout("ct_writer_thread: Exit\n");
  352. }
  353.  
  354.  
  355. int main()
  356. {
  357.     {
  358.         ct_test_state tstate;
  359.  
  360.         std::thread threads[THREADS];
  361.  
  362.         // Reader threads
  363.         for (unsigned int i = 0; i < READERS; ++i)
  364.         {
  365.             threads[i] = std::thread(ct_reader_thread, std::ref(tstate));
  366.         }
  367.  
  368.         // Writer threads
  369.         for (unsigned int i = READERS; i < THREADS; ++i)
  370.         {
  371.             threads[i] = std::thread(ct_writer_thread, std::ref(tstate));
  372.         }
  373.  
  374.         // Join threads
  375.         for (unsigned int i = 0; i < THREADS; ++i)
  376.         {
  377.             threads[i].join();
  378.         }
  379.     }
  380.  
  381.     // Check for memory leaks
  382.     if (g_allocs.load(ct_mb_relaxed) != 0)
  383.     {
  384.         std::printf("MEMORY LEAK!\n");
  385.         assert(false);
  386.     }
  387.  
  388.     std::printf("\n\nFin!\n");
  389.     std::fflush(stdout);
  390.     std::getchar();
  391.  
  392.     return EXIT_SUCCESS;
  393. }
Advertisement
Add Comment
Please, Sign In to add comment