Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- const std = @import("std");
- const builtin = @import("builtin");
- const testing = std.testing;
- const assert = std.debug.assert;
- const os = std.os;
- const system = os.system;
- const SpinLock = std.SpinLock;
/// A one-shot thread synchronization event: threads block in `wait()` until
/// another thread calls `set()`; `reset()` rearms it. Built on whichever OS
/// primitive is available (futex, NT keyed events, pthread condvars, or
/// plain spinning).
const ResetEvent = struct {
    /// Event state: 0 = unset, 1 = unset with a waiter blocked, 2 = set.
    key: i32,
    os_event: OsEvent,

    fn init() ResetEvent {
        return ResetEvent{
            .key = 0,
            .os_event = OsEvent.init(),
        };
    }

    fn deinit(self: *ResetEvent) void {
        self.os_event.deinit();
        self.* = undefined;
    }

    /// Return the event to the unset state.
    fn reset(self: *ResetEvent) void {
        @atomicStore(i32, &self.key, 0, .Release);
    }

    /// Transition to the set state; if a waiter registered itself
    /// (key was 1), wake it through the OS event.
    fn set(self: *ResetEvent) void {
        if (@atomicRmw(i32, &self.key, .Xchg, 2, .Release) == 1)
            self.os_event.wake(&self.key);
    }

    /// Block the caller until the event is set; returns immediately if the
    /// event is already set.
    fn wait(self: *ResetEvent) void {
        var state = @atomicLoad(i32, &self.key, .Monotonic);
        while (state == 0) {
            // Register as a waiter (0 -> 1) before sleeping so set() knows
            // someone must be woken.
            state = @cmpxchgWeak(i32, &self.key, 0, 1, .Acquire, .Monotonic) orelse {
                return self.os_event.wait(&self.key);
            };
        }
    }

    /// Select the cheapest blocking primitive for the target OS.
    const OsEvent = switch (builtin.os) {
        .windows => WindowsEvent,
        .linux => if (builtin.link_libc) PosixEvent else LinuxEvent,
        else => if (builtin.link_libc) PosixEvent else SpinEvent,
    };

    /// Fallback "event" that burns CPU instead of blocking; used when no OS
    /// blocking primitive is available.
    const SpinEvent = struct {
        const SpinLock = std.SpinLock;

        fn init() SpinEvent {
            return SpinEvent{};
        }

        fn deinit(self: *SpinEvent) void {}

        fn wake(self: *SpinEvent, ptr: *i32) void {}

        fn wait(self: *SpinEvent, ptr: *i32) void {
            var spin = SpinLock.Backoff.init();
            // Spin until set() replaces the waiter marker (1) with 2.
            while (@atomicLoad(i32, ptr, .Acquire) == 1)
                spin.yield();
        }
    };

    /// futex-backed event for Linux builds without libc.
    const LinuxEvent = struct {
        fn init() LinuxEvent {
            return LinuxEvent{};
        }

        fn deinit(self: *LinuxEvent) void {}

        fn wake(self: *LinuxEvent, ptr: *i32) void {
            const rc = system.futex_wake(ptr, system.FUTEX_WAKE | system.FUTEX_PRIVATE_FLAG, 1);
            assert(os.errno(rc) == 0);
        }

        fn wait(self: *LinuxEvent, ptr: *i32) void {
            while (@atomicLoad(i32, ptr, .Acquire) == 1) {
                const rc = system.futex_wait(ptr, system.FUTEX_WAIT | system.FUTEX_PRIVATE_FLAG, 1, null);
                switch (os.errno(rc)) {
                    0 => return, // woken by FUTEX_WAKE
                    os.EAGAIN => return, // *ptr was no longer 1: already set
                    os.EINTR => continue, // interrupted by a signal; retry
                    else => unreachable,
                }
            }
        }
    };

    /// NT keyed-event implementation for Windows; falls back to spinning
    /// if the keyed event handle cannot be created.
    const WindowsEvent = struct {
        const windows = std.os.windows;

        fn init() WindowsEvent {
            return WindowsEvent{};
        }

        fn deinit(self: *WindowsEvent) void {}

        fn wake(self: *WindowsEvent, ptr: *i32) void {
            // No keyed-event support: degrade to the spin implementation
            // (SpinEvent is zero-sized, so the pointer cast is layout-safe).
            const handle = getEventHandle() orelse return @ptrCast(*SpinEvent, self).wake(ptr);
            const key = @ptrCast(*const c_void, ptr);
            const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null);
            assert(rc == 0);
        }

        fn wait(self: *WindowsEvent, ptr: *i32) void {
            const handle = getEventHandle() orelse return @ptrCast(*SpinEvent, self).wait(ptr);
            const key = @ptrCast(*const c_void, ptr);
            const rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, null);
            assert(rc == 0);
        }

        // Lazily-created, process-wide keyed event handle (null on failure).
        var event_handle = std.lazyInit(?windows.HANDLE);

        fn getEventHandle() ?windows.HANDLE {
            if (event_handle.get()) |handle|
                return handle.*;
            const handle_ptr = @ptrCast(*windows.HANDLE, &event_handle.data);
            const access_mask = windows.GENERIC_READ | windows.GENERIC_WRITE;
            if (windows.ntdll.NtCreateKeyedEvent(handle_ptr, access_mask, null, 0) != 0)
                event_handle.data = null;
            event_handle.resolve();
            return event_handle.data;
        }
    };

    /// pthread condvar-backed event, used whenever libc is linked.
    const PosixEvent = struct {
        cond: system.pthread_cond_t,
        mutex: system.pthread_mutex_t,

        fn init() PosixEvent {
            return PosixEvent{
                .cond = system.PTHREAD_COND_INITIALIZER,
                .mutex = system.PTHREAD_MUTEX_INITIALIZER,
            };
        }

        fn deinit(self: *PosixEvent) void {
            // On dragonfly, destroying a statically-initialized object that
            // was never used can return EINVAL; tolerate that one error.
            const valid_error = if (builtin.os == .dragonfly) os.EINVAL else 0;
            const retm = system.pthread_mutex_destroy(&self.mutex);
            assert(retm == 0 or retm == valid_error);
            const retc = system.pthread_cond_destroy(&self.cond);
            assert(retc == 0 or retc == valid_error);
        }

        fn wake(self: *PosixEvent, ptr: *i32) void {
            // pthread calls return 0 on success; compare explicitly.
            assert(system.pthread_mutex_lock(&self.mutex) == 0);
            defer assert(system.pthread_mutex_unlock(&self.mutex) == 0);
            assert(system.pthread_cond_signal(&self.cond) == 0);
        }

        fn wait(self: *PosixEvent, ptr: *i32) void {
            assert(system.pthread_mutex_lock(&self.mutex) == 0);
            defer assert(system.pthread_mutex_unlock(&self.mutex) == 0);
            // Re-check the key under the mutex to guard against spurious
            // condvar wakeups.
            while (@atomicLoad(i32, ptr, .Acquire) == 1)
                assert(system.pthread_cond_wait(&self.cond, &self.mutex) == 0);
        }
    };
};
/// Word-sized mutex with an intrusive wait queue, modeled after
/// Webkit's WTF::Lock "parking lot" design: the low two bits of `state`
/// are lock flags and the remaining bits hold a pointer to the head of a
/// stack of waiting threads (each waiter's QueueNode lives on its own
/// stack frame).
const Mutex = struct {
    // Bit 0 = mutex held, bit 1 = queue being processed,
    // remaining bits = head-of-queue pointer (or 0 for empty).
    state: usize,

    const MUTEX_LOCK: usize = 1;
    const QUEUE_LOCK: usize = 2;
    const QUEUE_MASK: usize = ~(MUTEX_LOCK | QUEUE_LOCK);

    // Spin tuning: SPIN_CPU rounds of CPU pause-loops (SPIN_CPU_COUNT
    // pauses each), then SPIN_THREAD rounds of yielding to the scheduler,
    // before finally parking on a ResetEvent.
    const SPIN_CPU = 4;
    const SPIN_CPU_COUNT = 30;
    const SPIN_THREAD = 1;

    /// Stack-allocated node linking a parked thread into the wait queue.
    const QueueNode = struct {
        next: ?*QueueNode,
        event: ResetEvent,
    };

    pub fn init() Mutex {
        return Mutex{ .state = 0 };
    }

    pub fn deinit(self: *Mutex) void {
        self.* = undefined;
    }

    /// Acquire the mutex, blocking until it is available.
    /// Fast path: a single CAS from 0 to MUTEX_LOCK.
    pub fn acquire(self: *Mutex) Held {
        if (@cmpxchgWeak(usize, &self.state, 0, MUTEX_LOCK, .Acquire, .Monotonic)) |state|
            self.acquireSlow(state);
        return Held{ .mutex = self };
    }

    /// Contended path: spin a bounded number of times, then enqueue a
    /// stack-local node and park on its event until released.
    fn acquireSlow(self: *Mutex, current_state: usize) void {
        var state = current_state;
        var spin_count: usize = 0;
        while (true) {
            // Lock looks free: try to grab it without touching the queue.
            if (state & MUTEX_LOCK == 0) {
                state = @cmpxchgWeak(usize, &self.state, state, state | MUTEX_LOCK, .Acquire, .Monotonic) orelse return;
                continue;
            }
            // Only spin while nobody is queued (otherwise join the queue
            // to keep ordering reasonable).
            if (state & QUEUE_MASK == 0 and spin_count < SPIN_CPU + SPIN_THREAD) {
                if (spin_count < SPIN_CPU) {
                    SpinLock.yield(SPIN_CPU_COUNT);
                } else {
                    os.sched_yield() catch std.time.sleep(1 * std.time.millisecond);
                }
                spin_count += 1;
                state = @atomicLoad(usize, &self.state, .Monotonic);
                continue;
            }
            // Push ourselves onto the head of the wait stack.
            var node = QueueNode{
                .event = ResetEvent.init(),
                .next = @intToPtr(?*QueueNode, state & QUEUE_MASK),
            };
            defer node.event.deinit();
            const new_state = @ptrToInt(&node) | (state & ~QUEUE_MASK);
            state = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Monotonic) orelse {
                // Successfully enqueued: park until a releaser sets our
                // event, then start over (we do not inherit the lock).
                node.event.wait();
                spin_count = 0;
                state = @atomicLoad(usize, &self.state, .Monotonic);
                continue;
            };
        }
    }

    pub const Held = struct {
        mutex: *Mutex,

        /// Release the mutex; if waiters are queued and nobody else is
        /// already dequeuing, wake one.
        pub fn release(self: Held) void {
            const state = @atomicRmw(usize, &self.mutex.state, .Sub, MUTEX_LOCK, .Release);
            if (state & QUEUE_LOCK == 0 and state & QUEUE_MASK != 0)
                self.mutex.releaseSlow(state);
        }
    };

    /// Dequeue and wake one parked waiter. QUEUE_LOCK serializes dequeuers
    /// so only one thread walks the queue at a time.
    fn releaseSlow(self: *Mutex, current_state: usize) void {
        var state = current_state;
        // Try to become the (sole) dequeuer; bail if someone already is,
        // or if the queue emptied in the meantime.
        while (true) {
            if (state & QUEUE_LOCK != 0 or state & QUEUE_MASK == 0)
                return;
            state = @cmpxchgWeak(usize, &self.state, state, state | QUEUE_LOCK, .Acquire, .Monotonic) orelse break;
        }
        while (true) {
            // If the lock was re-acquired meanwhile, the new holder will
            // wake someone on release; just drop QUEUE_LOCK and leave.
            if (state & MUTEX_LOCK != 0) {
                state = @cmpxchgWeak(usize, &self.state, state, state & ~QUEUE_LOCK, .Release, .Monotonic) orelse return;
                continue;
            }
            // Pop the head node (also clears QUEUE_LOCK, since the new
            // state is just the next pointer) and wake its owner.
            const node = @intToPtr(*QueueNode, state & QUEUE_MASK);
            state = @cmpxchgWeak(usize, &self.state, state, @ptrToInt(node.next), .Release, .Monotonic)
                orelse return node.event.set();
        }
    }
};
/// State shared between the mutex test below and its worker threads.
const TestContext = struct {
    /// Number of increments each worker performs.
    const incr_count = 10000;

    /// Guards all access to `data`.
    mutex: *Mutex,
    /// Total increment count, accumulated under `mutex`.
    data: i128,
};
test "std.Mutex" {
    // Leading newline so worker progress lines start on a fresh line.
    std.debug.warn("\n", .{});

    // NOTE: the previous version set up a ThreadSafeFixedBufferAllocator
    // here that was never used; the dead allocator scaffolding is removed.
    var mutex = Mutex.init();
    defer mutex.deinit();

    var context = TestContext{
        .mutex = &mutex,
        .data = 0,
    };

    if (builtin.single_threaded) {
        // Single-threaded build: run one worker inline.
        worker(&context);
        testing.expect(context.data == TestContext.incr_count);
    } else {
        // Spawn thread_count workers and verify no increments were lost.
        var threads: [thread_count]*std.Thread = undefined;
        for (threads) |*t| {
            t.* = try std.Thread.spawn(&context, worker);
        }
        for (threads) |t|
            t.wait();
        testing.expect(context.data == thread_count * TestContext.incr_count);
    }
}
// Number of worker threads spawned by the test in multi-threaded builds.
const thread_count = 10;
// Count of workers that have finished; used only for progress logging.
var x: usize = 0;
/// Increment the shared counter `TestContext.incr_count` times, taking the
/// mutex around each increment, then log a progress line.
fn worker(ctx: *TestContext) void {
    var iteration: usize = 0;
    while (iteration < TestContext.incr_count) : (iteration += 1) {
        const held = ctx.mutex.acquire();
        defer held.release();
        ctx.data += 1;
    }
    const finished_before = @atomicRmw(usize, &x, .Add, 1, .SeqCst);
    std.debug.warn("{} finished {}/{}\n", .{ std.Thread.getCurrentId(), finished_before, thread_count - 1 });
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement