Advertisement
Guest User

Chris M. Thomasson

a guest
Dec 20th, 2011
74
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 10.66 KB | None | 0 0
  1. /*
  2. Bounded Multi-Producer/Multi-Consumer FIFO Queue
  3. Copyright (C) 2011 Christopher Michael Thomasson
  4.  
  5. This program is free software: you can redistribute it and/or modify
  6. it under the terms of the GNU General Public License as published by
  7. the Free Software Foundation, either version 3 of the License, or
  8. (at your option) any later version.
  9.  
  10.  
  11. This program is distributed in the hope that it will be useful,
  12. but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. GNU General Public License for more details.
  15.  
  16.  
  17. You should have received a copy of the GNU General Public License
  18. along with this program. If not, see <http://www.gnu.org/licenses/>.
  19. ____________________________________________________________________*/
  20.  
  21.  
  22.  
  23.  
  24. #include <stdio.h>
  25. #include <stdlib.h>
  26. #include <time.h>
  27. #include <assert.h>
  28. #include <pthread.h>
  29.  
  30. #undef WIN32_LEAN_AND_MEAN
  31. #define WIN32_LEAN_AND_MEAN
  32. #include <windows.h>
  33.  
  34.  
  35.  
  36.  
/* Global Settings
__________________________________________________________*/
/* Exactly one *_QUEUE_ALGO macro selects the queue implementation under test. */
/* #define CMT_QUEUE_ALGO 1 */
#define DSL_QUEUE_ALGO 1


#define N 4096 /* queue capacity; a power of two (indices are masked with N - 1) */
#define ITERS 100000000 /* push/pop rounds performed by each worker thread */
#define PUSH_POPS 4 /* pushes (then pops) per round */
#define THREADS 4 /* number of worker threads */
typedef long queue_state_t; /* payload type carried through the queues */


/* Compile-time check: the queue must hold every in-flight item (THREADS
   threads each pushing up to PUSH_POPS items before popping); a negative
   array size breaks the build otherwise. */
typedef char global_settings_static_assert[
N >= (PUSH_POPS * THREADS) ? 1 : -1
];
  53.  
  54.  
  55.  
  56.  
  57. /* Simple Stop Watch
  58. __________________________________________________________*/
/* A trivial stopwatch built on the standard C `clock()' facility. */
struct c_clock_stop_watch
{
clock_t start; /* tick count captured by c_clock_stop_watch_start() */
clock_t stop;  /* tick count captured by c_clock_stop_watch_stop() */
};
  64.  
  65.  
  66. void
  67. c_clock_stop_watch_start(
  68. struct c_clock_stop_watch* const self
  69. ){
  70. self->start = clock();
  71. }
  72.  
  73.  
  74. double
  75. c_clock_stop_watch_stop(
  76. struct c_clock_stop_watch* const self
  77. ){
  78. self->stop = clock();
  79. return (((double)self->stop - (double)self->start) / CLOCKS_PER_SEC) * 1000.0;
  80. }
  81.  
  82.  
  83.  
  84.  
  85.  
  86. /* Backoff Operations
  87. __________________________________________________________*/
  88. void
  89. thread_backoff(
  90. const char* msg
  91. ){
  92. Sleep(0);
  93. }
  94.  
  95.  
  96.  
  97.  
  98. /* Alignment Operations
  99. __________________________________________________________*/
/* Integer type wide enough to round-trip a pointer; overridable. */
#if ! defined (ALIGN_UINTPTR)
# define ALIGN_UINTPTR size_t
#endif


typedef ALIGN_UINTPTR align_uintptr;


/* Compile-time check that align_uintptr matches the size of both object
   and function pointers (negative array size on failure). */
typedef char ALIGN_SASSERT
[
sizeof(align_uintptr) == sizeof(void*) &&
sizeof(align_uintptr) == sizeof(char (*) (double))
? 1 : -1
];


/* Round `mp_ptr' up to the next `mp_align' boundary.
   mp_align must be a power of two for the mask to be valid. */
#define ALIGN_UP(mp_ptr, mp_align) \
((void*)( \
(((align_uintptr)(mp_ptr)) + ((mp_align) - 1)) \
& ~(((mp_align) - 1)) \
))
  121.  
  122.  
  123.  
  124.  
  125. /* IA-32 Atomic Operations
  126. __________________________________________________________*/
typedef unsigned __int32 atomic_uint32_t; /* MSVC-specific fixed-width type */


/* Declares a naked __cdecl function: the compiler emits no prologue or
   epilogue, so the bodies below consist entirely of inline assembly and
   must RET explicitly. */
#define ATOMIC_API(mp_ret) \
__declspec(naked) \
mp_ret \
__cdecl
  134.  
  135.  
/* Atomically add `value' to `*dest' and return the PREVIOUS value.
   LOCK XADD is a full memory barrier on IA-32. */
ATOMIC_API(atomic_uint32_t)
ATOMIC_XADD(
atomic_uint32_t volatile* dest,
atomic_uint32_t value
){
_asm
{
MOV ECX, [ESP + 4]   ; ECX = dest
MOV EAX, [ESP + 8]   ; EAX = value
LOCK XADD [ECX], EAX ; EAX = old *dest; *dest += value
RET                  ; __cdecl return value is in EAX
}
}
  149.  
  150.  
/* Atomically swap `*dest' with `value' and return the previous value.
   XCHG with a memory operand carries an implicit LOCK prefix, making
   this a full memory barrier as well. */
ATOMIC_API(atomic_uint32_t)
ATOMIC_XCHG(
atomic_uint32_t volatile* dest,
atomic_uint32_t value
){
_asm
{
MOV ECX, [ESP + 4] ; ECX = dest
MOV EAX, [ESP + 8] ; EAX = value
XCHG [ECX], EAX    ; EAX = old *dest; *dest = value (implicit LOCK)
RET
}
}
  164.  
  165.  
/* Atomic 32-bit load of `*dest'.  A naturally aligned 32-bit MOV is
   atomic on IA-32; the architecture's load ordering gives it acquire
   semantics without an explicit fence. */
ATOMIC_API(atomic_uint32_t)
ATOMIC_LOAD(
atomic_uint32_t volatile* dest
){
_asm
{
MOV ECX, [ESP + 4] ; ECX = dest
MOV EAX, [ECX]     ; EAX = *dest
RET
}
}
  177.  
  178.  
/* Atomic 32-bit store of `value' to `*dest'.  An aligned MOV store is
   atomic and has release semantics on IA-32, but it is NOT a full
   barrier (no LOCK/MFENCE): later loads may still be reordered ahead
   of it. */
ATOMIC_API(void)
ATOMIC_STORE(
atomic_uint32_t volatile* dest,
atomic_uint32_t value
){
_asm
{
MOV ECX, [ESP + 4] ; ECX = dest
MOV EAX, [ESP + 8] ; EAX = value
MOV [ECX], EAX     ; *dest = value
RET
}
}
  192.  
  193.  
  194.  
  195.  
  196. /* IA-32 Cache Line
  197. __________________________________________________________*/
/* Assumed cache-line size in bytes (generous for IA-32 parts). */
#define L2_CACHE 128


/* Pad a struct out to L2_CACHE bytes, given `mp_count' members of
   `mp_type' already present.  If the members alone exceed the line
   size, the array length goes negative and the build fails. */
#define L2_PAD_EX(mp_name, mp_type, mp_count) \
unsigned char mp_name[ \
L2_CACHE - (sizeof(mp_type) * (mp_count)) \
]


#define L2_PAD(mp_name, mp_type) \
L2_PAD_EX(mp_name, mp_type, 1)
  209.  
  210.  
  211.  
  212.  
  213. /* MPMC Bounded Queue By Chris M. Thomasson
  214. __________________________________________________________*/
/* One slot of the Thomasson queue.  `ver' is the slot's sequence number:
   a producer waits for ver == its ticket, a consumer for ver == ticket + 1.
   Each cell is padded toward its own cache line to limit false sharing. */
struct cmt_queue_cell
{
atomic_uint32_t ver;  /* slot sequence number / generation */
queue_state_t state;  /* user payload */
L2_PAD(l2pad1, struct { atomic_uint32_t a; queue_state_t b; } );
};


/* Bounded MPMC queue: `head' hands out producer tickets, `tail' consumer
   tickets.  The two counters sit on separate cache lines so producers
   and consumers do not contend on the same line. */
struct cmt_queue
{
atomic_uint32_t head; /* next producer ticket (monotonically increasing) */
L2_PAD(l2pad1, atomic_uint32_t);
atomic_uint32_t tail; /* next consumer ticket (monotonically increasing) */
L2_PAD(l2pad2, atomic_uint32_t);
struct cmt_queue_cell cells[N];
};
  231.  
  232.  
  233. void
  234. cmt_queue_init(
  235. struct cmt_queue* const self
  236. ){
  237. atomic_uint32_t i;
  238.  
  239. self->head = 0;
  240. self->tail = 0;
  241.  
  242. for (i = 0; i < N; ++i)
  243. {
  244. self->cells[i].ver = i;
  245. }
  246. }
  247.  
  248.  
/* Blocking enqueue.  Grab a producer ticket, spin until the target cell's
   sequence number equals the ticket (slot free for this generation),
   store the payload, then publish the item by bumping the sequence
   number to ticket + 1. */
void
cmt_queue_push(
struct cmt_queue* const self,
queue_state_t qstate
){
atomic_uint32_t ver = ATOMIC_XADD(&self->head, 1); /* my ticket */
struct cmt_queue_cell* cell = self->cells + (ver & (N - 1U)); /* N is a power of two */

/* wait until this slot is free for this ticket's generation */
while (ATOMIC_LOAD(&cell->ver) != ver)
{
thread_backoff("cmt_queue_push");
}

cell->state = qstate;

/* publish: the consumer holding ticket `ver' spins on ver + 1 */
ATOMIC_STORE(&cell->ver, ver + 1);
}
  266.  
  267.  
/* Blocking dequeue.  Grab a consumer ticket, spin until the target cell's
   sequence number reaches ticket + 1 (item published), read the payload,
   then recycle the slot for the producer that will hold ticket + N. */
queue_state_t
cmt_queue_pop(
struct cmt_queue* const self
){
atomic_uint32_t ver = ATOMIC_XADD(&self->tail, 1); /* my ticket */
struct cmt_queue_cell* cell = self->cells + (ver & (N - 1U));
queue_state_t qstate;

/* wait until the matching producer has published into this slot */
while (ATOMIC_LOAD(&cell->ver) != ver + 1)
{
thread_backoff("cmt_queue_pop");
}

qstate = cell->state;

/* free the slot for the next generation (ticket + N maps to this cell) */
ATOMIC_STORE(&cell->ver, ver + N);

return qstate;
}
  287.  
  288.  
  289.  
  290.  
  291. /* MPMC Bounded Queue /w Dual Spinlock's
  292. __________________________________________________________*/
/* One slot of the dual-spinlock queue.
   NOTE(review): the pad subtracts sizeof(atomic_uint32_t) * 2 (8 bytes),
   but the only member is a queue_state_t (long); where long is 4 bytes
   the struct is 124 bytes, so cells are not cache-line sized — confirm
   whether the pad was meant to be based on queue_state_t. */
struct dsl_queue_cell
{
queue_state_t state; /* user payload */
L2_PAD_EX(l2pad2, atomic_uint32_t, 2);
};


/* Bounded MPMC ring protected by two spinlocks: producers serialize on
   `head_lock', consumers on `tail_lock'.  Unlike the cmt queue, `head'
   and `tail' here are ring indices already reduced modulo N.  Each of
   the four words sits on its own cache line. */
struct dsl_queue
{
atomic_uint32_t head_lock; /* producer spinlock: 0 = free, 1 = held */
L2_PAD(l2pad1, atomic_uint32_t);
atomic_uint32_t head;      /* next write index */
L2_PAD(l2pad2, atomic_uint32_t);
atomic_uint32_t tail_lock; /* consumer spinlock: 0 = free, 1 = held */
L2_PAD(l2pad3, atomic_uint32_t);
atomic_uint32_t tail;      /* next read index (oldest item) */
L2_PAD(l2pad4, atomic_uint32_t);
struct dsl_queue_cell cells[N];
};
  312.  
  313.  
  314. void
  315. dsl_queue_init(
  316. struct dsl_queue* const self
  317. ){
  318. self->head_lock = 0;
  319. self->head = 0;
  320. self->tail_lock = 0;
  321. self->tail = 0;
  322. }
  323.  
  324.  
/* Try to enqueue one item.  Returns 1 on success, 0 if the ring is full.
   One slot is sacrificed to distinguish full from empty.  Producers
   serialize on `head_lock'; `tail' is read without holding the tail
   lock, which can only make the full test conservative (a concurrent
   pop frees space, it never consumes it). */
int
dsl_queue_try_push(
struct dsl_queue* const self,
queue_state_t qstate
){
atomic_uint32_t head;
atomic_uint32_t tail;

/* acquire head lock (test-and-set spinlock via atomic exchange) */
while (ATOMIC_XCHG(&self->head_lock, 1))
{
thread_backoff("dsl_queue_try_push");
}

head = ATOMIC_LOAD(&self->head);
tail = ATOMIC_LOAD(&self->tail);

/* full: advancing head would collide with tail */
if (tail == ((head + 1) & (N - 1)))
{
/* release head lock */
ATOMIC_STORE(&self->head_lock, 0);

return 0;
}

/* write the payload first, then advance the write index */
self->cells[head].state = qstate;
ATOMIC_STORE(&self->head, (head + 1UL) & (N - 1UL));

/* release head lock */
ATOMIC_STORE(&self->head_lock, 0);

return 1;
}
  358.  
  359.  
  360. int
  361. dsl_queue_try_pop(
  362. struct dsl_queue* const self,
  363. queue_state_t* qstate
  364. ){
  365. atomic_uint32_t head;
  366. atomic_uint32_t tail;
  367.  
  368. /* acquire tail lock */
  369. while (ATOMIC_XCHG(&self->tail_lock, 1))
  370. {
  371. thread_backoff("dsl_queue_try_pop");
  372. }
  373.  
  374. head = ATOMIC_LOAD(&self->head);
  375. tail = ATOMIC_LOAD(&self->tail);
  376.  
  377. if (head == tail)
  378. {
  379. /* release tail lock */
  380. ATOMIC_STORE(&self->tail_lock, 0);
  381.  
  382. return 0;
  383. }
  384.  
  385. *qstate = self->cells[head].state;
  386. ATOMIC_STORE(&self->tail, (tail + 1UL) & (N - 1UL));
  387.  
  388. /* release tail lock */
  389. ATOMIC_STORE(&self->tail_lock, 0);
  390.  
  391. return 1;
  392. }
  393.  
  394.  
  395. void
  396. dsl_queue_push(
  397. struct dsl_queue* const self,
  398. queue_state_t qstate
  399. ){
  400. while (! dsl_queue_try_push(self, qstate))
  401. {
  402. thread_backoff("dsl_queue_push");
  403. }
  404. }
  405.  
  406.  
  407. queue_state_t
  408. dsl_queue_pop(
  409. struct dsl_queue* const self
  410. ){
  411. queue_state_t qstate;
  412.  
  413. while (! dsl_queue_try_pop(self, &qstate))
  414. {
  415. thread_backoff("dsl_queue_pop");
  416. }
  417.  
  418. return qstate;
  419. }
  420.  
  421.  
  422.  
  423.  
  424.  
  425.  
  426.  
  427.  
  428. /* MPMC Bounded Queue Test Abstraction
  429. __________________________________________________________*/
  430. #if defined (CMT_QUEUE_ALGO)
  431. /* Chris M. Thomasson Algorithm */
  432. # define QUEUE_ALGO_ID "Chris M. Thomasson MPMC Queue Aglorithm"
  433. typedef struct cmt_queue test_queue_t;
  434. # define test_queue_init cmt_queue_init
  435. # define test_queue_push cmt_queue_push
  436. # define test_queue_pop cmt_queue_pop
  437. #elif defined (DSL_QUEUE_ALGO)
  438. /* Dual Spinlock Algorithm */
  439. # define QUEUE_ALGO_ID "Dual Spinlock MPMC Queue Aglorithm"
  440. typedef struct dsl_queue test_queue_t;
  441. # define test_queue_init dsl_queue_init
  442. # define test_queue_push dsl_queue_push
  443. # define test_queue_pop dsl_queue_pop
  444. #else
  445. # error No Test Macro Defined!
  446. #endif
  447.  
  448.  
  449.  
  450.  
  451. /* Stop Watch Abstraction
  452. __________________________________________________________*/
  453.  
/* C `clock()' Stop Watch — the only stopwatch backend currently wired in. */
typedef struct c_clock_stop_watch stop_watch_t;
#define stop_watch_start c_clock_stop_watch_start
#define stop_watch_stop c_clock_stop_watch_stop
  458.  
  459.  
  460.  
  461.  
  462. /* Test Program
  463. __________________________________________________________*/
static test_queue_t* g_tqueue;       /* shared queue under test (cache-line aligned by main) */
static pthread_barrier_t g_tbarrier; /* start barrier so all workers begin together */
  466.  
  467.  
  468. void*
  469. test_thread(
  470. void* state
  471. ){
  472. unsigned i;
  473. queue_state_t qstate;
  474. unsigned const tid = (unsigned const)state;
  475.  
  476. pthread_barrier_wait(&g_tbarrier);
  477.  
  478. for (i = 0; i < ITERS; ++i)
  479. {
  480. unsigned c;
  481.  
  482. for (c = 0; c < PUSH_POPS; ++c)
  483. {
  484. test_queue_push(g_tqueue, (queue_state_t)tid);
  485.  
  486. /*printf("test_thread(%u)::pushed(%u)\n", tid, tid);*/
  487. }
  488.  
  489. for (c = 0; c < PUSH_POPS; ++c)
  490. {
  491. qstate = test_queue_pop(g_tqueue);
  492.  
  493. /*printf("test_thread(%u)::popped(%u)\n", tid, (unsigned)qstate);*/
  494. }
  495.  
  496. }
  497.  
  498. return NULL;
  499. }
  500.  
  501.  
  502.  
  503.  
  504. /* Main Entry */
  505. int
  506. main(void)
  507. {
  508. pthread_t tid[THREADS];
  509. unsigned i = 0;
  510. stop_watch_t stop_watch;
  511. double elapsed_time;
  512.  
  513. void* raw_buf = malloc(sizeof(test_queue_t) + L2_CACHE);
  514. puts("Testing " QUEUE_ALGO_ID "...");
  515.  
  516. g_tqueue = ALIGN_UP(raw_buf, L2_CACHE);
  517. test_queue_init(g_tqueue);
  518. pthread_barrier_init(&g_tbarrier, NULL, THREADS);
  519.  
  520. stop_watch_start(&stop_watch);
  521.  
  522. for (i = 0; i < THREADS; ++i)
  523. {
  524. pthread_create(tid + i, NULL, test_thread, (void*)i);
  525. }
  526.  
  527. for (i = 0; i < THREADS; ++i)
  528. {
  529. pthread_join(tid[i], NULL);
  530. }
  531.  
  532. elapsed_time = stop_watch_stop(&stop_watch);
  533. printf("\nelapsed time: %lf\n", elapsed_time);
  534. pthread_barrier_destroy(&g_tbarrier);
  535.  
  536. puts("\n\n\n___________________________________________\n"
  537. "The program has completed; hit <ENTER> to exit...");
  538.  
  539. getchar();
  540.  
  541. free(raw_buf);
  542.  
  543. return 0;
  544. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement