Advertisement
Guest User

Waitset for Non-Blocking Algorithms by Chris Thomasson

a guest
Jun 9th, 2011
143
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.63 KB | None | 0 0
  1. /*
Portable Waitset for Non-Blocking Algorithms
  3. Copyright (C) 2010 Christopher Michael Thomasson
  4.  
  5. This program is free software: you can redistribute it and/or modify
  6. it under the terms of the GNU General Public License as published by
  7. the Free Software Foundation, either version 3 of the License, or
  8. (at your option) any later version.
  9.  
  10. This program is distributed in the hope that it will be useful,
  11. but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. GNU General Public License for more details.
  14.  
  15. You should have received a copy of the GNU General Public License
  16. along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. */
  18.  
  19.  
  20.  
  21.  
  22.  
  23. //#define RL_DEBUGBREAK_ON_ASSERT
  24. //#define RL_MSVC_OUTPUT
  25. //#define RL_FORCE_SEQ_CST
  26. #define RL_GC
  27. #include <relacy/relacy_std.hpp>
  28. #include <cstdio>
  29. #include <cstddef>
  30. #include <climits>
  31.  
  32.  
  33.  
  34.  
// Debug tracing: DBG_PRINTF((...)) forwards its parenthesized argument list
// to std::printf in debug builds and compiles to a no-op when NDEBUG is set.
// Note the double parentheses at call sites — `e` is the whole argument list.
#if ! defined (NDEBUG)
# define DBG_PRINTF(e) std::printf e
#else
# define DBG_PRINTF(e) ((void)0)
#endif




// Stand-alone memory fences used by the algorithms below. These wrap
// std::atomic_thread_fence so the chosen ordering is visible at each call
// site (and so RL_FORCE_SEQ_CST experiments only need to touch this spot).
#define MEMBAR_SEQ_CST() std::atomic_thread_fence(std::memory_order_seq_cst)
#define MEMBAR_CONSUME() std::atomic_thread_fence(std::memory_order_consume)
#define MEMBAR_ACQUIRE() std::atomic_thread_fence(std::memory_order_acquire)
#define MEMBAR_ACQ_REL() std::atomic_thread_fence(std::memory_order_acq_rel)
#define MEMBAR_RELEASE() std::atomic_thread_fence(std::memory_order_release)
  49.  
  50.  
  51.  
  52.  
// Eventcount-style waitset that lets threads block waiting for activity in an
// otherwise non-blocking algorithm. Protocol for waiters:
//
//   wait_begin();            // locks m_mutex, raises the wait bit
//   if (recheck-condition)   // re-examine the non-blocking structure
//       wait_cancel();       //   condition satisfied: undo and unlock
//   else
//       wait_commit();       //   really sleep on the condvar, then unlock
//
// Producers call signal()/broadcast() after publishing work. The seq_cst
// fences pair the waiter's "set waitbit, then recheck" with the signaler's
// "publish work, then read waitbit" so a wakeup cannot be missed.
// Wakeups may be spurious from the caller's perspective; callers are expected
// to loop (see waitset_test::consume below).
class waitset
{
std::mutex m_mutex;
std::condition_variable m_cond;
// True while at least one thread is between wait_begin and the end of its
// wait; lets signalers skip locking entirely when nobody is waiting.
std::atomic<bool> m_waitbit;
// Count of threads currently blocked in wait_commit; protected by m_mutex.
// (VAR_T is Relacy instrumentation for a plain, mutex-protected variable.)
VAR_T(unsigned) m_waiters;



public:
waitset()
: m_waitbit(false),
m_waiters(0)
{

}


// Sanity check: no thread may be waiting (or mid-wait) at destruction.
~waitset()
{
bool waitbit = m_waitbit.load(std::memory_order_relaxed);

unsigned waiters = VAR(m_waiters);

RL_ASSERT(! waitbit && ! waiters);
}



private:
// Common slow path for signal()/broadcast(). `waitbit` is the value the
// caller observed after its seq_cst fence; if it was clear, no one can be
// waiting and we return without touching the mutex (the fast path).
void prv_signal(bool waitbit, bool broadcast)
{
if (! waitbit) return;

m_mutex.lock($);

unsigned waiters = VAR(m_waiters);

// Clear the wait bit when this wakeup will drain the waiters: either we
// are waking everyone, or at most one waiter exists (waiters < 2).
// With 2+ waiters and notify_one, the bit stays set for the remainder.
if (waiters < 2 || broadcast)
{
m_waitbit.store(false, std::memory_order_relaxed);
}

m_mutex.unlock($);

// Notify outside the lock; waiters == 0 means everyone was still between
// wait_begin and wait_commit and will recheck/cancel on their own.
if (waiters)
{
if (! broadcast)
{
m_cond.notify_one($);
}

else
{
m_cond.notify_all($);
}
}
}



public:
// Raise the wait bit. NOTE: returns with m_mutex HELD; the caller must
// follow up with wait_cancel() or wait_commit(), which unlock it.
void wait_begin()
{
m_mutex.lock($);

m_waitbit.store(true, std::memory_order_relaxed);

// Order the waitbit store before the caller's recheck of the data
// structure; pairs with the fence in signal()/broadcast().
MEMBAR_SEQ_CST();
}


// Non-blocking variant of wait_begin: returns false (without raising the
// wait bit) if the mutex could not be acquired immediately.
bool wait_try_begin()
{
if (! m_mutex.try_lock($)) return false;

m_waitbit.store(true, std::memory_order_relaxed);

MEMBAR_SEQ_CST();

return true;
}


// Abort a wait started by wait_begin (the recheck found work). Clears the
// wait bit only if no other thread is actually blocked, then unlocks.
void wait_cancel()
{
unsigned waiters = VAR(m_waiters);

if (! waiters)
{
m_waitbit.store(false, std::memory_order_relaxed);
}

m_mutex.unlock($);
}


// Complete a wait started by wait_begin: block on the condvar until
// signaled, then unlock. The last waiter out clears the wait bit.
void wait_commit()
{
MEMBAR_SEQ_CST();

++VAR(m_waiters);

m_cond.wait(m_mutex, $);

if (! --VAR(m_waiters))
{
m_waitbit.store(false, std::memory_order_relaxed);
}

m_mutex.unlock($);
}



public:
// Wake (at most) one waiter. Call AFTER publishing work: the fence orders
// the publication before the waitbit load, pairing with wait_begin's fence.
void signal()
{
MEMBAR_SEQ_CST();

bool waitbit = m_waitbit.load(std::memory_order_relaxed);

prv_signal(waitbit, false);
}


// Wake all current waiters; same fencing discipline as signal().
void broadcast()
{
MEMBAR_SEQ_CST();

bool waitbit = m_waitbit.load(std::memory_order_relaxed);

prv_signal(waitbit, true);
}
};
  188.  
  189.  
  190.  
  191.  
  192. struct node
  193. {
  194. VAR_T(int) m_state;
  195. std::atomic<node*> m_next;
  196. node(int state) : m_state(state) {}
  197. };
  198.  
  199.  
  200.  
  201.  
// I have the garbage collector turned on in Relacy.
// That's why this non-blocking stack algorithm works...
// ;^)
// I have the garbage collector turned on in Relacy.
// That's why this non-blocking stack algorihtm works...
// ;^)
// Treiber-style lock-free LIFO stack. Relaxed atomics plus stand-alone
// fences: a release fence before each publish in push(), a consume fence
// after each observation of a non-NULL head. Safe against ABA/use-after-free
// only because nodes are never freed here (RL_GC mode — see comment above).
class nbstack
{
std::atomic<node*> m_head;   // top of stack; NULL when empty



public:
nbstack()
: m_head(NULL)
{

}


// Sanity check: stack must be empty at destruction.
~nbstack()
{
node* head = m_head.load(std::memory_order_relaxed);

RL_ASSERT(! head);
}



public:
// Push `n` onto the stack (lock-free). The release fence inside the loop
// orders the m_next store (and the node's payload) before the CAS that
// publishes `n`; on CAS failure `head` is reloaded and we re-link/re-fence.
void push(node* n)
{
node* head = m_head.load(std::memory_order_relaxed);

do
{
n->m_next.store(head, std::memory_order_relaxed);

MEMBAR_RELEASE();
}

while (! m_head.compare_exchange_weak(
head,
n,
std::memory_order_relaxed));
}


// Atomically detach the entire chain, leaving the stack empty.
// Returns the old head (possibly NULL). The consume fence pairs with the
// release fence in push() so the chain's contents are visible to the caller.
node* try_flush()
{
node* head = m_head.exchange(NULL, std::memory_order_relaxed);

if (head) MEMBAR_CONSUME();

return head;
}


// Peek at the current top without removing it (may be stale immediately).
node* get_head()
{
node* head = m_head.load(std::memory_order_relaxed);

if (head) MEMBAR_CONSUME();

return head;
}


// Pop the top node, or return NULL if the stack is empty (lock-free).
// Reading head->m_next of a node another thread may concurrently pop is
// only safe because nodes are garbage-collected, never reused/freed here.
node* try_pop()
{
node* head = m_head.load(std::memory_order_relaxed);
node* xchg;

do
{
if (! head) return NULL;

// Make head's fields (m_next) visible before dereferencing.
MEMBAR_CONSUME();

xchg = head->m_next.load(std::memory_order_relaxed);
}

while (! m_head.compare_exchange_weak(
head,
xchg,
std::memory_order_relaxed));

return head;
}
};
  289.  
  290.  
  291.  
  292.  
  293. #define ITERS 8
  294. #define PRODUCERS 2
  295. #define CONSUMERS PRODUCERS
  296. #define THREADS (PRODUCERS + CONSUMERS)
  297.  
  298.  
  299. struct waitset_test
  300. : rl::test_suite<waitset_test, THREADS>
  301. {
  302. nbstack g_nbstack;
  303. waitset g_waitset;
  304.  
  305.  
  306.  
  307. void produce(node* n)
  308. {
  309. g_nbstack.push(n);
  310.  
  311. g_waitset.signal();
  312. }
  313.  
  314.  
  315.  
  316. node* consume()
  317. {
  318. node* n;
  319.  
  320. while (! (n = g_nbstack.try_pop()))
  321. {
  322. g_waitset.wait_begin();
  323.  
  324. if ((n = g_nbstack.try_pop()))
  325. {
  326. g_waitset.wait_cancel();
  327.  
  328. break;
  329. }
  330.  
  331. g_waitset.wait_commit();
  332. }
  333.  
  334. return n;
  335. }
  336.  
  337.  
  338.  
  339. void thread(unsigned int tidx)
  340. {
  341. if (tidx < PRODUCERS)
  342. {
  343. // producers
  344. DBG_PRINTF(("(%u):producer:enter\n", tidx));
  345.  
  346. for (unsigned i = 0; i < ITERS; ++i)
  347. {
  348. int state = (i + 1) << tidx;
  349.  
  350. node* n = new node(state);
  351.  
  352. DBG_PRINTF(("(%u):producer:produced(%p)(%d)\n",
  353. tidx, (void*)n, state));
  354.  
  355. produce(n);
  356. }
  357.  
  358. DBG_PRINTF(("(%u):producer:leave\n", tidx));
  359. }
  360.  
  361. else
  362. {
  363. // consumers
  364. DBG_PRINTF(("(%u):consumer:enter\n", tidx));
  365.  
  366. for (unsigned i = 0; i < ITERS; ++i)
  367. {
  368. node* n = consume();
  369.  
  370. int state = VAR(n->m_state);
  371.  
  372. DBG_PRINTF(("(%u):consumer:consumed(%p)(%d)\n",
  373. tidx, (void*)n, state));
  374.  
  375. // delete n; we are in a GC... No need to delete nodes! ;^)
  376. }
  377.  
  378. DBG_PRINTF(("(%u):consumer:leave\n", tidx));
  379. }
  380. }
  381. };
  382.  
  383.  
  384.  
  385.  
  386.  
  387.  
  388.  
  389.  
  390. int main()
  391. {
  392. {
  393. rl::test_params p;
  394.  
  395. p.iteration_count = 30000000;
  396. //p.execution_depth_limit = 10000;
  397. //p.search_type = rl::sched_bound;
  398. //p.search_type = rl::fair_full_search_scheduler_type;
  399. //p.search_type = rl::fair_context_bound_scheduler_type;
  400. rl::simulate<waitset_test>(p);
  401. }
  402.  
  403. std::getchar();
  404.  
  405. return 0;
  406. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement