#define WORK_QUEUE_SIZE 4096
#define WORK_QUEUE_MASK (WORK_QUEUE_SIZE - 1)

struct work_entry {
	void (*func) (void *user, void *data, void *sync);
	//void (*func) (/*void *fiber, void *sched, */void *user, void *data, void *sync);
	void *user;
	void *data;
	void *sync;
};

struct work_queue {
	atomic32_t top;			/* steal end: advanced by thieves, and by the owner when taking the last entry */
	atomic32_t btm;			/* owner end: only the owning thread pushes/pops here */
	struct work_entry *entries;	/* fixed ring of WORK_QUEUE_SIZE entries, indexed with WORK_QUEUE_MASK */
} __aligned(64);

int work_queue_enqueue (struct work_queue *queue, struct work_entry *entry);
int work_queue_dequeue (struct work_queue *queue, struct work_entry *entry);
int work_queue_steal (struct work_queue *queue, struct work_entry *entry);
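
/*
 * Setup/teardown sketch (not part of the original paste): work_queue_init()
 * and work_queue_free() are assumed helpers, and calloc() is just one way to
 * back the fixed-size ring.  Only the owning worker thread may call
 * work_queue_enqueue()/work_queue_dequeue(); any thread may call
 * work_queue_steal().
 */
#include <stdlib.h>
#include <string.h>

static int work_queue_init (struct work_queue *queue)
{
	memset (queue, 0, sizeof (*queue));	/* top = btm = 0 */
	queue->entries = calloc (WORK_QUEUE_SIZE, sizeof (struct work_entry));
	return queue->entries ? 0 : 1;
}

static void work_queue_free (struct work_queue *queue)
{
	free (queue->entries);
	queue->entries = NULL;
}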
#if 0	/* Reference C11 Chase-Lev push, kept for comparison. */
size_t b = load_explicit(&q->bottom, relaxed);
size_t t = load_explicit(&q->top, acquire);
Array *a = load_explicit(&q->array, relaxed);
if (b - t > a->size - 1) { /* Full queue. */
	resize(q);
	a = load_explicit(&q->array, relaxed);
}
store_explicit(&a->buffer[b % a->size], x, relaxed);
thread_fence(release);
store_explicit(&q->bottom, b + 1, relaxed);
#endif
int work_queue_enqueue (struct work_queue *queue, struct work_entry *entry)
{
	int btm = __atomic_load_n (&queue->btm.counter, __ATOMIC_RELAXED);
	int top = __atomic_load_n (&queue->top.counter, __ATOMIC_ACQUIRE);
	struct work_entry *array = queue->entries;

	if (btm - top > (WORK_QUEUE_SIZE - 1)) {
		return 1;	/* queue full (no resize, unlike the reference) */
	}

	/* Fill the slot with relaxed stores, then publish it by bumping btm. */
	__atomic_store_n (&array[btm & WORK_QUEUE_MASK].func, entry->func, __ATOMIC_RELAXED);
	__atomic_store_n (&array[btm & WORK_QUEUE_MASK].data, entry->data, __ATOMIC_RELAXED);
	__atomic_store_n (&array[btm & WORK_QUEUE_MASK].user, entry->user, __ATOMIC_RELAXED);
	__atomic_store_n (&array[btm & WORK_QUEUE_MASK].sync, entry->sync, __ATOMIC_RELAXED);
	__atomic_thread_fence (__ATOMIC_RELEASE);
	__atomic_store_n (&queue->btm.counter, btm + 1, __ATOMIC_RELAXED);
	return 0;
}
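
/*
 * Producer-side sketch (assumption, not from the original paste): example_job()
 * and submit_job() are hypothetical.  It shows one way to package a job into a
 * work_entry and push it from the owning thread, running the job inline if the
 * ring is full.
 */
static void example_job (void *user, void *data, void *sync)
{
	(void) user;
	(void) data;
	(void) sync;
	/* ... actual work goes here ... */
}

static void submit_job (struct work_queue *queue, void *data)
{
	struct work_entry entry = {
		.func = example_job,
		.user = NULL,
		.data = data,
		.sync = NULL,
	};

	if (work_queue_enqueue (queue, &entry) != 0) {
		/* Queue full: run the job on the calling thread instead. */
		entry.func (entry.user, entry.data, entry.sync);
	}
}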
#if 0	/* Reference C11 Chase-Lev take/pop, kept for comparison. */
size_t b = load_explicit(&q->bottom, relaxed) - 1;
Array *a = load_explicit(&q->array, relaxed);
store_explicit(&q->bottom, b, relaxed);
thread_fence(seq_cst);
size_t t = load_explicit(&q->top, relaxed);
int x;
if (t <= b) {
	/* Non-empty queue. */
	x = load_explicit(&a->buffer[b % a->size], relaxed);
	if (t == b) {
		/* Single last element in queue. */
		if (!compare_exchange_strong_explicit(&q->top, &t, t + 1, seq_cst, relaxed))
			/* Failed race. */
			x = EMPTY;
		store_explicit(&q->bottom, b + 1, relaxed);
	}
} else { /* Empty queue. */
	x = EMPTY;
	store_explicit(&q->bottom, b + 1, relaxed);
}
#endif
int work_queue_dequeue (struct work_queue *queue, struct work_entry *entry)
{
	int btm = __atomic_load_n (&queue->btm.counter, __ATOMIC_RELAXED) - 1;
	__atomic_store_n (&queue->btm.counter, btm, __ATOMIC_RELEASE); // update for steal()
	__atomic_thread_fence (__ATOMIC_SEQ_CST);
	int top = __atomic_load_n (&queue->top.counter, __ATOMIC_RELAXED);

	entry->func = NULL;
	entry->data = NULL;
	entry->user = NULL;
	entry->sync = NULL;

	int res = 0;
	if (top <= btm) {
		/* Non-empty: copy the bottom entry. */
		*entry = queue->entries[btm & WORK_QUEUE_MASK];
		__atomic_thread_fence (__ATOMIC_ACQUIRE);
		if (top == btm) {
			/* Last element: race against concurrent steal() via CAS on top. */
			if (!__atomic_compare_exchange_n (&queue->top.counter, &top, top + 1, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
				res = 1;	/* lost the race; the entry was stolen */
			}
			__atomic_store_n (&queue->btm.counter, btm + 1, __ATOMIC_RELAXED);
		}
		return res;	/* more than one element left: uncontended pop */
	} else {
		/* Empty: restore btm and report failure. */
		__atomic_store_n (&queue->btm.counter, btm + 1, __ATOMIC_RELAXED);
		return 1;
	}
}
#if 0	/* Reference C11 Chase-Lev steal, kept for comparison. */
int steal(Deque *q) {
	size_t t = load_explicit(&q->top, acquire);
	thread_fence(seq_cst);
	size_t b = load_explicit(&q->bottom, acquire);
	int x = EMPTY;
	if (t < b) {
		/* Non-empty queue. */
		Array *a = load_explicit(&q->array, consume);
		x = load_explicit(&a->buffer[t % a->size], relaxed);
		if (!compare_exchange_strong_explicit(&q->top, &t, t + 1, seq_cst, relaxed))
			/* Failed race. */
			return ABORT;
	}
	return x;
}
#endif
int work_queue_steal (struct work_queue *queue, struct work_entry *entry)
{
	int top = __atomic_load_n (&queue->top.counter, __ATOMIC_ACQUIRE);
	__atomic_thread_fence (__ATOMIC_SEQ_CST);
	int btm = __atomic_load_n (&queue->btm.counter, __ATOMIC_ACQUIRE);

	if (top < btm) {
		/*
		 * Speculative relaxed reads of the slot (matching the relaxed stores
		 * in enqueue); no extra fence is needed here because a concurrent
		 * overwrite of the slot makes the CAS on top fail, and the copy is
		 * then discarded.
		 */
		entry->func = __atomic_load_n (&queue->entries[top & WORK_QUEUE_MASK].func, __ATOMIC_RELAXED);
		entry->data = __atomic_load_n (&queue->entries[top & WORK_QUEUE_MASK].data, __ATOMIC_RELAXED);
		entry->user = __atomic_load_n (&queue->entries[top & WORK_QUEUE_MASK].user, __ATOMIC_RELAXED);
		entry->sync = __atomic_load_n (&queue->entries[top & WORK_QUEUE_MASK].sync, __ATOMIC_RELAXED);
		if (!__atomic_compare_exchange_n (&queue->top.counter, &top, top + 1, 0, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED)) {
			/* Lost the race to another thief or to the owner's pop. */
			entry->func = NULL;
			entry->data = NULL;
			entry->user = NULL;
			entry->sync = NULL;
			return 1;
		}
		return 0;
	}
	return 1;	/* empty */
}
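
/*
 * Worker-loop sketch (assumption, not from the original paste): run_worker(),
 * nqueues and the linear victim scan are illustrative.  Each worker pops LIFO
 * from the bottom of its own queue and, when that fails, steals FIFO from the
 * top of another worker's queue.
 */
static void run_worker (struct work_queue *queues, int self, int nqueues)
{
	struct work_entry entry;

	for (;;) {
		/* Owner path: pop our own queue. */
		if (work_queue_dequeue (&queues[self], &entry) == 0) {
			entry.func (entry.user, entry.data, entry.sync);
			continue;
		}

		/* Thief path: try every other queue once. */
		int ran = 0;
		for (int victim = 0; victim < nqueues; victim++) {
			if (victim == self)
				continue;
			if (work_queue_steal (&queues[victim], &entry) == 0) {
				entry.func (entry.user, entry.data, entry.sync);
				ran = 1;
				break;
			}
		}
		if (!ran)
			break;	/* nothing to do: back off or park instead of exiting */
	}
}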