Advertisement
Chris_M_Thomasson

Simple XCHG based Atomic Stack

Jan 4th, 2019
223
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.90 KB | None | 0 0
  1. // Simple XCHG based Atomic Stack
  2. // By: Chris M. Thomasson
  3.  
  4.  
  5. #include <iostream>
  6. #include <atomic>
  7. #include <mutex>
  8. #include <thread>
  9. #include <functional>
  10. #include <cassert>
  11.  
  12.  
  13. // sorry about the macros...
  14. #define THREADS 42
  15. #define ITERS 100000
  16.  
  17.  
  18. #define CT_MB_RLX std::memory_order_relaxed
  19. #define CT_MB_ACQ std::memory_order_acquire
  20. #define CT_MB_REL std::memory_order_release
  21.  
  22.  
  23. // HACK! Humm...
  24. #define CT_WAIT ((ct_work*)(0xDEADBEEFU))
  25.  
  26.  
  27.  
  28. // Just to track all the dynamic allocations...
  29. static std::atomic<unsigned long> g_allocs(0);
  30. static std::mutex g_cout_mtx;
  31.  
  32.  
  33. // A work node
  34. struct ct_work
  35. {
  36. std::atomic<ct_work*> m_next;
  37. std::thread::id m_data;
  38. ct_work(std::thread::id data) : m_next(nullptr), m_data(data) {}
  39.  
  40.  
  41. void process()
  42. {
  43. // Simulate just a tiny little work?
  44. g_cout_mtx.lock();
  45. std::this_thread::yield();
  46. std::this_thread::yield();
  47. std::this_thread::yield();
  48.  
  49. std::thread::id local = std::this_thread::get_id();
  50.  
  51. if (m_data == local)
  52. {
  53. // std::cout << "processing local = " << m_data <<
  54. // " from " << std::this_thread::get_id() << "\n";
  55. }
  56.  
  57. else
  58. {
  59. std::cout << "processing foreign = " << m_data <<
  60. " from " << std::this_thread::get_id() << "\n";
  61. }
  62.  
  63. std::this_thread::yield();
  64. std::this_thread::yield();
  65. std::this_thread::yield();
  66. g_cout_mtx.unlock();
  67. }
  68.  
  69.  
  70. ct_work* get_next() const
  71. {
  72. ct_work* w = nullptr;
  73.  
  74. while ((w = m_next.load(CT_MB_RLX)) == CT_WAIT)
  75. {
  76. // we can spin, or even do other work right here...
  77. std::this_thread::yield();
  78. }
  79.  
  80. return w;
  81. }
  82. };
  83.  
  84.  
  85.  
  86. // Easy Stack, only uses XCHG
  87. struct ct_estack
  88. {
  89. std::atomic<ct_work*> m_head;
  90. ct_estack() : m_head(nullptr) {}
  91.  
  92.  
  93. void push(ct_work* n)
  94. {
  95. n->m_next.store(CT_WAIT, CT_MB_RLX);
  96. ct_work* head = m_head.exchange(n, CT_MB_REL); // release
  97. n->m_next.store(head, CT_MB_RLX);
  98. }
  99.  
  100.  
  101. ct_work* flush_try()
  102. {
  103. return m_head.exchange(nullptr, CT_MB_ACQ); // acquire
  104. }
  105. };
  106.  
  107.  
  108.  
  109. // Consume an Easy Stack...
  110. void ct_consume(
  111. ct_estack& estack
  112. ) {
  113. ct_work* w = estack.flush_try();
  114.  
  115. while (w)
  116. {
  117. // Process FIRST!
  118. w->process();
  119.  
  120. // Now, we can gain the next pointer.
  121. ct_work* next = w->get_next();
  122.  
  123. // Okay, we can delete the work
  124. delete w;
  125. g_allocs.fetch_sub(1, CT_MB_RLX); // dec
  126.  
  127. w = next;
  128. }
  129. }
  130.  
  131.  
  132.  
  133. // Our shared state
  134. struct ct_shared
  135. {
  136. ct_estack m_estack;
  137. };
  138.  
  139.  
  140.  
  141. // Produce some work...
  142. void ct_produce(
  143. ct_estack& estack
  144. ) {
  145. ct_work* w = new ct_work(std::this_thread::get_id());
  146. g_allocs.fetch_add(1, CT_MB_RLX); // inc
  147. estack.push(w);
  148. }
  149.  
  150.  
  151. // Do some work...
  152. void ct_worker(ct_shared& shared)
  153. {
  154. for (unsigned int i = 0; i < ITERS; ++i)
  155. {
  156. ct_produce(shared.m_estack);
  157. ct_produce(shared.m_estack);
  158. ct_produce(shared.m_estack);
  159.  
  160. std::this_thread::yield(); // little spice...
  161.  
  162. ct_consume(shared.m_estack);
  163. }
  164.  
  165. ct_consume(shared.m_estack);
  166. }
  167.  
  168.  
  169.  
  170. int main(void)
  171. {
  172. {
  173. ct_shared shared;
  174. std::thread threads[THREADS];
  175.  
  176. for (unsigned long i = 0; i < THREADS; ++i)
  177. {
  178. threads[i] = std::thread(ct_worker, std::ref(shared));
  179. }
  180.  
  181. for (unsigned long i = 0; i < THREADS; ++i)
  182. {
  183. threads[i].join();
  184. }
  185. }
  186.  
  187. if (g_allocs.load(CT_MB_RLX) != 0)
  188. {
  189. std::cout << "\n\nLEAKED!!!!\n";
  190. }
  191.  
  192. std::cout << "\n\nFIN!\n";
  193.  
  194. return 0;
  195. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement