Work Stealing Queue

#ifndef WORK_QUEUE_H
#define WORK_QUEUE_H

#include <stddef.h> /* NULL */

/* 32-bit counter, only touched through the volatile accessors and the
 * GCC __sync builtins below. */
typedef struct atomic32 {
    int counter;
} atomic32_t;

/* Single volatile load/store: keeps the compiler from caching, tearing or
 * eliding the access; it is not a memory barrier by itself. */
#define __FETCH_ONCE(type, var) \
    *(volatile type *)(var)

#define __WRITE_ONCE(type, var, val) \
    *(volatile type *)(var) = (val)

#define FETCH_ONCE_S32(var) \
    __FETCH_ONCE(int, var)

#define WRITE_ONCE_S32(var, val) \
    __WRITE_ONCE(int, var, val)

#define atomic_put(var, val) \
    *(volatile int *)(&(var)->counter) = (val)

#define atomic_get(var) \
    *(volatile int *)(&(var)->counter)

/* x86 fences; the "memory" clobber also stops compiler reordering. */
#define write_barrier() \
    __asm__ __volatile__ ("sfence" ::: "memory")

#define read_barrier() \
    __asm__ __volatile__ ("lfence" ::: "memory")

#define full_barrier() \
    __asm__ __volatile__ ("mfence" ::: "memory")

/* CAS-loop read-modify-write helpers; both return the previous value. */
static inline int atomic_add (atomic32_t *val, int add)
{
    int old;

    do {
        old = FETCH_ONCE_S32 (&val->counter);
    } while (__sync_val_compare_and_swap (&val->counter, old, old + add) != old);

    return old;
}

static inline int atomic_sub (atomic32_t *val, int sub)
{
    int old;

    do {
        old = FETCH_ONCE_S32 (&val->counter);
    } while (__sync_val_compare_and_swap (&val->counter, old, old - sub) != old);

    return old;
}

#define WORK_QUEUE_SIZE 4096
#define WORK_QUEUE_MASK (WORK_QUEUE_SIZE - 1)

struct work_entry {
    void (*func) (void *user, void *data, void *sync);
    void *user;
    void *data;
    void *sync;
};

/* Ring-buffer deque: live entries sit at indices [top, btm).  The owner
 * pushes and pops at btm, thieves advance top. */
struct work_queue {
    atomic32_t top;
    atomic32_t btm;
    struct work_entry *entries;
} __attribute__ ((aligned (64)));

int work_queue_enqueue (struct work_queue *queue, struct work_entry *entry);
int work_queue_dequeue (struct work_queue *queue, struct work_entry *entry);
int work_queue_steal (struct work_queue *queue, struct work_entry *entry);

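/*
 * Intended usage (inferred from the code, not spelled out in the paste):
 * enqueue and dequeue operate on the bottom end and are meant to be called
 * only by the thread that owns this queue; steal operates on the top end
 * and may be called concurrently from any number of other threads.
 */
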
/* Owner-side push: write the entry into the bottom slot, then publish the
 * new bottom index.  There is no check for a full ring. */
int work_queue_enqueue (struct work_queue *queue, struct work_entry *entry)
{
    int btm = atomic_get (&queue->btm);

    queue->entries[btm & WORK_QUEUE_MASK] = *entry;
    write_barrier (); /* entry must be visible before the new btm */

    atomic_put (&queue->btm, btm + 1); /* steal needs to see job */

    return 0;
}
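
/*
 * Not part of the original paste: a bounds-checked variant, sketched under
 * the assumption that a caller would rather get an error code than silently
 * overwrite entries that have not been consumed yet.  The name
 * work_queue_enqueue_checked is made up for this illustration.
 */
static inline int work_queue_enqueue_checked (struct work_queue *queue,
                                              struct work_entry *entry)
{
    int btm = atomic_get (&queue->btm);
    int top = atomic_get (&queue->top);

    if (btm - top >= WORK_QUEUE_SIZE)
        return -1; /* ring is full */

    queue->entries[btm & WORK_QUEUE_MASK] = *entry;
    write_barrier ();
    atomic_put (&queue->btm, btm + 1);

    return 0;
}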

/* Owner-side pop from the bottom.  Returns 0 and fills *entry on success;
 * returns 1 with a NULLed entry when the queue is empty or the last entry
 * was stolen concurrently. */
int work_queue_dequeue (struct work_queue *queue, struct work_entry *entry)
{
    int old;

    /* reserve the bottom slot; the CAS doubles as a full barrier, so the
     * load of top below cannot move before the btm update */
    do {
        old = atomic_get (&queue->btm);
    } while (__sync_val_compare_and_swap (&queue->btm.counter, old, old - 1) != old);

    int btm = old - 1; /* index of the entry being popped */
    int top = atomic_get (&queue->top);
    int status = 0;

    if (top <= btm) {
        *entry = queue->entries[btm & WORK_QUEUE_MASK];

        if (top != btm) {
            /* more than one entry left: no race with stealers */
            return 0;
        }

        /* last entry: race the stealers for top */
        if (__sync_val_compare_and_swap (&queue->top.counter, top, top + 1) != top) {
            /* lost the race, the entry was already stolen */
            entry->func = NULL;
            entry->data = NULL;
            entry->user = NULL;
            entry->sync = NULL;
            status = 1;
        }

        atomic_put (&queue->btm, top + 1); /* back to the canonical empty state */
        return status;
    }

    /* queue was already empty: undo the decrement */
    atomic_put (&queue->btm, top);

    entry->func = NULL;
    entry->data = NULL;
    entry->user = NULL;
    entry->sync = NULL;

    return 1;
}

/* Thief-side steal from the top; may be called from any thread.  Returns 0
 * and fills *entry on success; returns 1 with a NULLed entry when the queue
 * is empty or another thread won the race for the same entry. */
int work_queue_steal (struct work_queue *queue, struct work_entry *entry)
{
    int top = atomic_get (&queue->top);
    full_barrier (); /* read top before btm */
    int btm = atomic_get (&queue->btm);

    if (top < btm) {
        /* copy the candidate entry before trying to claim it */
        entry->func = queue->entries[top & WORK_QUEUE_MASK].func;
        entry->data = queue->entries[top & WORK_QUEUE_MASK].data;
        entry->user = queue->entries[top & WORK_QUEUE_MASK].user;
        entry->sync = queue->entries[top & WORK_QUEUE_MASK].sync;
        read_barrier ();

        /* claim it by advancing top; failure means the owner or another
         * thief got there first */
        if (__sync_val_compare_and_swap (&queue->top.counter, top, top + 1) == top) {
            return 0;
        }
    }

    entry->func = NULL;
    entry->data = NULL;
    entry->user = NULL;
    entry->sync = NULL;

    return 1;
}

#endif /* WORK_QUEUE_H */

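/*
 * Usage sketch, not part of the original paste: one owner thread pushes a
 * batch of jobs and pops them from the bottom while a single thief steals
 * from the top.  The demo guard WORK_QUEUE_DEMO, the pthread worker, and
 * the print_job payload are all illustrative assumptions.
 */
#ifdef WORK_QUEUE_DEMO
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

static struct work_queue demo_queue;

static void print_job (void *user, void *data, void *sync)
{
    (void) user;
    (void) sync;
    printf ("ran job %zu\n", (size_t) data);
}

static void *thief (void *arg)
{
    (void) arg;
    struct work_entry e;
    int stolen = 0;

    /* bounded number of attempts so the demo always terminates */
    for (int i = 0; i < 100000 && stolen < 4; i++) {
        if (work_queue_steal (&demo_queue, &e) == 0) {
            e.func (e.user, e.data, e.sync);
            stolen++;
        }
    }
    return NULL;
}

int main (void)
{
    demo_queue.entries = calloc (WORK_QUEUE_SIZE, sizeof (struct work_entry));
    if (!demo_queue.entries)
        return 1;
    atomic_put (&demo_queue.top, 0);
    atomic_put (&demo_queue.btm, 0);

    /* owner pushes jobs at the bottom of its own queue */
    for (size_t i = 0; i < 8; i++) {
        struct work_entry e = { print_job, NULL, (void *) i, NULL };
        work_queue_enqueue (&demo_queue, &e);
    }

    pthread_t tid;
    pthread_create (&tid, NULL, thief, NULL);

    /* owner pops from the bottom while the thief steals from the top */
    struct work_entry e;
    while (work_queue_dequeue (&demo_queue, &e) == 0)
        e.func (e.user, e.data, e.sync);

    pthread_join (tid, NULL);
    free (demo_queue.entries);
    return 0;
}
#endif /* WORK_QUEUE_DEMO */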