Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /*
- Bounded Multi-Producer/Multi-Consumer FIFO Queue
- Copyright (C) 2011 Christopher Michael Thomasson
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
- You should have received a copy of the GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
- ____________________________________________________________________*/
- #include <stdio.h>
- #include <stdlib.h>
- #include <time.h>
- #include <assert.h>
- #include <pthread.h>
- #undef WIN32_LEAN_AND_MEAN
- #define WIN32_LEAN_AND_MEAN
- #include <windows.h>
- /* Global Settings
- __________________________________________________________*/
- /* #define CMT_QUEUE_ALGO 1 */
- #define DSL_QUEUE_ALGO 1
- #define N 4096
- #define ITERS 100000000
- #define PUSH_POPS 4
- #define THREADS 4
- typedef long queue_state_t;
- typedef char global_settings_static_assert[
- N >= (PUSH_POPS * THREADS) ? 1 : -1
- ];
- /* Simple Stop Watch
- __________________________________________________________*/
- struct c_clock_stop_watch
- {
- clock_t start;
- clock_t stop;
- };
- void
- c_clock_stop_watch_start(
- struct c_clock_stop_watch* const self
- ){
- self->start = clock();
- }
- double
- c_clock_stop_watch_stop(
- struct c_clock_stop_watch* const self
- ){
- self->stop = clock();
- return (((double)self->stop - (double)self->start) / CLOCKS_PER_SEC) * 1000.0;
- }
- /* Backoff Operations
- __________________________________________________________*/
- void
- thread_backoff(
- const char* msg
- ){
- Sleep(0);
- }
- /* Alignment Operations
- __________________________________________________________*/
- #if ! defined (ALIGN_UINTPTR)
- # define ALIGN_UINTPTR size_t
- #endif
- typedef ALIGN_UINTPTR align_uintptr;
- typedef char ALIGN_SASSERT
- [
- sizeof(align_uintptr) == sizeof(void*) &&
- sizeof(align_uintptr) == sizeof(char (*) (double))
- ? 1 : -1
- ];
- #define ALIGN_UP(mp_ptr, mp_align) \
- ((void*)( \
- (((align_uintptr)(mp_ptr)) + ((mp_align) - 1)) \
- & ~(((mp_align) - 1)) \
- ))
- /* IA-32 Atomic Operations
- __________________________________________________________*/
- typedef unsigned __int32 atomic_uint32_t;
- #define ATOMIC_API(mp_ret) \
- __declspec(naked) \
- mp_ret \
- __cdecl
- ATOMIC_API(atomic_uint32_t)
- ATOMIC_XADD(
- atomic_uint32_t volatile* dest,
- atomic_uint32_t value
- ){
- _asm
- {
- MOV ECX, [ESP + 4]
- MOV EAX, [ESP + 8]
- LOCK XADD [ECX], EAX
- RET
- }
- }
- ATOMIC_API(atomic_uint32_t)
- ATOMIC_XCHG(
- atomic_uint32_t volatile* dest,
- atomic_uint32_t value
- ){
- _asm
- {
- MOV ECX, [ESP + 4]
- MOV EAX, [ESP + 8]
- XCHG [ECX], EAX
- RET
- }
- }
- ATOMIC_API(atomic_uint32_t)
- ATOMIC_LOAD(
- atomic_uint32_t volatile* dest
- ){
- _asm
- {
- MOV ECX, [ESP + 4]
- MOV EAX, [ECX]
- RET
- }
- }
- ATOMIC_API(void)
- ATOMIC_STORE(
- atomic_uint32_t volatile* dest,
- atomic_uint32_t value
- ){
- _asm
- {
- MOV ECX, [ESP + 4]
- MOV EAX, [ESP + 8]
- MOV [ECX], EAX
- RET
- }
- }
- /* IA-32 Cache Line
- __________________________________________________________*/
- #define L2_CACHE 128
- #define L2_PAD_EX(mp_name, mp_type, mp_count) \
- unsigned char mp_name[ \
- L2_CACHE - (sizeof(mp_type) * (mp_count)) \
- ]
- #define L2_PAD(mp_name, mp_type) \
- L2_PAD_EX(mp_name, mp_type, 1)
- /* MPMC Bounded Queue By Chris M. Thomasson
- __________________________________________________________*/
- struct cmt_queue_cell
- {
- atomic_uint32_t ver;
- queue_state_t state;
- L2_PAD(l2pad1, struct { atomic_uint32_t a; queue_state_t b; } );
- };
- struct cmt_queue
- {
- atomic_uint32_t head;
- L2_PAD(l2pad1, atomic_uint32_t);
- atomic_uint32_t tail;
- L2_PAD(l2pad2, atomic_uint32_t);
- struct cmt_queue_cell cells[N];
- };
- void
- cmt_queue_init(
- struct cmt_queue* const self
- ){
- atomic_uint32_t i;
- self->head = 0;
- self->tail = 0;
- for (i = 0; i < N; ++i)
- {
- self->cells[i].ver = i;
- }
- }
- void
- cmt_queue_push(
- struct cmt_queue* const self,
- queue_state_t qstate
- ){
- atomic_uint32_t ver = ATOMIC_XADD(&self->head, 1);
- struct cmt_queue_cell* cell = self->cells + (ver & (N - 1U));
- while (ATOMIC_LOAD(&cell->ver) != ver)
- {
- thread_backoff("cmt_queue_push");
- }
- cell->state = qstate;
- ATOMIC_STORE(&cell->ver, ver + 1);
- }
- queue_state_t
- cmt_queue_pop(
- struct cmt_queue* const self
- ){
- atomic_uint32_t ver = ATOMIC_XADD(&self->tail, 1);
- struct cmt_queue_cell* cell = self->cells + (ver & (N - 1U));
- queue_state_t qstate;
- while (ATOMIC_LOAD(&cell->ver) != ver + 1)
- {
- thread_backoff("cmt_queue_pop");
- }
- qstate = cell->state;
- ATOMIC_STORE(&cell->ver, ver + N);
- return qstate;
- }
- /* MPMC Bounded Queue /w Dual Spinlock's
- __________________________________________________________*/
- struct dsl_queue_cell
- {
- queue_state_t state;
- L2_PAD_EX(l2pad2, atomic_uint32_t, 2);
- };
- struct dsl_queue
- {
- atomic_uint32_t head_lock;
- L2_PAD(l2pad1, atomic_uint32_t);
- atomic_uint32_t head;
- L2_PAD(l2pad2, atomic_uint32_t);
- atomic_uint32_t tail_lock;
- L2_PAD(l2pad3, atomic_uint32_t);
- atomic_uint32_t tail;
- L2_PAD(l2pad4, atomic_uint32_t);
- struct dsl_queue_cell cells[N];
- };
- void
- dsl_queue_init(
- struct dsl_queue* const self
- ){
- self->head_lock = 0;
- self->head = 0;
- self->tail_lock = 0;
- self->tail = 0;
- }
- int
- dsl_queue_try_push(
- struct dsl_queue* const self,
- queue_state_t qstate
- ){
- atomic_uint32_t head;
- atomic_uint32_t tail;
- /* acquire head lock */
- while (ATOMIC_XCHG(&self->head_lock, 1))
- {
- thread_backoff("dsl_queue_try_push");
- }
- head = ATOMIC_LOAD(&self->head);
- tail = ATOMIC_LOAD(&self->tail);
- if (tail == ((head + 1) & (N - 1)))
- {
- /* release head lock */
- ATOMIC_STORE(&self->head_lock, 0);
- return 0;
- }
- self->cells[head].state = qstate;
- ATOMIC_STORE(&self->head, (head + 1UL) & (N - 1UL));
- /* release head lock */
- ATOMIC_STORE(&self->head_lock, 0);
- return 1;
- }
- int
- dsl_queue_try_pop(
- struct dsl_queue* const self,
- queue_state_t* qstate
- ){
- atomic_uint32_t head;
- atomic_uint32_t tail;
- /* acquire tail lock */
- while (ATOMIC_XCHG(&self->tail_lock, 1))
- {
- thread_backoff("dsl_queue_try_pop");
- }
- head = ATOMIC_LOAD(&self->head);
- tail = ATOMIC_LOAD(&self->tail);
- if (head == tail)
- {
- /* release tail lock */
- ATOMIC_STORE(&self->tail_lock, 0);
- return 0;
- }
- *qstate = self->cells[head].state;
- ATOMIC_STORE(&self->tail, (tail + 1UL) & (N - 1UL));
- /* release tail lock */
- ATOMIC_STORE(&self->tail_lock, 0);
- return 1;
- }
- void
- dsl_queue_push(
- struct dsl_queue* const self,
- queue_state_t qstate
- ){
- while (! dsl_queue_try_push(self, qstate))
- {
- thread_backoff("dsl_queue_push");
- }
- }
- queue_state_t
- dsl_queue_pop(
- struct dsl_queue* const self
- ){
- queue_state_t qstate;
- while (! dsl_queue_try_pop(self, &qstate))
- {
- thread_backoff("dsl_queue_pop");
- }
- return qstate;
- }
- /* MPMC Bounded Queue Test Abstraction
- __________________________________________________________*/
- #if defined (CMT_QUEUE_ALGO)
- /* Chris M. Thomasson Algorithm */
- # define QUEUE_ALGO_ID "Chris M. Thomasson MPMC Queue Aglorithm"
- typedef struct cmt_queue test_queue_t;
- # define test_queue_init cmt_queue_init
- # define test_queue_push cmt_queue_push
- # define test_queue_pop cmt_queue_pop
- #elif defined (DSL_QUEUE_ALGO)
- /* Dual Spinlock Algorithm */
- # define QUEUE_ALGO_ID "Dual Spinlock MPMC Queue Aglorithm"
- typedef struct dsl_queue test_queue_t;
- # define test_queue_init dsl_queue_init
- # define test_queue_push dsl_queue_push
- # define test_queue_pop dsl_queue_pop
- #else
- # error No Test Macro Defined!
- #endif
- /* Stop Watch Abstraction
- __________________________________________________________*/
- /* C `clock()' Stop Watch */
- typedef struct c_clock_stop_watch stop_watch_t;
- #define stop_watch_start c_clock_stop_watch_start
- #define stop_watch_stop c_clock_stop_watch_stop
- /* Test Program
- __________________________________________________________*/
- static test_queue_t* g_tqueue;
- static pthread_barrier_t g_tbarrier;
- void*
- test_thread(
- void* state
- ){
- unsigned i;
- queue_state_t qstate;
- unsigned const tid = (unsigned const)state;
- pthread_barrier_wait(&g_tbarrier);
- for (i = 0; i < ITERS; ++i)
- {
- unsigned c;
- for (c = 0; c < PUSH_POPS; ++c)
- {
- test_queue_push(g_tqueue, (queue_state_t)tid);
- /*printf("test_thread(%u)::pushed(%u)\n", tid, tid);*/
- }
- for (c = 0; c < PUSH_POPS; ++c)
- {
- qstate = test_queue_pop(g_tqueue);
- /*printf("test_thread(%u)::popped(%u)\n", tid, (unsigned)qstate);*/
- }
- }
- return NULL;
- }
- /* Main Entry */
- int
- main(void)
- {
- pthread_t tid[THREADS];
- unsigned i = 0;
- stop_watch_t stop_watch;
- double elapsed_time;
- void* raw_buf = malloc(sizeof(test_queue_t) + L2_CACHE);
- puts("Testing " QUEUE_ALGO_ID "...");
- g_tqueue = ALIGN_UP(raw_buf, L2_CACHE);
- test_queue_init(g_tqueue);
- pthread_barrier_init(&g_tbarrier, NULL, THREADS);
- stop_watch_start(&stop_watch);
- for (i = 0; i < THREADS; ++i)
- {
- pthread_create(tid + i, NULL, test_thread, (void*)i);
- }
- for (i = 0; i < THREADS; ++i)
- {
- pthread_join(tid[i], NULL);
- }
- elapsed_time = stop_watch_stop(&stop_watch);
- printf("\nelapsed time: %lf\n", elapsed_time);
- pthread_barrier_destroy(&g_tbarrier);
- puts("\n\n\n___________________________________________\n"
- "The program has completed; hit <ENTER> to exit...");
- getchar();
- free(raw_buf);
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement