Advertisement
Guest User

Untitled

a guest
Dec 10th, 2019
120
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Zig 9.84 KB | None | 0 0
  1. const std = @import("std");
  2. const builtin = @import("builtin");
  3. const testing = std.testing;
  4. const assert = std.debug.assert;
  5. const os = std.os;
  6. const system = os.system;
  7. const SpinLock = std.SpinLock;
  8.  
  9. const ResetEvent = struct {
  10.     key: i32,
  11.     os_event: OsEvent,
  12.  
  13.     fn init() ResetEvent {
  14.         return ResetEvent{
  15.             .key = 0,
  16.             .os_event = OsEvent.init(),
  17.         };
  18.     }
  19.  
  20.     fn deinit(self: *ResetEvent) void {
  21.         self.os_event.deinit();
  22.         self.* = undefined;
  23.     }
  24.  
  25.     fn reset(self: *ResetEvent) void {
  26.         @atomicStore(i32, &self.key, 0, .Release);
  27.     }
  28.  
  29.     fn set(self: *ResetEvent) void {
  30.         if (@atomicRmw(i32, &self.key, .Xchg, 2, .Release) == 1)
  31.             self.os_event.wake(&self.key);
  32.     }
  33.  
  34.     fn wait(self: *ResetEvent) void {
  35.         var state = @atomicLoad(i32, &self.key, .Monotonic);
  36.         while (state == 0) {
  37.             state = @cmpxchgWeak(i32, &self.key, 0, 1, .Acquire, .Monotonic) orelse {
  38.                 return self.os_event.wait(&self.key);
  39.             };
  40.         }
  41.     }
  42.  
  43.     const OsEvent = switch (builtin.os) {
  44.         .windows => WindowsEvent,
  45.         .linux => if (builtin.link_libc) PosixEvent else LinuxEvent,
  46.         else => if (builtin.link_libc) PosixEvent else  SpinEvent,
  47.     };
  48.  
  49.     const SpinEvent = struct {
  50.         const SpinLock = std.SpinLock;
  51.  
  52.         fn init() SpinEvent { return SpinEvent{}; }
  53.         fn deinit(self: *SpinEvent) void {}
  54.  
  55.         fn wake(self: *SpinEvent, ptr: *i32) void {}
  56.         fn wait(self: *SpinEvent, ptr: *i32) void {
  57.             var spin = SpinLock.Backoff.init();
  58.             while (@atomicLoad(i32, ptr, .Acquire) == 1)
  59.                 spin.yield();
  60.         }
  61.     };
  62.  
  63.     const LinuxEvent = struct {
  64.         fn init() LinuxEvent { return LinuxEvent{}; }
  65.         fn deinit(self: *LinuxEvent) void {}
  66.  
  67.         fn wake(self: *LinuxEvent, ptr: *i32) void {
  68.             const rc = system.futex_wake(ptr, system.FUTEX_WAKE | system.FUTEX_PRIVATE_FLAG, 1);
  69.             assert(os.errno(rc) == 0);
  70.         }
  71.  
  72.         fn wait(self: *LinuxEvent, ptr: *i32) void {
  73.             while (@atomicLoad(i32, ptr, .Acquire) == 1) {
  74.                 const rc = system.futex_wait(ptr, system.FUTEX_WAIT | system.FUTEX_PRIVATE_FLAG, 1, null);
  75.                 switch (os.errno(rc)) {
  76.                     0 => return,
  77.                     os.EAGAIN => return,
  78.                     os.EINTR => continue,
  79.                     else => unreachable,
  80.                 }
  81.             }
  82.         }
  83.     };
  84.  
  85.     const WindowsEvent = struct {
  86.         const windows = std.os.windows;
  87.  
  88.         fn init() WindowsEvent { return WindowsEvent{}; }
  89.         fn deinit(self: *WindowsEvent) void {}
  90.  
  91.         fn wake(self: *WindowsEvent, ptr: *i32) void {
  92.             const handle = getEventHandle() orelse return @ptrCast(*SpinEvent, self).wake(ptr);
  93.             const key = @ptrCast(*const c_void, ptr);
  94.             const rc = system.ntdll.NtReleaseKeyedEvent(handle, key, system.FALSE, null);
  95.             assert(rc == 0);
  96.         }
  97.  
  98.         fn wait(self: *WindowsEvent, ptr: *i32) void {
  99.             const handle = getEventHandle() orelse return @ptrCast(*SpinEvent, self).wait(ptr);
  100.             const key = @ptrCast(*const c_void, ptr);
  101.             const rc = system.ntdll.NtWaitForKeyedEvent(handle, key, system.FALSE, null);
  102.             assert(rc == 0);
  103.         }
  104.  
  105.         var event_handle = std.lazyInit(?system.HANDLE);
  106.  
  107.         fn getEventHandle() ?system.HANDLE {
  108.             if (event_handle.get()) |handle|
  109.                 return handle.*;
  110.             const handle_ptr = @ptrCast(*system.HANDLE, &event_handle.data);
  111.             const access_mask = windows.GENERIC_READ | windows.GENERIC_WRITE;
  112.             if (windows.ntdll.NtCreateKeyedEvent(handle_ptr, access_mask, null, 0) != 0)
  113.                 event_handle.data = null;
  114.             event_handle.resolve();
  115.             return event_handle.data;
  116.         }
  117.     };
  118.  
  119.     const PosixEvent = struct {
  120.         cond: system.pthread_cond_t,
  121.         mutex: system.pthread_mutex_t,
  122.  
  123.         fn init() PosixEvent {
  124.             return PosixEvent{
  125.                 .cond = system.PTHREAD_COND_INITIALIZER,
  126.                 .mutex = system.PTHREAD_MUTEX_INITIALIZER,
  127.             };
  128.         }
  129.  
  130.         fn deinit(self: *PosixEvent) void {
  131.             const valid_error = if (builtin.os == .dragonfly) os.EINVAL else 0;
  132.             const retm = system.pthread_mutex_destroy(&self.mutex);
  133.             assert(retm == 0 or retm == valid_error);
  134.             const retc = system.pthread_cond_destroy(&self.cond);
  135.             assert(retc == 0 or retc == valid_error);
  136.         }
  137.  
  138.         fn wake(self: *PosixEvent, ptr: *i32) void {
  139.             assert(system.pthread_mutex_lock(&self.mutex));
  140.             defer assert(system.pthread_mutex_unlock(&self.mutex));
  141.  
  142.             system.pthread_cond_signal(&self.cond);
  143.         }
  144.  
  145.         fn wait(self: *PosixEvent, ptr: *i32) void {
  146.             assert(system.pthread_mutex_lock(&self.mutex));
  147.             defer assert(system.pthread_mutex_unlock(&self.mutex));
  148.  
  149.             while (@atomicLoad(i32, ptr, .Acquire) == 1)
  150.                 system.pthread_cond_wait(&self.cond, &self.mutex);
  151.         }
  152.     };
  153. };
  154.  
/// Word-sized mutex with an intrusive waiter queue. `state` packs:
///   bit 0 (MUTEX_LOCK): the mutex is held
///   bit 1 (QUEUE_LOCK): some thread is busy popping the waiter queue
///   remaining bits (QUEUE_MASK): pointer to the head QueueNode, or 0
/// NOTE(review): relies on &QueueNode being at least 4-byte aligned so the
/// low two bits are free for flags — confirm for all target platforms.
const Mutex = struct {
    state: usize,

    const MUTEX_LOCK: usize = 1;
    const QUEUE_LOCK: usize = 2;
    const QUEUE_MASK: usize = ~(MUTEX_LOCK | QUEUE_LOCK);

    // Spin policy for acquireSlow: SPIN_CPU rounds of CPU-level spinning
    // (SPIN_CPU_COUNT yields each), then SPIN_THREAD rounds of yielding to
    // the OS scheduler, before falling back to blocking on a ResetEvent.
    const SPIN_CPU = 4;
    const SPIN_CPU_COUNT = 30;
    const SPIN_THREAD = 1;

    // Stack-allocated node linked into the waiter queue; each waiter owns
    // exactly one, alive only while that waiter is inside acquireSlow.
    const QueueNode = struct {
        next: ?*QueueNode,
        event: ResetEvent,
    };

    pub fn init() Mutex {
        return Mutex{ .state = 0 };
    }

    pub fn deinit(self: *Mutex) void {
        self.* = undefined;
    }

    /// Acquire the lock. Fast path: a single CAS from the fully-free state.
    pub fn acquire(self: *Mutex) Held {
        if (@cmpxchgWeak(usize, &self.state, 0, MUTEX_LOCK, .Acquire, .Monotonic)) |state|
            self.acquireSlow(state);
        return Held{ .mutex = self };
    }

    /// Contended path: spin a bounded amount, then enqueue and block.
    fn acquireSlow(self: *Mutex, current_state: usize) void {
        var state = current_state;
        var spin_count: usize = 0;
        while (true) {

            // If the mutex bit is clear, try to grab it outright.
            if (state & MUTEX_LOCK == 0) {
                state = @cmpxchgWeak(usize, &self.state, state, state | MUTEX_LOCK, .Acquire, .Monotonic) orelse return;
                continue;
            }

            // No waiters queued yet and spin budget remains: spin/yield
            // before paying the cost of enqueueing and blocking.
            if (state & QUEUE_MASK == 0 and spin_count < SPIN_CPU + SPIN_THREAD) {
                if (spin_count < SPIN_CPU) {
                    SpinLock.yield(SPIN_CPU_COUNT);
                } else {
                    os.sched_yield() catch std.time.sleep(1 * std.time.millisecond);
                }
                spin_count += 1;
                state = @atomicLoad(usize, &self.state, .Monotonic);
                continue;
            }

            // Push ourselves onto the head of the waiter queue.
            var node = QueueNode{
                .event = ResetEvent.init(),
                .next = @intToPtr(?*QueueNode, state & QUEUE_MASK),
            };
            // Runs at the end of each loop iteration, so the event is torn
            // down before a fresh node is built on the next pass.
            defer node.event.deinit();
            // Replace the queue-head bits with &node, keeping the flag bits.
            const new_state = @ptrToInt(&node) | (state & ~QUEUE_MASK);
            state = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Monotonic) orelse {
                // Enqueued: sleep until release() wakes us, then retry with
                // a fresh spin budget.
                node.event.wait();
                spin_count = 0;
                state = @atomicLoad(usize, &self.state, .Monotonic);
                continue;
            };
        }
    }

    pub const Held = struct {
        mutex: *Mutex,

        /// Release the lock. Only enters the slow path when there are
        /// queued waiters and nobody else is already dequeuing.
        pub fn release(self: Held) void {
            const state = @atomicRmw(usize, &self.mutex.state, .Sub, MUTEX_LOCK, .Release);
            if (state & QUEUE_LOCK == 0 and state & QUEUE_MASK != 0)
                self.mutex.releaseSlow(state);
        }
    };

    /// Pop one waiter off the queue and wake it.
    fn releaseSlow(self: *Mutex, current_state: usize) void {
        var state = current_state;
        // Take the queue lock; bail if someone else holds it or the queue
        // emptied in the meantime.
        while (true) {
            if (state & QUEUE_LOCK != 0 or state & QUEUE_MASK == 0)
                return;
            state = @cmpxchgWeak(usize, &self.state, state, state | QUEUE_LOCK, .Acquire, .Monotonic) orelse break;
        }

        while (true) {
            // Mutex was re-acquired meanwhile: hand wakeup duty to the new
            // owner by just dropping the queue lock.
            if (state & MUTEX_LOCK != 0) {
                state = @cmpxchgWeak(usize, &self.state, state, state & ~QUEUE_LOCK, .Release, .Monotonic) orelse return;
                continue;
            }

            // Pop the head node; storing node.next (no flag bits) also
            // clears QUEUE_LOCK. Then wake the node's owner.
            const node = @intToPtr(*QueueNode, state & QUEUE_MASK);
            state = @cmpxchgWeak(usize, &self.state, state, @ptrToInt(node.next), .Release, .Monotonic)
                orelse return node.event.set();
        }
    }
};
  251.  
  252.  
  253. const TestContext = struct {
  254.     mutex: *Mutex,
  255.     data: i128,
  256.  
  257.     const incr_count = 10000;
  258. };
  259.  
  260. test "std.Mutex" {
  261.     std.debug.warn("\n", .{});
  262.     var plenty_of_memory = try std.heap.page_allocator.alloc(u8, 300 * 1024);
  263.     defer std.heap.page_allocator.free(plenty_of_memory);
  264.  
  265.     var fixed_buffer_allocator = std.heap.ThreadSafeFixedBufferAllocator.init(plenty_of_memory);
  266.     var a = &fixed_buffer_allocator.allocator;
  267.  
  268.     var mutex = Mutex.init();
  269.     defer mutex.deinit();
  270.  
  271.     var context = TestContext{
  272.         .mutex = &mutex,
  273.         .data = 0,
  274.     };
  275.  
  276.     if (builtin.single_threaded) {
  277.         worker(&context);
  278.         testing.expect(context.data == TestContext.incr_count);
  279.     } else {
  280.         var threads: [thread_count]*std.Thread = undefined;
  281.         for (threads) |*t| {
  282.             t.* = try std.Thread.spawn(&context, worker);
  283.         }
  284.         for (threads) |t|
  285.             t.wait();
  286.  
  287.         testing.expect(context.data == thread_count * TestContext.incr_count);
  288.     }
  289. }
  290.  
// Number of worker threads spawned by the multi-threaded test path.
const thread_count = 10;
// Global finish counter, atomically bumped by each worker as it exits.
var x: usize = 0;
  293. fn worker(ctx: *TestContext) void {
  294.     var i: usize = 0;
  295.     while (i != TestContext.incr_count) : (i += 1) {
  296.         const held = ctx.mutex.acquire();
  297.         defer held.release();
  298.  
  299.         ctx.data += 1;
  300.     }
  301.     std.debug.warn("{} finished {}/{}\n", .{std.Thread.getCurrentId(), @atomicRmw(usize, &x, .Add, 1, .SeqCst), thread_count-1});
  302. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement