/*
Bounded Multi-Producer/Multi-Consumer FIFO Queue
Copyright (C) 2011 Christopher Michael Thomasson

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.


This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.


You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
____________________________________________________________________*/



#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <assert.h>
#include <pthread.h>
#include <sched.h>




/* Global Settings
__________________________________________________________*/
#if defined (_MSC_VER)
# pragma warning (disable:4116) /* unnamed type definition in parentheses */
#endif

/* #define CMT_QUEUE_ALGO 1 */
#define DSL_QUEUE_ALGO 1


/* N must be a power of two: both queues index with a mask of N - 1 */
#define N 4096
#define ITERS 100000000
#define PUSH_POPS 4
#define THREADS 4
typedef long queue_state_t;


/* compile-time check: the queue must be able to hold every thread's
   outstanding pushes at once */
typedef char global_settings_static_assert[
    N >= (PUSH_POPS * THREADS) ? 1 : -1
];



/* Simple Stop Watch
__________________________________________________________*/
struct c_clock_stop_watch
{
    clock_t start;
    clock_t stop;
};


void
c_clock_stop_watch_start(
    struct c_clock_stop_watch* const self
){
    self->start = clock();
}


/* returns elapsed time in milliseconds */
double
c_clock_stop_watch_stop(
    struct c_clock_stop_watch* const self
){
    self->stop = clock();
    return (((double)self->stop - (double)self->start) / CLOCKS_PER_SEC) * 1000.0;
}
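

/* Caveat: ISO C `clock()' measures processor time; on POSIX systems it
   typically sums CPU time across all of a process's threads, while the
   Microsoft CRT makes it behave like wall-clock time. A portable
   wall-clock watch would need a monotonic clock; a POSIX-only sketch
   (assumes clock_gettime is available):

       double wall_ms_since(struct timespec const* t0)
       {
           struct timespec t1;
           clock_gettime(CLOCK_MONOTONIC, &t1);
           return (t1.tv_sec  - t0->tv_sec)  * 1000.0 +
                  (t1.tv_nsec - t0->tv_nsec) / 1000000.0;
       }
*/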



/* Backoff Operations
__________________________________________________________*/
void
thread_backoff(
    const char* msg
){
    (void)msg; /* unused: callers pass a debug tag */
    sched_yield();
}



/* Alignment Operations
__________________________________________________________*/
#if ! defined (ALIGN_UINTPTR)
# define ALIGN_UINTPTR size_t
#endif


typedef ALIGN_UINTPTR align_uintptr;


/* compile-time check: align_uintptr must be the same width as both an
   object pointer and a function pointer */
typedef char ALIGN_SASSERT
[
    sizeof(align_uintptr) == sizeof(void*) &&
    sizeof(align_uintptr) == sizeof(char (*) (double))
    ? 1 : -1
];


/* round a pointer up to the next mp_align boundary; mp_align must be a
   power of two */
#define ALIGN_UP(mp_ptr, mp_align) \
    ((void*)( \
        (((align_uintptr)(mp_ptr)) + ((mp_align) - 1)) \
        & ~(((mp_align) - 1)) \
    ))
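

/* Example: with mp_align = 128, a pointer whose integer value is 130
   rounds up to 256 ((130 + 127) & ~127), while an already-aligned
   value such as 256 is left unchanged. */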



/* IA-32 Atomic Operations
__________________________________________________________*/
/* MSVC-specific: `__int32', `__declspec(naked)', and `_asm' require the
   32-bit Microsoft compiler; the pthread calls elsewhere then rely on a
   Windows pthreads port such as pthreads-win32. */
typedef unsigned __int32 atomic_uint32_t;


#define ATOMIC_API(mp_ret) \
    __declspec(naked) \
    mp_ret \
    __cdecl


ATOMIC_API(atomic_uint32_t)
ATOMIC_XADD(
    atomic_uint32_t volatile* dest,
    atomic_uint32_t value
){
    _asm
    {
        MOV ECX, [ESP + 4]
        MOV EAX, [ESP + 8]
        LOCK XADD [ECX], EAX
        RET
    }
}


ATOMIC_API(atomic_uint32_t)
ATOMIC_XCHG(
    atomic_uint32_t volatile* dest,
    atomic_uint32_t value
){
    /* XCHG with a memory operand implies the LOCK prefix */
    _asm
    {
        MOV ECX, [ESP + 4]
        MOV EAX, [ESP + 8]
        XCHG [ECX], EAX
        RET
    }
}


ATOMIC_API(atomic_uint32_t)
ATOMIC_LOAD(
    atomic_uint32_t volatile* dest
){
    _asm
    {
        MOV ECX, [ESP + 4]
        MOV EAX, [ECX]
        RET
    }
}


ATOMIC_API(void)
ATOMIC_STORE(
    atomic_uint32_t volatile* dest,
    atomic_uint32_t value
){
    _asm
    {
        MOV ECX, [ESP + 4]
        MOV EAX, [ESP + 8]
        MOV [ECX], EAX
        RET
    }
}
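

/* Memory-ordering assumptions (IA-32): under the x86 TSO model a plain
   MOV load has acquire semantics and a plain MOV store has release
   semantics, so ATOMIC_LOAD/ATOMIC_STORE need no explicit fences, and
   LOCK XADD / XCHG-with-memory act as full barriers. The naked
   functions are also opaque calls, so the compiler cannot reorder
   memory accesses across them. A port to a weakly-ordered architecture
   would need explicit barrier instructions in each primitive. */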



/* IA-32 Cache Line
__________________________________________________________*/
/* 128 bytes: two 64-byte lines, so that hot variables stay apart even
   with adjacent-line prefetching in play */
#define L2_CACHE 128


/* pad out to a cache line following `mp_count' members of `mp_type' */
#define L2_PAD_EX(mp_name, mp_type, mp_count) \
    unsigned char mp_name[ \
        L2_CACHE - (sizeof(mp_type) * (mp_count)) \
    ]


#define L2_PAD(mp_name, mp_type) \
    L2_PAD_EX(mp_name, mp_type, 1)



/* MPMC Bounded Queue By Chris M. Thomasson
__________________________________________________________*/
struct cmt_queue_cell
{
    atomic_uint32_t ver;
    queue_state_t state;
    L2_PAD(l2pad1, struct { atomic_uint32_t a; queue_state_t b; } );
};


struct cmt_queue
{
    atomic_uint32_t head;
    L2_PAD(l2pad1, atomic_uint32_t);
    atomic_uint32_t tail;
    L2_PAD(l2pad2, atomic_uint32_t);
    struct cmt_queue_cell cells[N];
};


void
cmt_queue_init(
    struct cmt_queue* const self
){
    atomic_uint32_t i;

    self->head = 0;
    self->tail = 0;

    for (i = 0; i < N; ++i)
    {
        self->cells[i].ver = i;
    }
}


void
cmt_queue_push(
    struct cmt_queue* const self,
    queue_state_t qstate
){
    /* take a ticket and locate our cell */
    atomic_uint32_t ver = ATOMIC_XADD(&self->head, 1);
    struct cmt_queue_cell* cell = self->cells + (ver & (N - 1U));

    /* wait until the cell is free for this lap */
    while (ATOMIC_LOAD(&cell->ver) != ver)
    {
        thread_backoff("cmt_queue_push");
    }

    cell->state = qstate;

    /* publish to the consumer holding this ticket */
    ATOMIC_STORE(&cell->ver, ver + 1);
}


queue_state_t
cmt_queue_pop(
    struct cmt_queue* const self
){
    atomic_uint32_t ver = ATOMIC_XADD(&self->tail, 1);
    struct cmt_queue_cell* cell = self->cells + (ver & (N - 1U));
    queue_state_t qstate;

    /* wait until a producer has published this ticket */
    while (ATOMIC_LOAD(&cell->ver) != ver + 1)
    {
        thread_backoff("cmt_queue_pop");
    }

    qstate = cell->state;

    /* recycle the cell for the producer one lap ahead */
    ATOMIC_STORE(&cell->ver, ver + N);

    return qstate;
}
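

/* How the per-cell versioning works: cells[i].ver starts at i. A
   producer takes ticket t = XADD(head, 1) and spins until
   cells[t & (N-1)].ver == t, writes its state, then publishes by
   storing ver = t + 1. A consumer takes ticket t = XADD(tail, 1),
   spins until ver == t + 1, reads the state, and stores ver = t + N,
   handing the cell to the producer one full lap ahead. Below is a
   minimal single-threaded smoke test; an illustrative sketch only, not
   part of the benchmark. */
static void
cmt_queue_smoke_test(void)
{
    /* static: the padded cell array weighs in around N * 128 bytes */
    static struct cmt_queue q;
    queue_state_t out;

    cmt_queue_init(&q);
    cmt_queue_push(&q, (queue_state_t)123);
    out = cmt_queue_pop(&q);
    assert(out == (queue_state_t)123);
}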



/* MPMC Bounded Queue with Dual Spinlocks
__________________________________________________________*/
struct dsl_queue_cell
{
    queue_state_t state;
    L2_PAD_EX(l2pad2, atomic_uint32_t, 2);
};


struct dsl_queue
{
    atomic_uint32_t head_lock;
    L2_PAD(l2pad1, atomic_uint32_t);
    atomic_uint32_t head;
    L2_PAD(l2pad2, atomic_uint32_t);
    atomic_uint32_t tail_lock;
    L2_PAD(l2pad3, atomic_uint32_t);
    atomic_uint32_t tail;
    L2_PAD(l2pad4, atomic_uint32_t);
    struct dsl_queue_cell cells[N];
};


void
dsl_queue_init(
    struct dsl_queue* const self
){
    self->head_lock = 0;
    self->head = 0;
    self->tail_lock = 0;
    self->tail = 0;
}


int
dsl_queue_try_push(
    struct dsl_queue* const self,
    queue_state_t qstate
){
    atomic_uint32_t head;
    atomic_uint32_t tail;

    /* acquire head lock */
    while (ATOMIC_XCHG(&self->head_lock, 1))
    {
        thread_backoff("dsl_queue_try_push");
    }

    head = ATOMIC_LOAD(&self->head);
    tail = ATOMIC_LOAD(&self->tail);

    /* full: advancing head would collide with tail */
    if (tail == ((head + 1) & (N - 1)))
    {
        /* release head lock */
        ATOMIC_STORE(&self->head_lock, 0);

        return 0;
    }

    self->cells[head].state = qstate;
    ATOMIC_STORE(&self->head, (head + 1UL) & (N - 1UL));

    /* release head lock */
    ATOMIC_STORE(&self->head_lock, 0);

    return 1;
}


int
dsl_queue_try_pop(
    struct dsl_queue* const self,
    queue_state_t* qstate
){
    atomic_uint32_t head;
    atomic_uint32_t tail;

    /* acquire tail lock */
    while (ATOMIC_XCHG(&self->tail_lock, 1))
    {
        thread_backoff("dsl_queue_try_pop");
    }

    head = ATOMIC_LOAD(&self->head);
    tail = ATOMIC_LOAD(&self->tail);

    /* empty */
    if (head == tail)
    {
        /* release tail lock */
        ATOMIC_STORE(&self->tail_lock, 0);

        return 0;
    }

    /* consume from the tail index (producers write at the head index) */
    *qstate = self->cells[tail].state;
    ATOMIC_STORE(&self->tail, (tail + 1UL) & (N - 1UL));

    /* release tail lock */
    ATOMIC_STORE(&self->tail_lock, 0);

    return 1;
}


void
dsl_queue_push(
    struct dsl_queue* const self,
    queue_state_t qstate
){
    while (! dsl_queue_try_push(self, qstate))
    {
        thread_backoff("dsl_queue_push");
    }
}


queue_state_t
dsl_queue_pop(
    struct dsl_queue* const self
){
    queue_state_t qstate;

    while (! dsl_queue_try_pop(self, &qstate))
    {
        thread_backoff("dsl_queue_pop");
    }

    return qstate;
}
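

/* Design note: producers serialize only on head_lock and consumers only
   on tail_lock, so one push and one pop can run concurrently. Each side
   reads the other side's index without holding the other lock; a stale
   read can only make a push see "full" or a pop see "empty" spuriously,
   which the blocking wrappers simply retry, so the race is benign. The
   capacity is N - 1 elements, since one empty slot distinguishes a full
   ring from an empty one. A caller that would rather not spin can use
   the try_ variants directly, e.g. (hypothetical usage):

       queue_state_t out;
       if (! dsl_queue_try_push(q, item)) { ... do other work ... }
       if (dsl_queue_try_pop(q, &out))    { ... consume out ... }
*/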




/* MPMC Bounded Queue Test Abstraction
__________________________________________________________*/
#if defined (CMT_QUEUE_ALGO)
/* Chris M. Thomasson Algorithm */
# define QUEUE_ALGO_ID "Chris M. Thomasson MPMC Queue Algorithm"
typedef struct cmt_queue test_queue_t;
# define test_queue_init cmt_queue_init
# define test_queue_push cmt_queue_push
# define test_queue_pop cmt_queue_pop
#elif defined (DSL_QUEUE_ALGO)
/* Dual Spinlock Algorithm */
# define QUEUE_ALGO_ID "Dual Spinlock MPMC Queue Algorithm"
typedef struct dsl_queue test_queue_t;
# define test_queue_init dsl_queue_init
# define test_queue_push dsl_queue_push
# define test_queue_pop dsl_queue_pop
#else
# error No Test Macro Defined!
#endif



/* Stop Watch Abstraction
__________________________________________________________*/

/* C `clock()' Stop Watch */
typedef struct c_clock_stop_watch stop_watch_t;
#define stop_watch_start c_clock_stop_watch_start
#define stop_watch_stop c_clock_stop_watch_stop



/* Test Program
__________________________________________________________*/
static test_queue_t* g_tqueue;
static pthread_barrier_t g_tbarrier;


void*
test_thread(
    void* state
){
    unsigned i;
    queue_state_t qstate;
    /* the thread index is smuggled through the void* argument */
    unsigned const tid = (unsigned)(size_t)state;

    /* start all threads together so they contend from the first push */
    pthread_barrier_wait(&g_tbarrier);

    for (i = 0; i < ITERS; ++i)
    {
        unsigned c;

        for (c = 0; c < PUSH_POPS; ++c)
        {
            test_queue_push(g_tqueue, (queue_state_t)tid);

            /*printf("test_thread(%u)::pushed(%u)\n", tid, tid);*/
        }

        for (c = 0; c < PUSH_POPS; ++c)
        {
            qstate = test_queue_pop(g_tqueue);

            /*printf("test_thread(%u)::popped(%u)\n", tid, (unsigned)qstate);*/
        }
    }

    return NULL;
}



/* Main Entry */
int
main(void)
{
    pthread_t tid[THREADS];
    unsigned i = 0;
    stop_watch_t stop_watch;
    double elapsed_time;

    /* over-allocate so the queue can be aligned to a cache-line boundary */
    void* raw_buf = malloc(sizeof(test_queue_t) + L2_CACHE);

    if (! raw_buf)
    {
        fputs("out of memory\n", stderr);
        return EXIT_FAILURE;
    }

    puts("Testing " QUEUE_ALGO_ID "...");

    g_tqueue = ALIGN_UP(raw_buf, L2_CACHE);
    test_queue_init(g_tqueue);
    pthread_barrier_init(&g_tbarrier, NULL, THREADS);

    stop_watch_start(&stop_watch);

    for (i = 0; i < THREADS; ++i)
    {
        pthread_create(tid + i, NULL, test_thread, (void*)(size_t)i);
    }

    for (i = 0; i < THREADS; ++i)
    {
        pthread_join(tid[i], NULL);
    }

    elapsed_time = stop_watch_stop(&stop_watch);
    printf("\nelapsed time: %f ms\n", elapsed_time);
    pthread_barrier_destroy(&g_tbarrier);

    puts("\n\n\n___________________________________________\n"
         "The program has completed; hit <ENTER> to exit...");

    getchar();

    free(raw_buf);

    return 0;
}