Advertisement
Guest User

Untitled

a guest
Oct 30th, 2011
217
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.92 KB | None | 0 0
  1. /*
  2. Bounded Multi-Producer/Multi-Consumer FIFO Queue
  3. Copyright (C) 2011 Christopher Michael Thomasson
  4.  
  5. This program is free software: you can redistribute it and/or modify
  6. it under the terms of the GNU General Public License as published by
  7. the Free Software Foundation, either version 3 of the License, or
  8. (at your option) any later version.
  9.  
  10.  
  11. This program is distributed in the hope that it will be useful,
  12. but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. GNU General Public License for more details.
  15.  
  16.  
  17. You should have received a copy of the GNU General Public License
  18. along with this program. If not, see <http://www.gnu.org/licenses/>.
  19. _______________________________________________________________________*/
  20.  
  21.  
  22.  
  23.  
  24. //#define RL_DEBUGBREAK_ON_ASSERT
  25. //#define RL_MSVC_OUTPUT
  26. //#define RL_FORCE_SEQ_CST
  27. //#define RL_GC
  28.  
  29.  
  30. #include <relacy/relacy_std.hpp>
  31. #include <cstdio>
  32.  
  33.  
  34. #define mb_relaxed std::memory_order_relaxed
  35. #define mb_acquire std::memory_order_acquire
  36. #define mb_release std::memory_order_release
  37.  
  38.  
  39. template<typename T, unsigned T_depth /* MUST BE POW OF 2 ! */>
  40. struct mpmc_bounded_queue
  41. {
  42. struct cell_type
  43. {
  44. std::atomic<unsigned> m_state;
  45. VAR_T(T) m_object;
  46. };
  47.  
  48.  
  49. std::atomic<unsigned> m_head;
  50. std::atomic<unsigned> m_tail;
  51. cell_type m_buffer[T_depth];
  52.  
  53.  
  54. mpmc_bounded_queue() : m_head(0), m_tail(0)
  55. {
  56. // initialize version numbers.
  57. for (unsigned i = 0; i < T_depth; ++i)
  58. {
  59. m_buffer[i].m_state.store(i, mb_relaxed);
  60. }
  61. }
  62.  
  63.  
  64. void push(T const& obj)
  65. {
  66. // obtain our head version and cell.
  67. unsigned idx = m_head.fetch_add(1, mb_relaxed);
  68. cell_type& cell = m_buffer[idx & (T_depth - 1U)];
  69.  
  70. // wait for it...
  71. rl::linear_backoff backoff;
  72. while (cell.m_state.load(mb_relaxed) != idx)
  73. backoff.yield($);
  74. std::atomic_thread_fence(mb_acquire);
  75.  
  76. // GOT IT! Okay, write to the object.
  77. VAR(cell.m_object) = obj;
  78.  
  79. // We are done; allow a consumer to consume.
  80. std::atomic_thread_fence(mb_release);
  81. cell.m_state.store(idx + 1, mb_relaxed);
  82. }
  83.  
  84.  
  85. void pop(T& obj)
  86. {
  87. // obtain our tail version and cell.
  88. unsigned idx = m_tail.fetch_add(1, mb_relaxed);
  89. cell_type& cell = m_buffer[idx & (T_depth - 1U)];
  90.  
  91. // wait for it...
  92. rl::linear_backoff backoff;
  93. while (cell.m_state.load(mb_relaxed) != idx + 1)
  94. backoff.yield($);
  95. std::atomic_thread_fence(mb_acquire);
  96.  
  97. // GOT IT! Okay, read from the object.
  98. obj = VAR(cell.m_object);
  99.  
  100. // We are done; allow a producer to produce.
  101. std::atomic_thread_fence(mb_release);
  102. cell.m_state.store(idx + T_depth, mb_relaxed);
  103. }
  104. };
  105.  
  106.  
  107.  
  108.  
  109. #define PRODUCERS 3
  110. #define CONSUMERS 2
  111. #define THREADS (PRODUCERS + CONSUMERS)
  112. #define ITERS 15
  113. #define BUFFER_SIZE (8 - CONSUMERS)
  114.  
  115.  
  116. struct mpmc_bounded_queue_test
  117. : rl::test_suite<mpmc_bounded_queue_test, THREADS>
  118. {
  119. mpmc_bounded_queue<unsigned, BUFFER_SIZE + CONSUMERS> g_queue;
  120. unsigned g_test_term_producers; // test termination only!
  121. unsigned g_test_term_consumers; // test termination only!
  122.  
  123.  
  124. void before()
  125. {
  126. g_test_term_producers = PRODUCERS;
  127. g_test_term_consumers = (PRODUCERS * ITERS) + CONSUMERS;
  128. }
  129.  
  130.  
  131. void after()
  132. {
  133. RL_ASSERT(! g_test_term_consumers &&
  134. ! g_test_term_producers);
  135. }
  136.  
  137.  
  138. void thread_producer(unsigned int tidx)
  139. {
  140. for (unsigned i = 0; i < ITERS; ++i)
  141. {
  142. g_queue.push(((tidx + 1) << 8U) + i);
  143. }
  144.  
  145. if (! --g_test_term_producers)
  146. {
  147. for (unsigned i = 0; i < CONSUMERS; ++i)
  148. {
  149. g_queue.push(666666);
  150. }
  151. }
  152. }
  153.  
  154.  
  155. void thread_consumer(unsigned int tidx)
  156. {
  157. for (;;)
  158. {
  159. unsigned v;
  160. g_queue.pop(v);
  161. --g_test_term_consumers;
  162.  
  163. //printf("thread_consumer(%u)-popped(%u)\n", tidx, v);
  164.  
  165. if (v == 666666)
  166. {
  167. break;
  168. }
  169. }
  170. }
  171.  
  172.  
  173. void thread(unsigned int tidx)
  174. {
  175. if (tidx < PRODUCERS)
  176. {
  177. thread_producer(tidx);
  178. }
  179.  
  180. else
  181. {
  182. thread_consumer(tidx);
  183. }
  184. }
  185. };
  186.  
  187.  
  188.  
  189.  
  190. int main()
  191. {
  192. {
  193. rl::test_params p;
  194.  
  195. p.iteration_count = 30000000;
  196. //p.execution_depth_limit = 10000;
  197. //p.search_type = rl::sched_bound;
  198. //p.search_type = rl::fair_full_search_scheduler_type;
  199. //p.search_type = rl::fair_context_bound_scheduler_type;
  200.  
  201. rl::simulate<mpmc_bounded_queue_test>(p);
  202. }
  203.  
  204. return 0;
  205. }
  206.  
  207.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement