const std = @import("std");
const assert = std.debug.assert;

// NOTE: `Task` and `utils.Atomic` are assumed to be defined elsewhere in this
// project: `Task` as an intrusive list node with a `next: ?*Task` field, and
// `utils.Atomic(usize)` as a usize wrapper with non-atomic `get`/`set` and
// atomic `load`/`store`/`compareSwap` operations.

/// Single-Producer, Multi-Consumer, Lock-Free, Stealable Ring Buffer.
/// Pushes to the tail, pops from the head, steals from the head to the tail.
/// Pushes can use non-atomic accesses since all push functions should only be
/// called by the single producer.
pub const LocalQueue = struct {
    tasks: [256]*Task,
    head: utils.Atomic(usize),
    tail: utils.Atomic(usize),
    is_sequential: bool,

    pub fn init(self: *@This(), is_sequential: bool) void {
        self.head.set(0);
        self.tail.set(0);
        self.is_sequential = is_sequential;
    }

    pub fn push(self: *@This(), list: *Task, size: usize) void {
        if (self.is_sequential)
            return self.pushSequential(list, size);
        return self.pushParallel(list, size);
    }

    fn pushSequential(self: *@This(), list: *Task, size: usize) void {
        // Same as pushParallel, but without atomic instructions.
        const head = self.head.get();
        const tail = self.tail.get();
        const pushed = self.pushList(head, tail, list, size);
        self.tail.set(tail + pushed);
    }

    fn pushParallel(self: *@This(), list: *Task, size: usize) void {
        // Load the head using `Relaxed` since the queue is expected to be empty.
        // Still use atomics for the head in case the queue is erroneously not empty.
        // The tail, on the other hand, isn't written to concurrently, so it can be
        // read non-atomically. Store the tail using `Release` to synchronize the
        // new tasks with the stealers.
        const head = self.head.load(.Relaxed);
        const tail = self.tail.get();
        const pushed = self.pushList(head, tail, list, size);
        self.tail.store(tail + pushed, .Release);
    }

    fn pushList(self: *@This(), head: usize, tail: usize, list: *Task, size: usize) usize {
        // Ensure that the queue is empty and that there are tasks to push.
        assert(size > 0);
        assert(head == tail);
        assert(size <= self.tasks.len);

        // Push all tasks, but stop early if the list is shorter than `size`.
        var task = list;
        var pushed: usize = 0;
        while (pushed < size) {
            self.tasks[(tail + pushed) % self.tasks.len] = task;
            pushed += 1;
            task = task.next orelse break;
        }

        // Only return the amount of tasks actually written.
        return pushed;
    }

    pub fn pop(self: *@This()) ?*Task {
        if (self.is_sequential)
            return self.popSequential();
        return self.popParallel();
    }

    pub fn popSequential(self: *@This()) ?*Task {
        const head = self.head.get();
        if (head == self.tail.get())
            return null;
        self.head.set(head + 1);
        return self.tasks[head % self.tasks.len];
    }

    pub fn popParallel(self: *@This()) ?*Task {
        // Load the head using `Acquire` to synchronize with any `Release`s from
        // stealers. The tail, on the other hand, isn't written to concurrently,
        // so it can be read non-atomically.
        const tail = self.tail.get();
        var head = self.head.load(.Acquire);

        // Update the head in a lock-free fashion.
        // CAS-store using `Release` to publish the head update to stealers.
        // `compareSwap` yields null on success, or the updated head on failure,
        // in which case the loop retries with the new value.
        while (head != tail) {
            const task = self.tasks[head % self.tasks.len];
            if (self.head.compareSwap(head, head + 1, .Release, .Relaxed)) |updated_head| {
                head = updated_head;
            } else {
                return task;
            }
        }

        // The queue is empty.
        return null;
    }

    pub fn steal(self: *@This(), victim: *@This()) usize {
        // Self's fields are expected to not be modified concurrently here.
        // Stealing should only be attempted when the current queue is empty.
        const tail = self.tail.get();
        const head = self.head.load(.Relaxed);
        assert(head == tail);

        var victim_tail = victim.tail.load(.Acquire); // synchronize with victim's push()
        var victim_head = victim.head.load(.Acquire); // synchronize with victim's pop() & other stealers
        while (victim_tail - victim_head > 0) { // only steal from non-empty victim queues

            // Try to steal half of the victim's tasks, rounding up
            // (e.g. 5 available tasks -> steal 3).
            var steal_size = victim_tail - victim_head;
            steal_size -= steal_size / 2;
            var index: usize = 0;
            while (index < steal_size) : (index += 1)
                self.tasks[(tail + index) % self.tasks.len] = victim.tasks[(victim_head + index) % victim.tasks.len];

            // Try to commit (`Release`) the changes, or retry the steal if
            // victim.head was modified in the meantime.
            if (victim.head.compareSwap(victim_head, victim_head + steal_size, .Release, .Relaxed)) |updated_victim_head| {
                victim_head = updated_victim_head;
                victim_tail = victim.tail.load(.Acquire);
            } else {
                self.tail.store(tail + steal_size, .Release);
                return steal_size;
            }
        }

        // The victim's queue was empty; nothing could be stolen.
        return 0;
    }
};
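
For reference, here is a minimal sketch of how the queue might be exercised in sequential mode. It assumes only what the code above implies: that `Task` is an intrusive list node with a `next: ?*Task` field. The test itself is illustrative and not part of the original paste.

test "LocalQueue push/pop in sequential mode" {
    var queue: LocalQueue = undefined;
    queue.init(true); // sequential mode: plain loads/stores, no atomics

    // Build an intrusive list of two tasks (assumes `Task.next: ?*Task`).
    var b: Task = undefined;
    var a: Task = undefined;
    b.next = null;
    a.next = &b;
    queue.push(&a, 2);

    // Tasks come back in FIFO order: pushed at the tail, popped from the head.
    assert(queue.pop().? == &a);
    assert(queue.pop().? == &b);
    assert(queue.pop() == null);
}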