Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/// Single-Producer, Multi-Consumer, Lock-Free Stealable Ring Buffer.
/// Pushes to the tail, pops from the head, steals from the victim's head into the stealer's tail.
/// Pushes can be non-atomic since `push`, `pop` and `steal` should only be called by the
/// single producer that owns the queue; other threads only interact with it as a `victim`.
///
/// `head` and `tail` are free-running counters that are reduced modulo `tasks.len` on
/// every slot access. They are advanced with wrapping arithmetic, which stays consistent
/// across counter overflow because the capacity (256) evenly divides 2^@bitSizeOf(usize).
pub const LocalQueue = struct {
    /// Ring buffer storage; slot `i % tasks.len` backs counter value `i`.
    tasks: [256]*Task,
    /// Counter of the next task to pop/steal. Modified by the owner and by stealers.
    head: utils.Atomic(usize),
    /// Counter one past the last pushed task. Only written by the owner.
    tail: utils.Atomic(usize),
    /// When true, `push` and `pop` take the variants that use plain `get`/`set`
    /// instead of atomic loads, stores and compare-and-swap.
    is_sequential: bool,

    /// Resets the queue to empty and records whether it is used sequentially.
    /// The contents of `tasks` are left untouched.
    pub fn init(self: *@This(), is_sequential: bool) void {
        self.head.set(0);
        self.tail.set(0);
        self.is_sequential = is_sequential;
    }

    /// Pushes up to `size` tasks from the linked list starting at `list`.
    /// The queue must be empty and `size` must be in `1..tasks.len` (asserted in `pushList`).
    pub fn push(self: *@This(), list: *Task, size: usize) void {
        if (self.is_sequential)
            return self.pushSequential(list, size);
        return self.pushParallel(list, size);
    }

    /// Same as `pushParallel` but without atomic instructions.
    fn pushSequential(self: *@This(), list: *Task, size: usize) void {
        const head = self.head.get();
        const tail = self.tail.get();
        const pushed = self.pushList(head, tail, list, size);
        self.tail.set(tail +% pushed);
    }

    fn pushParallel(self: *@This(), list: *Task, size: usize) void {
        // Load the head using `Relaxed` since the queue is expected to be empty.
        // Still use atomics for the head in case the queue is erroneously not empty.
        // The tail, on the other hand, isn't written to concurrently so can use non-atomics.
        // Store tail using release to synchronize the new tasks with the stealers.
        const head = self.head.load(.Relaxed);
        const tail = self.tail.get();
        const pushed = self.pushList(head, tail, list, size);
        self.tail.store(tail +% pushed, .Release);
    }

    /// Writes at most `size` tasks from the linked list `list` into the slots following `tail`.
    /// Returns the number of tasks actually written, which is smaller than `size`
    /// if the list ends (a null `next`) early.
    /// Does not modify `head` or `tail`; the caller publishes the new tail.
    fn pushList(self: *@This(), head: usize, tail: usize, list: *Task, size: usize) usize {
        // ensure that the queue is empty and that there are tasks to push
        std.debug.assert(size > 0);
        std.debug.assert(head == tail);
        std.debug.assert(size <= self.tasks.len);

        // push all tasks but exit early on bad link
        var task = list;
        var pushed: usize = 0;
        while (pushed < size) {
            self.tasks[(tail +% pushed) % self.tasks.len] = task;
            pushed += 1;
            task = task.next orelse break;
        }

        // only return the amount of tasks actually written
        return pushed;
    }

    /// Pops the task at the head of the queue, or returns null if the queue is empty.
    pub fn pop(self: *@This()) ?*Task {
        if (self.is_sequential)
            return self.popSequential();
        return self.popParallel();
    }

    /// Non-atomic pop: reads and advances the head with plain `get`/`set`.
    pub fn popSequential(self: *@This()) ?*Task {
        const head = self.head.get();
        if (head == self.tail.get())
            return null;
        self.head.set(head +% 1);
        return self.tasks[head % self.tasks.len];
    }

    /// Atomic pop: claims the head slot with a compare-and-swap, retrying while
    /// the head is being moved concurrently and the queue is not empty.
    pub fn popParallel(self: *@This()) ?*Task {
        // Load the head using `Acquire` to synchronize with any `Release`s from stealers.
        // The tail, on the other hand, isn't written to concurrently so can use non-atomics.
        const tail = self.tail.get();
        var head = self.head.load(.Acquire);

        // Update the head in a lock-free fashion.
        // Cas-store using `Release` to commit read from stealers.
        while (head != tail) {
            // Read the slot before the CAS: once the head moves, the slot may be reused.
            const task = self.tasks[head % self.tasks.len];
            // A non-null result means the CAS lost and carries the head value seen instead.
            if (self.head.compareSwap(head, head +% 1, .Release, .Relaxed)) |updated_head| {
                head = updated_head;
            } else {
                return task;
            }
        }

        // queue is empty
        return null;
    }

    /// Tries to move roughly half (rounded up) of `victim`'s tasks into `self`.
    /// `self` must be empty. Returns the number of tasks stolen, or 0 if the
    /// victim's queue was observed empty.
    pub fn steal(self: *@This(), victim: *@This()) usize {
        // Self's fields are expected to not be modified concurrently here.
        // Should only attempt to steal if the current queue is empty.
        const tail = self.tail.get();
        const head = self.head.load(.Relaxed);
        std.debug.assert(head == tail);

        // Load the victim's head BEFORE its tail. The head never passes the tail, and the
        // tail only grows, so a tail read after the head cannot appear to be behind it.
        // (Reading them the other way round lets the head overtake a stale tail and
        // underflow the length computation below.)
        var victim_head = victim.head.load(.Acquire); // synchronize with victim's pop() & other stealers
        var victim_tail = victim.tail.load(.Acquire); // synchronize with victim's push()

        while (true) {
            const available = victim_tail -% victim_head;

            // only steal from non-empty victim queues
            if (available == 0)
                return 0;

            // The two loads are not a single atomic snapshot: if the victim was drained and
            // refilled in between, the apparent length can exceed the capacity. Such a
            // snapshot is unusable, so take a fresh one instead of copying stale slots.
            if (available > victim.tasks.len) {
                victim_head = victim.head.load(.Acquire);
                victim_tail = victim.tail.load(.Acquire);
                continue;
            }

            // try and steal half of the victim's tasks (rounded up so 1 task can be stolen)
            const steal_size = available - available / 2;
            var index: usize = 0;
            while (index < steal_size) : (index += 1) {
                self.tasks[(tail +% index) % self.tasks.len] =
                    victim.tasks[(victim_head +% index) % victim.tasks.len];
            }

            // try and commit (`Release`) the changes or retry the steal if victim.head was modified
            if (victim.head.compareSwap(victim_head, victim_head +% steal_size, .Release, .Relaxed)) |updated_victim_head| {
                victim_head = updated_victim_head;
                victim_tail = victim.tail.load(.Acquire);
            } else {
                // The copied tasks are now owned by self; publish them to self's stealers.
                self.tail.store(tail +% steal_size, .Release);
                return steal_size;
            }
        }
    }
};
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement