Chris_M_Thomasson

Poor Man's RCU ver. 0.0.2

Feb 9th, 2021

// Chris M. Thomasson's Poor Man's RCU... Example 456...


#include <iostream>
#include <atomic>
#include <thread>
#include <cstdlib>
#include <cstdint>
#include <climits>
#include <functional>


// Masks
static constexpr std::uint32_t ct_ref_mask = 0xFFFFFFF0U;
static constexpr std::uint32_t ct_ref_complete = 0x30U;
static constexpr std::uint32_t ct_ref_inc = 0x20U;
static constexpr std::uint32_t ct_proxy_mask = 0xFU;
static constexpr std::uint32_t ct_proxy_quiescent = 0x10U;
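
// Layout notes (inferred from the constants and how they are used
// below, not from the original comments): the low 4 bits of m_current
// select one of the two collectors, bit 4 (0x10) marks a collector as
// quiescing, and the reference count lives in the remaining high bits
// in steps of ct_ref_inc (0x20). So a collector holding two active
// references mid-quiescence reads 2 * 0x20 + 0x10 = 0x50, and
// ct_ref_complete (0x30) is exactly one outstanding reference (0x20)
// plus the quiescent flag (0x10).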


// Iteration settings
static constexpr unsigned long ct_reader_iters_n = 2000000;
static constexpr unsigned long ct_writer_iters_n = 200000;


// Thread counts
static constexpr unsigned long ct_reader_threads_n = 53;
static constexpr unsigned long ct_writer_threads_n = 11;


// Debug/sanity-check counters...
// TODO: compile these out conditionally with some macros...
static std::atomic<std::uint32_t> g_debug_node_allocations(0);
static std::atomic<std::uint32_t> g_debug_node_deallocations(0);
static std::atomic<std::uint32_t> g_debug_dtor_collect(0);
static std::atomic<std::uint32_t> g_debug_release_collect(0);
static std::atomic<std::uint32_t> g_debug_quiesce_begin(0);
static std::atomic<std::uint32_t> g_debug_quiesce_complete(0);
static std::atomic<std::uint32_t> g_debug_quiesce_complete_nodes(0);
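
// One way to handle that TODO (a sketch of mine, not part of the
// original paste; CT_DEBUG and CT_DEBUG_COUNT are hypothetical names):
//
//   #if defined(CT_DEBUG)
//   #   define CT_DEBUG_COUNT(counter) \
//           (counter).fetch_add(1, std::memory_order_relaxed)
//   #else
//   #   define CT_DEBUG_COUNT(counter) ((void)0)
//   #endif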

// TODO: align and pad these data structures to avoid false sharing!

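// Alignment sketch for that TODO (my addition; 64 bytes is a common
// cache-line size, or use std::hardware_destructive_interference_size
// from <new> where the implementation provides it):
//
//   struct alignas(64) ct_node { /* ... */ };
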
struct ct_node
{
    std::atomic<ct_node*> m_next;   // stack link
    ct_node* m_defer_next;          // defer-list link, owned by the collector

    ct_node() : m_next(nullptr), m_defer_next(nullptr)
    {
        g_debug_node_allocations.fetch_add(1, std::memory_order_relaxed);
    }

    ~ct_node()
    {
        g_debug_node_deallocations.fetch_add(1, std::memory_order_relaxed);
    }
};


// The proxy collector itself... :^)
template<std::size_t T_defer_limit>
class ct_proxy
{
    // Walk a defer list, deleting every node; returns how many died.
    static std::uint32_t prv_destroy(ct_node* n)
    {
        std::uint32_t count = 0;

        while (n)
        {
            ct_node* next = n->m_defer_next;
            delete n;
            count++;
            n = next;
        }

        return count;
    }


public:
    class collector
    {
        friend class ct_proxy;

    private:
        std::atomic<ct_node*> m_defer;             // deferred nodes awaiting reclamation
        std::atomic<std::uint32_t> m_defer_count;  // approximate length of m_defer
        std::atomic<std::uint32_t> m_count;        // reference count + quiescent flag


    public:
        collector()
            : m_defer(nullptr),
            m_defer_count(0),
            m_count(0)
        {

        }


        ~collector()
        {
            prv_destroy(m_defer.load(std::memory_order_relaxed));
        }
    };


private:
    std::atomic<std::uint32_t> m_current;  // collector index + master ref count
    std::atomic<bool> m_quiesce;           // quiesce lock
    ct_node* m_defer;                      // back link: previous round's defer list
    collector m_collectors[2];


public:
    ct_proxy()
        : m_current(0),
        m_quiesce(false),
        m_defer(nullptr)
    {

    }

    ~ct_proxy()
    {
        prv_destroy(m_defer);
    }


private:
    void prv_quiesce_begin()
    {
        // Try to begin the quiescence process.
        if (! m_quiesce.exchange(true, std::memory_order_acquire))
        {
            g_debug_quiesce_begin.fetch_add(1, std::memory_order_relaxed);

            // advance the current collector and grab the old one.
            std::uint32_t old = m_current.load(std::memory_order_relaxed) & ct_proxy_mask;
            old = m_current.exchange((old + 1) & 1, std::memory_order_acq_rel);
            collector& c = m_collectors[old & ct_proxy_mask];

            // decode the outstanding reference count.
            std::uint32_t refs = old & ct_ref_mask;

            // transfer those refs onto the old collector and set the
            // quiescent flag, generating an "odd" count in units of 0x10.
            std::uint32_t old_refs = c.m_count.fetch_add(refs + ct_proxy_quiescent, std::memory_order_release);

            // releases may already have driven c.m_count down to the
            // wrapped unsigned value 0 - refs; if so, no readers remain.
            if (old_refs == 0 - refs)
            {
                g_debug_dtor_collect.fetch_add(1, std::memory_order_relaxed);

                // odd reference count and drop-to-zero condition detected!
                prv_quiesce_complete(c);
            }
        }
    }


    void prv_quiesce_complete(collector& c)
    {
        g_debug_quiesce_complete.fetch_add(1, std::memory_order_relaxed);

        // the collector `c' is now in a quiescent state! :^)
        std::atomic_thread_fence(std::memory_order_acquire);

        // maintain the back link: take last round's nodes for
        // destruction, and park this collector's nodes until the
        // next round.
        ct_node* n = m_defer;
        m_defer = c.m_defer.load(std::memory_order_relaxed);
        c.m_defer.store(nullptr, std::memory_order_relaxed);

        // reset the reference count.
        c.m_count.store(0, std::memory_order_relaxed);
        c.m_defer_count.store(0, std::memory_order_relaxed);

        // release the quiesce lock.
        m_quiesce.store(false, std::memory_order_release);

        // destroy last round's nodes.
        std::uint32_t count = prv_destroy(n);

        g_debug_quiesce_complete_nodes.fetch_add(count, std::memory_order_relaxed);
    }
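
    // Why the back link holds nodes for an extra round (my reading of
    // the code, not an original comment): a reader on the *new*
    // collector can copy a pointer to a node just before a writer,
    // still holding the *old* collector, pops and defers that node
    // onto the old defer list. The old collector may then quiesce
    // while the new-collector reader is still dereferencing the node,
    // so destruction is postponed one more round, by which point every
    // reader that could have seen the node has released.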


public:
    collector& acquire()
    {
        // increment the master count _and_ obtain the current collector.
        std::uint32_t current =
            m_current.fetch_add(ct_ref_inc, std::memory_order_acquire);

        // decode the collector index.
        return m_collectors[current & ct_proxy_mask];
    }

    void release(collector& c)
    {
        // decrement the collector.
        std::uint32_t count =
            c.m_count.fetch_sub(ct_ref_inc, std::memory_order_release);

        // check for the completion of the quiescence process: a prior
        // value of ct_ref_complete means we were the last reference on
        // a quiescing collector.
        if ((count & ct_ref_mask) == ct_ref_complete)
        {
            g_debug_release_collect.fetch_add(1, std::memory_order_relaxed);

            prv_quiesce_complete(c);
        }
    }
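
    // Worked example (derived from the masks above, not an original
    // comment): say three readers hold the old collector when
    // prv_quiesce_begin() swaps it out. The exchange hands back
    // refs = 3 * ct_ref_inc = 0x60, so fetch_add(0x60 + 0x10) leaves
    // c.m_count at 0x70 (any releases that already happened are folded
    // in by the unsigned wraparound). Each release() then subtracts
    // 0x20: 0x70 -> 0x50 -> 0x30. The release that observes the prior
    // value 0x30 == ct_ref_complete is the last one out and runs
    // prv_quiesce_complete(c).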


    collector& sync(collector& c)
    {
        // check if `c' is in the middle of a quiescence process.
        if (c.m_count.load(std::memory_order_relaxed) & ct_proxy_quiescent)
        {
            // drop `c' and get the next collector.
            release(c);

            return acquire();
        }

        return c;
    }


    void collect()
    {
        // force a quiescence round.
        prv_quiesce_begin();
    }


    void collect(collector& c, ct_node* n)
    {
        if (! n) return;

        // link the node into the defer list.
        ct_node* prev = c.m_defer.exchange(n, std::memory_order_relaxed);
        n->m_defer_next = prev;

        // bump the defer count and begin the quiescence process if
        // over the limit.
        std::uint32_t count =
            c.m_defer_count.fetch_add(1, std::memory_order_relaxed) + 1;

        if (count >= (T_defer_limit / 2))
        {
            prv_quiesce_begin();
        }
    }
};



typedef ct_proxy<10> ct_proxy_collector;
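

// Usage sketch (my addition, unused by the test below): sync() never
// appears in main()'s workload, so here is how a long-running reader
// might employ it, hopping to the fresh collector between passes so it
// never pins a quiescing one and stalls reclamation.
// `ct_example_long_reader' and its parameters are hypothetical.
template<typename T_work>
void ct_example_long_reader(ct_proxy_collector& gc, T_work work, unsigned long passes)
{
    ct_proxy_collector::collector* c = &gc.acquire();

    for (unsigned long i = 0; i < passes; ++i)
    {
        work(); // read shared structures guarded by the proxy...

        // drop a quiescing collector and re-acquire the current one.
        c = &gc.sync(*c);
    }

    gc.release(*c);
}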


// your basic lock-free stack...
// well, minus ABA counter and DWCAS of course!  ;^)
class ct_stack
{
    std::atomic<ct_node*> m_head;


public:
    ct_stack() : m_head(nullptr)
    {

    }


public:
    void push(ct_node* n)
    {
        ct_node* head = m_head.load(std::memory_order_relaxed);

        do
        {
            n->m_next.store(head, std::memory_order_relaxed);
        }
        while (! m_head.compare_exchange_weak(
            head,
            n,
            std::memory_order_release));
    }


    ct_node* flush()
    {
        return m_head.exchange(nullptr, std::memory_order_acquire);
    }


    ct_node* get_head()
    {
        return m_head.load(std::memory_order_acquire);
    }


    ct_node* pop()
    {
        ct_node* head = m_head.load(std::memory_order_acquire);
        ct_node* xchg;

        do
        {
            if (! head) return nullptr;

            xchg = head->m_next.load(std::memory_order_relaxed);
        }
        while (! m_head.compare_exchange_weak(
            head,
            xchg,
            std::memory_order_acquire));

        return head;
    }
};
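
// Editorial note: pop() dereferences head->m_next with no ABA counter
// or DWCAS, so it is only safe here because every pop in this test
// runs while holding a collector reference from the proxy: a popped
// node cannot be deleted out from under a concurrent traversal until
// the collector it was deferred to (plus one extra round) goes
// quiescent.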


// The shared state
struct ct_shared
{
    ct_proxy_collector m_proxy_gc;  // alias for ct_proxy<10>
    ct_stack m_stack;
};



// Reader threads
// Iterate through the lock-free stack
void ct_thread_reader(ct_shared& shared)
{
    for (unsigned long i = 0; i < ct_reader_iters_n; ++i)
    {
        ct_proxy_collector::collector& c = shared.m_proxy_gc.acquire();

        ct_node* n = shared.m_stack.get_head();

        while (n)
        {
            // need to add in some processing...
            // std::this_thread::yield();

            n = n->m_next.load(std::memory_order_relaxed);
        }

        shared.m_proxy_gc.release(c);
    }
}



// Writer threads
// Mutate the lock-free stack
void ct_thread_writer(ct_shared& shared)
{
    for (unsigned long wloop = 0; wloop < 42; ++wloop)
    {
        shared.m_proxy_gc.collect();

        for (unsigned long i = 0; i < ct_writer_iters_n; ++i)
        {
            shared.m_stack.push(new ct_node());
        }

        // std::this_thread::yield();

        ct_proxy_collector::collector& c = shared.m_proxy_gc.acquire();

        for (unsigned long i = 0; i < ct_writer_iters_n; ++i)
        {
            shared.m_proxy_gc.collect(c, shared.m_stack.pop());
        }

        shared.m_proxy_gc.release(c);

        for (unsigned long i = 0; i < ct_writer_iters_n / 2; ++i)
        {
            shared.m_proxy_gc.collect();
        }

        {
            // a fresh reference for the second drain (intentionally
            // shadows the already-released `c' above).
            ct_proxy_collector::collector& c = shared.m_proxy_gc.acquire();

            for (unsigned long i = 0; i < ct_writer_iters_n; ++i)
            {
                ct_node* n = shared.m_stack.pop();
                if (! n) break;

                shared.m_proxy_gc.collect(c, n);
            }

            shared.m_proxy_gc.release(c);
        }

        if ((wloop % 3) == 0)
        {
            shared.m_proxy_gc.collect();
        }
    }
}



int main()
{
    std::cout << "Chris M. Thomasson's Proxy Collector Port ver. 0.0.2...\n";
    std::cout << "_______________________________________\n\n";

    {
        ct_shared shared;

        std::thread readers[ct_reader_threads_n];
        std::thread writers[ct_writer_threads_n];

        std::cout << "Booting threads...\n";

        for (unsigned long i = 0; i < ct_writer_threads_n; ++i)
        {
            writers[i] = std::thread(ct_thread_writer, std::ref(shared));
        }

        for (unsigned long i = 0; i < ct_reader_threads_n; ++i)
        {
            readers[i] = std::thread(ct_thread_reader, std::ref(shared));
        }

        std::cout << "Threads running...\n";

        for (unsigned long i = 0; i < ct_reader_threads_n; ++i)
        {
            readers[i].join();
        }

        for (unsigned long i = 0; i < ct_writer_threads_n; ++i)
        {
            writers[i].join();
        }
    }

    std::cout << "Threads completed!\n\n";


    // Sanity check!
    {
        std::uint32_t node_allocations = g_debug_node_allocations.load(std::memory_order_relaxed);
        std::uint32_t node_deallocations = g_debug_node_deallocations.load(std::memory_order_relaxed);
        std::uint32_t dtor_collect = g_debug_dtor_collect.load(std::memory_order_relaxed);
        std::uint32_t release_collect = g_debug_release_collect.load(std::memory_order_relaxed);
        std::uint32_t quiesce_complete = g_debug_quiesce_complete.load(std::memory_order_relaxed);
        std::uint32_t quiesce_begin = g_debug_quiesce_begin.load(std::memory_order_relaxed);
        std::uint32_t quiesce_complete_nodes = g_debug_quiesce_complete_nodes.load(std::memory_order_relaxed);

        std::cout << "node_allocations = " << node_allocations << "\n";
        std::cout << "node_deallocations = " << node_deallocations << "\n\n";
        std::cout << "dtor_collect = " << dtor_collect << "\n";
        std::cout << "release_collect = " << release_collect << "\n";
        std::cout << "quiesce_complete = " << quiesce_complete << "\n";
        std::cout << "quiesce_begin = " << quiesce_begin << "\n";
        std::cout << "quiesce_complete_nodes = " << quiesce_complete_nodes << "\n";

        if (node_allocations != node_deallocations)
        {
            std::cout << "NODE LEAK DETECTED!!! leaked nodes = " << node_allocations - node_deallocations << "\n\n";
        }

    }

    std::cout << "\n\nTest Completed!\n\n";

    return 0;
}