Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #ifndef WORK_QUEUE_H
- #define WORK_QUEUE_H
- typedef struct atomic32 {
- int counter;
- } atomic32_t;
- #define __FETCH_ONCE(type, var) \
- *(volatile type *)(var)
- #define __WRITE_ONCE(type, var, val) \
- *(volatile type *)(var) = val
- #define FETCH_ONCE_S32(var) \
- __FETCH_ONCE(int, var)
- #define WRITE_ONCE_S32(var, val) \
- __WRITE_ONCE(int, var, val)
- #define atomic_put(var, val) \
- *(volatile int *)(&(var)->counter) = val
- #define atomic_get(var) \
- *(volatile int *)(&(var)->counter)
- #define write_barrier() \
- __asm__ ("\tsfence\n")
- #define read_barrier() \
- __asm__ ("\tlfence\n")
- #define full_barrier() \
- __asm__ ("\tmfence\n")
- #define __FETCH_ONCE(type, var) \
- *(volatile type *)(var)
- #define __WRITE_ONCE(type, var, val) \
- *(volatile type *)(var) = val
- #define FETCH_ONCE_S32(var) \
- __FETCH_ONCE(S32, var)
- #define WRITE_ONCE_S32(var, val) \
- __WRITE_ONCE(S32, var, val)
- #define atomic_put(var, val) \
- *(volatile int *)(&(var)->counter) = val
- #define atomic_get(var) \
- *(volatile int *)(&(var)->counter)
- static inline int atomic_add (atomic32_t *val, int add)
- {
- int old;
- do {
- old = FETCH_ONCE_S32 (&val->counter);
- } while (__sync_val_compare_and_swap (&val->counter, old, old + add) != old);
- return old;
- }
- static inline int atomic_sub (atomic32_t *val, int sub)
- {
- int old;
- do {
- old = FETCH_ONCE_S32 (&val->counter);
- } while (__sync_val_compare_and_swap (&val->counter, old, old - sub) != old);
- return old;
- }
#include <stddef.h>

/* Ring capacity.  MUST be a power of two: indices are wrapped with
   (index & WORK_QUEUE_MASK), which only works when SIZE is 2^n. */
#define WORK_QUEUE_SIZE 4096
#define WORK_QUEUE_MASK (WORK_QUEUE_SIZE - 1)
/*
 * One unit of work.  All four pointers are opaque to the queue; they are
 * copied in on enqueue and copied out on dequeue/steal.
 */
struct work_entry {
    void (*func) (void *user, void *data, void *sync);  /* job callback */
    void *user;  /* caller context passed back to func */
    void *data;  /* job payload */
    void *sync;  /* completion/synchronization handle */
};
- struct work_queue {
- atomic32_t top;
- atomic32_t btm;
- struct work_entry *entries;
- } __aligned(64);
/* Owner-only: push a copy of *entry at the bottom.  Returns 0 on success. */
int work_queue_enqueue (struct work_queue *queue, struct work_entry *entry);
/* Owner-only: pop from the bottom (LIFO).  0 = got one, nonzero = empty. */
int work_queue_dequeue (struct work_queue *queue, struct work_entry *entry);
/* Any thread: take from the top (FIFO).  0 = got one, nonzero = none/lost race. */
int work_queue_steal (struct work_queue *queue, struct work_entry *entry);
- int work_queue_enqueue (struct work_queue *queue, struct work_entry *entry)
- {
- int btm = atomic_get (&queue->btm);
- queue->entries[btm & WORK_QUEUE_MASK] = *entry;
- __asm__ volatile ("\tsfence\n" ::: "memory");
- atomic_put (&queue->btm, btm + 1); // steal needs to see job
- return 0;
- }
- int work_queue_dequeue (struct work_queue *queue, struct work_entry *entry)
- {
- int btm;
- do {
- btm = atomic_get (&queue->btm);
- } while (__sync_val_compare_and_swap (&queue->btm.counter, btm, btm - 1) != btm);
- int status = 0;
- int top = atomic_get (&queue->top);
- if (btm >= top) {
- *entry = queue->entries[btm & WORK_QUEUE_MASK];
- __asm__ volatile ("\tlfence\n" ::: "memory");
- if (top != btm) {
- return 0;
- }
- int tmp = top + 1;
- if (__sync_val_compare_and_swap (&queue->top.counter, top, tmp)) {
- entry->func = NULL;
- entry->data = NULL;
- entry->user = NULL;
- entry->sync = NULL;
- status = 1;
- }
- atomic_put (&queue->btm, tmp);
- return status;
- }
- entry->func = NULL;
- entry->data = NULL;
- entry->user = NULL;
- entry->sync = NULL;
- return 1;
- }
- int work_queue_steal (struct work_queue *queue, struct work_entry *entry)
- { int t = atomic_get (&queue->top);
- __asm__ volatile ("\tmfence\n" ::: "memory");
- int b = atomic_get (&queue->btm);
- int status = 1;
- if (t < b) {
- status = 0;
- entry->func = queue->entries[t & WORK_QUEUE_MASK].func;
- entry->data = queue->entries[t & WORK_QUEUE_MASK].data;
- entry->user = queue->entries[t & WORK_QUEUE_MASK].user;
- entry->sync = queue->entries[t & WORK_QUEUE_MASK].sync;
- __asm__ ("\tlfence\n" ::: "memory");
- if (__sync_val_compare_and_swap (&queue->top.counter, t, t + 1) == t) {
- return 0;
- }
- return 1;
- }
- if (status == 1) {
- entry->func = NULL;
- entry->data = NULL;
- entry->user = NULL;
- entry->sync = NULL;
- }
- return status;
- }
- #endif /* EOF */
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement