Chris_M_Thomasson

Poor Man's RCU ver. 0.0.1

Feb 7th, 2021
// Chris M. Thomasson's Poor Man's RCU... Example 123...


#include <iostream>
#include <atomic>
#include <thread>
#include <cstdlib>
#include <cstdint>
#include <climits>
#include <functional>


// Masks
static constexpr std::uint32_t ct_ref_mask = 0xFFFFFFF0U;
static constexpr std::uint32_t ct_ref_complete = 0x30U;
static constexpr std::uint32_t ct_ref_inc = 0x20U;
static constexpr std::uint32_t ct_proxy_mask = 0xFU;
static constexpr std::uint32_t ct_proxy_quiescent = 0x10U;
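
// Bit layout (inferred from the constants above):
//   m_current: bits 0-3 (ct_proxy_mask) hold the current collector index;
//              the bits above count acquired references in units of
//              ct_ref_inc (0x20).
//   collector::m_count: bit 4 (ct_proxy_quiescent) flags a quiescence in
//              progress; references are again counted in units of ct_ref_inc.
// ct_ref_complete (0x30) is the quiescent flag plus one ct_ref_inc: seeing it
// as the pre-decrement value in release() means the caller held the final
// reference to a retired collector.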


// Iteration settings
static constexpr unsigned long ct_reader_iters_n = 1000000;
static constexpr unsigned long ct_writer_iters_n = 100000;


// Thread counts
static constexpr unsigned long ct_reader_threads_n = 42;
static constexpr unsigned long ct_writer_threads_n = 7;


// Some debug/sanity check things...
// Need to make these conditional at compile time with some macros...
static std::atomic<std::uint32_t> g_debug_node_allocations(0);
static std::atomic<std::uint32_t> g_debug_node_deallocations(0);


// Need to align and pad data structures! To do...

struct ct_node
{
    std::atomic<ct_node*> m_next;
    ct_node* m_defer_next;

    ct_node() : m_next(nullptr), m_defer_next(nullptr)
    {
        g_debug_node_allocations.fetch_add(1, std::memory_order_relaxed);
    }

    ~ct_node()
    {
        g_debug_node_deallocations.fetch_add(1, std::memory_order_relaxed);
    }
};
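
// Note: m_next links a node into the live stack; m_defer_next threads it onto
// a collector's deferred-free list after it has been popped. Lagging readers
// may still follow m_next on a deferred node, which is exactly why its
// deletion must wait for quiescence.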


// The proxy collector itself... :^)
template<std::size_t T_defer_limit>
class ct_proxy
{
    static void prv_destroy(ct_node* n)
    {
        while (n)
        {
            ct_node* next = n->m_defer_next;
            delete n;
            n = next;
        }
    }


public:
    class collector
    {
        friend class ct_proxy;

    private:
        std::atomic<ct_node*> m_defer;
        std::atomic<std::uint32_t> m_defer_count;
        std::atomic<std::uint32_t> m_count;



    public:
        collector()
            : m_defer(nullptr),
            m_defer_count(0),
            m_count(0)
        {

        }


        ~collector()
        {
            prv_destroy(m_defer.load(std::memory_order_relaxed));
        }
    };


private:
    std::atomic<std::uint32_t> m_current;
    std::atomic<bool> m_quiesce;
    ct_node* m_defer;
    collector m_collectors[2];


public:
    ct_proxy()
        : m_current(0),
        m_quiesce(false),
        m_defer(nullptr)
    {

    }

    ~ct_proxy()
    {
        prv_destroy(m_defer);
    }


private:
    void prv_quiesce_begin()
    {
        // Try to begin the quiescence process.
        if (! m_quiesce.exchange(true, std::memory_order_acquire))
        {
            // advance the current collector and grab the old one.
            std::uint32_t old = m_current.load(std::memory_order_relaxed) & ct_proxy_mask;
            old = m_current.exchange((old + 1) & 1, std::memory_order_acq_rel);
            collector& c = m_collectors[old & ct_proxy_mask];

            // decode reference count.
            std::uint32_t refs = old & ct_ref_mask;
            // increment and generate an odd reference count.
            std::uint32_t old_refs = c.m_count.fetch_add(refs + ct_proxy_quiescent, std::memory_order_release);

            // drop-to-zero: every transferred reference has already been
            // released, so the wrapped (negative) old count cancels `refs'
            // exactly.
            if (old_refs + refs == 0)
            {
                // odd reference count and drop-to-zero condition detected!
                prv_quiesce_complete(c);
            }
        }
    }
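
    // Illustrative walk-through of the hand-off above (example numbers, not
    // from the original source): two readers acquire while collector 0 is
    // current, so m_current == 2 * ct_ref_inc + 0 == 0x40. The swap transfers
    // refs == 0x40 and adds the quiescent flag: c0.m_count += 0x40 + 0x10.
    // Each release() then subtracts ct_ref_inc (0x20); the release that reads
    // a pre-decrement value of ct_ref_complete (0x30 == flag + one reference)
    // is the last reader out and runs prv_quiesce_complete().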


    void prv_quiesce_complete(collector& c)
    {
        // the collector `c' is now in a quiescent state! :^)
        std::atomic_thread_fence(std::memory_order_acquire);

        // maintain the back link and obtain "fresh" objects from
        // this collection.
        ct_node* n = m_defer;
        m_defer = c.m_defer.load(std::memory_order_relaxed);
        c.m_defer.store(nullptr, std::memory_order_relaxed);

        // reset the reference count.
        c.m_count.store(0, std::memory_order_relaxed);
        c.m_defer_count.store(0, std::memory_order_relaxed);

        // release the quiesce lock.
        m_quiesce.store(false, std::memory_order_release);

        // destroy nodes.
        prv_destroy(n);
    }


public:
    collector& acquire()
    {
        // increment the master count _and_ obtain the current collector.
        std::uint32_t current =
            m_current.fetch_add(ct_ref_inc, std::memory_order_acquire);

        // decode the collector index.
        return m_collectors[current & ct_proxy_mask];
    }

    void release(collector& c)
    {
        // decrement the collector.
        std::uint32_t count =
            c.m_count.fetch_sub(ct_ref_inc, std::memory_order_release);

        // check for the completion of the quiescence process.
        if ((count & ct_ref_mask) == ct_ref_complete)
        {
            // odd reference count and drop-to-zero condition detected!
            prv_quiesce_complete(c);
        }
    }
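
    // Note: only one thread can observe the pre-decrement value
    // ct_ref_complete (0x30), and only after prv_quiesce_begin() has set the
    // quiescent flag, so exactly one thread, the last one out, runs the
    // completion.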


    collector& sync(collector& c)
    {
        // check if `c' is in the middle of a quiescence process.
        if (c.m_count.load(std::memory_order_relaxed) & ct_proxy_quiescent)
        {
            // drop `c' and get the next collector.
            release(c);

            return acquire();
        }

        return c;
    }
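
    // Note: sync() lets a long-running reader hop from a retired collector
    // onto the current one, so a slow traversal does not pin deferred nodes
    // indefinitely. (This test program does not exercise it.)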


    void collect()
    {
        prv_quiesce_begin();
    }


    void collect(collector& c, ct_node* n)
    {
        if (! n) return;

        // link the node into the defer list.
        ct_node* prev = c.m_defer.exchange(n, std::memory_order_relaxed);
        n->m_defer_next = prev;

        // bump the defer count and begin the quiescence process once it
        // crosses the limit.
        std::uint32_t count =
            c.m_defer_count.fetch_add(1, std::memory_order_relaxed) + 1;

        if (count >= (T_defer_limit / 2))
        {
            prv_quiesce_begin();
        }
    }
};



typedef ct_proxy<10> ct_proxy_collector;
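

// Usage sketch (summary of the API as exercised by the test code below):
//
//   reader:
//     ct_proxy_collector::collector& c = gc.acquire();
//     ... traverse the shared structure ...
//     gc.release(c);
//
//   writer:
//     gc.collect(c, node_unlinked_from_structure);  // defer the delete
//     gc.collect();                                 // nudge quiescence along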


// your basic lock-free stack...
// well, minus ABA counter and DWCAS of course!  ;^)
class ct_stack
{
    std::atomic<ct_node*> m_head;


public:
    ct_stack() : m_head(nullptr)
    {

    }


public:
    void push(ct_node* n)
    {
        ct_node* head = m_head.load(std::memory_order_relaxed);

        do
        {
            n->m_next.store(head, std::memory_order_relaxed);
        }

        while (! m_head.compare_exchange_weak(
            head,
            n,
            std::memory_order_release));
    }


    ct_node* flush()
    {
        return m_head.exchange(nullptr, std::memory_order_acquire);
    }


    ct_node* get_head()
    {
        return m_head.load(std::memory_order_acquire);
    }


    ct_node* pop()
    {
        ct_node* head = m_head.load(std::memory_order_acquire);
        ct_node* xchg;

        do
        {
            if (!head) return nullptr;

            xchg = head->m_next.load(std::memory_order_relaxed);
        }

        while (! m_head.compare_exchange_weak(
            head,
            xchg,
            std::memory_order_acquire));

        return head;
    }
};
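
// Note: this test never re-pushes a popped node, and popped nodes are handed
// to the proxy collector rather than freed immediately, so the classic ABA
// and use-after-free hazards of a counter-less lock-free stack do not bite
// here.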


// The shared state
struct ct_shared
{
    ct_proxy_collector m_proxy_gc;
    ct_stack m_stack;
};



// Reader threads
// Iterates through the lock-free stack
void ct_thread_reader(ct_shared& shared)
{
    // iterate the lock-free stack
    for (unsigned long i = 0; i < ct_reader_iters_n; ++i)
    {
        ct_proxy_collector::collector& c = shared.m_proxy_gc.acquire();

        ct_node* n = shared.m_stack.get_head();

        while (n)
        {
            // need to add in some processing...
            //std::this_thread::yield();

            n = n->m_next.load(std::memory_order_relaxed);
        }

        shared.m_proxy_gc.release(c);
    }
}


// Writer threads
// Mutates the lock-free stack
void ct_thread_writer(ct_shared& shared)
{
    for (unsigned long wloop = 0; wloop < 42; ++wloop)
    {
        for (unsigned long i = 0; i < ct_writer_iters_n; ++i)
        {
            shared.m_stack.push(new ct_node());
        }

        std::this_thread::yield();

        ct_proxy_collector::collector& c = shared.m_proxy_gc.acquire();

        for (unsigned long i = 0; i < ct_writer_iters_n; ++i)
        {
            shared.m_proxy_gc.collect(c, shared.m_stack.pop());
        }

        shared.m_proxy_gc.release(c);

        std::this_thread::yield();

        if ((wloop % 3) == 0)
        {
            shared.m_proxy_gc.collect();
        }
    }
}
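
// Note: each writer pass pushes a batch of fresh nodes, then attempts the
// same number of pops, handing every popped node to the collector for
// deferred deletion; every third pass it also kicks the quiescence process
// explicitly so defer lists cannot grow without bound.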



int main()
{
    std::cout << "Chris M. Thomasson's Proxy Collector Port ver. 0.0.1...\n";
    std::cout << "_______________________________________\n\n";


    {
        ct_shared shared;

        std::thread readers[ct_reader_threads_n];
        std::thread writers[ct_writer_threads_n];

        std::cout << "Booting threads...\n";

        for (unsigned long i = 0; i < ct_writer_threads_n; ++i)
        {
            writers[i] = std::thread(ct_thread_writer, std::ref(shared));
        }

        for (unsigned long i = 0; i < ct_reader_threads_n; ++i)
        {
            readers[i] = std::thread(ct_thread_reader, std::ref(shared));
        }

        std::cout << "Threads running...\n";

        for (unsigned long i = 0; i < ct_reader_threads_n; ++i)
        {
            readers[i].join();
        }

        for (unsigned long i = 0; i < ct_writer_threads_n; ++i)
        {
            writers[i].join();
        }

        // Drain any nodes the writers left on the stack; they would otherwise
        // never be deleted and the counters below could not balance.
        while (ct_node* n = shared.m_stack.pop())
        {
            delete n;
        }
    }

    std::cout << "Threads completed!\n\n";


    // Sanity check!
    {
        std::uint32_t node_allocations = g_debug_node_allocations.load(std::memory_order_relaxed);
        std::uint32_t node_deallocations = g_debug_node_deallocations.load(std::memory_order_relaxed);

        std::cout << "node_allocations = " << node_allocations << "\n";
        std::cout << "node_deallocations = " << node_deallocations << "\n";

        if (node_allocations != node_deallocations)
        {
            std::cout << "NODE LEAK DETECTED! leaked nodes = " << node_allocations - node_deallocations << "\n\n";
        }
    }

    std::cout << "\n\nTest Completed!\n\n";

    return 0;
}
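

// Build sketch (assumed command line, not from the original paste):
//   g++ -std=c++17 -O2 -pthread poor_mans_rcu.cpp -o poor_mans_rcu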