SHARE
TWEET

Untitled

a guest Oct 9th, 2019 89 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. //use futures_core::future::{FusedFuture, Future};
  2. use futures::future::{FusedFuture, Future};
  3.  
  4. //use futures_core::task::{Context, Poll, Waker};
  5. use futures::task::{Context, Poll, Waker};
  6.  
  7. use slab::Slab;
  8. use std::cell::UnsafeCell;
  9. use std::marker::PhantomData;
  10. use std::ops::{Deref, DerefMut};
  11. use std::pin::Pin;
  12. use std::sync::atomic::{AtomicUsize, Ordering};
  13. use std::sync::Mutex as StdMutex;
  14. use std::{fmt, mem};
  15.  
  16. /// A futures-aware mutex.
  17. pub struct Mutex<T: ?Sized> {
  18.     state: AtomicUsize,
  19.     waiters: StdMutex<Slab<Waiter>>,
  20.     read_waiters: StdMutex<Slab<Waiter>>,
  21.     value: UnsafeCell<T>,
  22. }
  23.  
  24. impl<T: ?Sized> fmt::Debug for Mutex<T> {
  25.     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  26.         let s = State(self.state.load(Ordering::SeqCst));
  27.         f.debug_struct("Mutex")
  28.             .field("is_locked", &s.is_locked())
  29.             .field("has_waiters", &s.has_waiters())
  30.             .field("has_read_waiters", &s.has_read_waiters())
  31.             .field("is_corrupted", &s.is_corrupted())
  32.             .field("read_lock_count", &s.read_lock_count())
  33.             .finish()
  34.     }
  35. }
  36.  
  37. impl<T> From<T> for Mutex<T> {
  38.     fn from(t: T) -> Self {
  39.         Self::new(t)
  40.     }
  41. }
  42.  
  43. impl<T: Default> Default for Mutex<T> {
  44.     fn default() -> Mutex<T> {
  45.         Mutex::new(Default::default())
  46.     }
  47. }
  48.  
  /// One entry in a waiter queue.
  pub enum Waiter {
      /// The task is parked; this is the waker to notify.
      Waiting(Waker),
      /// The stored waker has already been consumed by `wake`.
      Woken,
  }

  impl Waiter {
      /// Stores `waker` in this entry unless the entry already holds a waker
      /// that would wake the same task.
      fn register(&mut self, waker: &Waker) {
          if let Waiter::Waiting(current) = self {
              if waker.will_wake(current) {
                  // Same task is already registered; avoid a redundant clone.
                  return;
              }
          }
          *self = Waiter::Waiting(waker.clone());
      }

      /// Marks the entry as `Woken` and, if a waker was stored, wakes it.
      fn wake(&mut self) {
          if let Waiter::Waiting(waker) = mem::replace(self, Waiter::Woken) {
              waker.wake();
          }
      }
  }
  69.  
  /// Generates a `const fn` predicate over `self.0` for the enclosing tuple struct.
  ///
  /// * `bitflag!(name, mask)` emits `name(&self) -> bool`, true when any bit of
  ///   `mask` is set.
  /// * `bitflag!(name, FLAG, mask)` additionally declares `const FLAG: usize = mask`.
  macro_rules! bitflag {
      ($method:ident, $mask:expr) => {
          const fn $method(&self) -> bool {
              (self.0 & $mask) != 0
          }
      };
      ($method:ident, $constant:ident, $mask:expr) => {
          const $constant: usize = $mask;
          bitflag!($method, Self::$constant);
      };
  }

  /// Decoded view of a raw lock word.
  struct State(usize);

  impl State {
      // Mask of the bits holding the read lock count (values 0..=16777215).
      const MAX_READ_LOCKS: usize = (1 << 24) - 1;

      // Bit 31: the write lock is held.
      bitflag!(is_locked, IS_LOCKED, 1 << 31);
      // Bit 30: there are write waiters.
      bitflag!(has_waiters, HAS_WAITERS, 1 << 30);
      // Bit 29: there are read waiters.
      bitflag!(has_read_waiters, HAS_READ_WAITERS, 1 << 29);

      // Bits 24..=28 are reserved for future use and should always be zero.
      bitflag!(is_corrupted, (1 << 29) - 1 - Self::MAX_READ_LOCKS);

      /// Number of read locks encoded in the low bits of the word.
      const fn read_lock_count(&self) -> usize {
          Self::MAX_READ_LOCKS & self.0
      }
  }
  100.  
  101. impl<T> Mutex<T> {
  102.     /// Creates a new futures-aware mutex.
  103.     pub fn new(t: T) -> Mutex<T> {
  104.         Mutex {
  105.             state: AtomicUsize::new(0),
  106.             waiters: StdMutex::new(Slab::new()),
  107.             read_waiters: StdMutex::new(Slab::new()),
  108.             value: UnsafeCell::new(t),
  109.         }
  110.     }
  111.  
  112.     /// Consumes this mutex, returning the underlying data.
  113.     ///
  114.     /// # Examples
  115.     ///
  116.     /// ```
  117.     /// use futures::lock::Mutex;
  118.     ///
  119.     /// let mutex = Mutex::new(0);
  120.     /// assert_eq!(mutex.into_inner(), 0);
  121.     /// ```
  122.     pub fn into_inner(self) -> T {
  123.         self.value.into_inner()
  124.     }
  125. }
  126.  
  127. pub struct Lock<'a, RW: ReadWrite, T: ?Sized> {
  128.     pub mutex: &'a Mutex<T>,
  129.     _rw: PhantomData<RW>,
  130. }
  131.  
  132. impl<'a, RW: ReadWrite, T: ?Sized> Lock<'a, RW, T> {
  133.     /// Attempt to acquire the lock immediately.
  134.     ///
  135.     /// If the lock is currently held, this will return `None`.
  136.     pub fn try_lock(&self) -> Option<MutexGuard<'a, RW, T>> {
  137.         if RW::IS_WRITE {
  138.             let old_state = State(
  139.                 self.mutex
  140.                     .state
  141.                     .fetch_or(State::IS_LOCKED, Ordering::Acquire),
  142.             );
  143.             if old_state.is_locked() {
  144.                 return None;
  145.             }
  146.         } else {
  147.             let old_state = State(self.mutex.state.fetch_add(1, Ordering::Acquire));
  148.             if old_state.read_lock_count() == State::MAX_READ_LOCKS {
  149.                 panic!("Overflowed the number of read locks");
  150.             }
  151.             if old_state.is_locked() {
  152.                 self.mutex.state.fetch_sub(1, Ordering::Relaxed);
  153.                 return None;
  154.             }
  155.         }
  156.         Some(MutexGuard {
  157.             mutex: self.mutex,
  158.             _rw: PhantomData,
  159.         })
  160.     }
  161.  
  162.     /// Acquire the lock asynchronously.
  163.     ///
  164.     /// This method returns a future that will resolve once the lock has been
  165.     /// successfully acquired.
  166.     pub fn lock(&self) -> MutexLockFuture<'a, RW, T> {
  167.         MutexLockFuture {
  168.             mutex: Some(self.mutex),
  169.             wait_key: WAIT_KEY_NONE,
  170.             _rw: PhantomData,
  171.         }
  172.     }
  173. }
  174.  
  175. impl<'a, T: ?Sized> Mutex<T> {
  176.     pub const fn read(&'a self) -> Lock<'a, Read, T> {
  177.         Lock {
  178.             mutex: self,
  179.             _rw: PhantomData,
  180.         }
  181.     }
  182.     pub const fn write(&'a self) -> Lock<'a, Write, T> {
  183.         Lock {
  184.             mutex: self,
  185.             _rw: PhantomData,
  186.         }
  187.     }
  188.     /// Returns a mutable reference to the underlying data.
  189.     ///
  190.     /// Since this call borrows the `Mutex` mutably, no actual locking needs to
  191.     /// take place -- the mutable borrow statically guarantees no locks exist.
  192.     ///
  193.     /// # Examples
  194.     ///
  195.     /// ```
  196.     /// # futures::executor::block_on(async {
  197.     /// use futures::lock::Mutex;
  198.     ///
  199.     /// let mut mutex = Mutex::new(0);
  200.     /// *mutex.get_mut() = 10;
  201.     /// assert_eq!(*mutex.lock().await, 10);
  202.     /// # });
  203.     /// ```
  204.     pub fn get_mut(&mut self) -> &mut T {
  205.         // We know statically that there are no other references to `self`, so
  206.         // there's no need to lock the inner mutex.
  207.         unsafe { &mut *self.value.get() }
  208.     }
  209.  
  210.     fn remove_waker<RW:ReadWrite>(&self, wait_key: usize, wake_another: bool) {
  211.         if wait_key == WAIT_KEY_NONE {
  212.             return;
  213.         }
  214.         let mut waiters = RW::waiters(self).lock().unwrap();
  215.         match waiters.remove(wait_key) {
  216.             Waiter::Waiting(_) => {}
  217.             Waiter::Woken => {
  218.                 // We were awoken, but then dropped before we could
  219.                 // wake up to acquire the lock. Wake up another
  220.                 // waiter.
  221.                 if RW::IS_WRITE && wake_another {
  222.                     if let Some((_i, waiter)) = waiters.iter_mut().next() {
  223.                         waiter.wake();
  224.                     }
  225.                 }
  226.             }
  227.         }
  228.         if waiters.is_empty() {
  229.             self.state.fetch_and(!State::HAS_WAITERS, Ordering::Relaxed); // released by mutex unlock
  230.             // If we're in read mode, wake_another is true, AND we're the last one
  231.             // Lets wake a write lock
  232.             if !RW::IS_WRITE && wake_another {
  233.                 let mut write_waiters = self.waiters.lock().unwrap();
  234.                 if let Some((_i, waiter)) = write_waiters.iter_mut().next() {
  235.                     waiter.wake();
  236.                 }
  237.             }
  238.         }
  239.     }
  240. }
  241.  
  242. pub trait ReadWrite: Sized + Unpin {
  243.     const IS_WRITE: bool;
  244.     const STR: &'static str;
  245.     const HAS_WAITERS_FLAG: usize;
  246.     fn lock<'a, T: ?Sized>(m: &'a Mutex<T>) -> Lock<'a, Self, T>;
  247.     fn waiters<'a, T: ?Sized>(m: &Mutex<T>) -> &StdMutex<Slab<Waiter>>;
  248. }
  249. pub struct Write;
  250. pub struct Read;
  251. impl ReadWrite for Write {
  252.     const IS_WRITE: bool = true;
  253.     const STR: &'static str = "Write";
  254.     const HAS_WAITERS_FLAG: usize = State::HAS_WAITERS;
  255.     fn lock<'a, T: ?Sized>(m: &'a Mutex<T>) -> Lock<'a, Self, T> {
  256.         m.write()
  257.     }
  258.     fn waiters<'a, T: ?Sized>(m: &Mutex<T>) -> &StdMutex<Slab<Waiter>> {
  259.         &m.waiters
  260.     }
  261. }
  262. impl ReadWrite for Read {
  263.     const IS_WRITE: bool = true;
  264.     const STR: &'static str = "Read";
  265.     const HAS_WAITERS_FLAG: usize = State::HAS_READ_WAITERS;
  266.     fn lock<'a, T: ?Sized>(m: &'a Mutex<T>) -> Lock<'a, Self, T> {
  267.         m.read()
  268.     }
  269.     fn waiters<'a, T: ?Sized>(m: &Mutex<T>) -> &StdMutex<Slab<Waiter>> {
  270.         &m.read_waiters
  271.     }
  272. }
  273.  
  // Marks a lock future that has not been given a slot in a waiter `Slab`.
  // All bits set, i.e. the largest `usize`.
  const WAIT_KEY_NONE: usize = !0;
  276.  
  277. /// A future which resolves when the target mutex has been successfully acquired.
  278. pub struct MutexLockFuture<'a, RW: ReadWrite, T: ?Sized> {
  279.     // `None` indicates that the mutex was successfully acquired.
  280.     mutex: Option<&'a Mutex<T>>,
  281.     wait_key: usize,
  282.     _rw: PhantomData<RW>,
  283. }
  284.  
  285. impl<RW: ReadWrite, T: ?Sized> fmt::Debug for MutexLockFuture<'_, RW, T> {
  286.     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  287.         f.debug_struct(&format!("MutexLockFuture<{}>", RW::STR)[..])
  288.             .field("was_acquired", &self.mutex.is_none())
  289.             .field("mutex", &self.mutex)
  290.             .field(
  291.                 "wait_key",
  292.                 &(if self.wait_key == WAIT_KEY_NONE {
  293.                     None
  294.                 } else {
  295.                     Some(self.wait_key)
  296.                 }),
  297.             )
  298.             .finish()
  299.     }
  300. }
  301.  
  302. impl<'a, RW: ReadWrite, T: ?Sized> FusedFuture for MutexLockFuture<'a, RW, T> {
  303.     fn is_terminated(&self) -> bool {
  304.         self.mutex.is_none()
  305.     }
  306. }
  307. impl<'a, RW: ReadWrite, T: ?Sized> Future for MutexLockFuture<'a, RW, T> {
  308.     type Output = MutexGuard<'a, RW, T>;
  309.     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
  310.         let mutex = self.mutex.expect("polled MutexLockFuture after completion");
  311.  
  312.         let lock = RW::lock(mutex);
  313.         if let Some(locked) = lock.try_lock() {
  314.             mutex.remove_waker::<RW>(self.wait_key, false);
  315.             self.mutex = None;
  316.             return Poll::Ready(locked);
  317.         }
  318.  
  319.         {
  320.             let mut waiters = RW::waiters(mutex).lock().unwrap();
  321.             if self.wait_key == WAIT_KEY_NONE {
  322.                 self.wait_key = waiters.insert(Waiter::Waiting(cx.waker().clone()));
  323.                 if waiters.len() == 1 {
  324.                     mutex
  325.                         .state
  326.                         .fetch_or(RW::HAS_WAITERS_FLAG, Ordering::Relaxed); // released by mutex unlock
  327.                 }
  328.             } else {
  329.                 waiters[self.wait_key].register(cx.waker());
  330.             }
  331.         }
  332.  
  333.         // Ensure that we haven't raced `MutexGuard::drop`'s unlock path by
  334.         // attempting to acquire the lock again.
  335.         if let Some(locked) = lock.try_lock() {
  336.             mutex.remove_waker::<RW>(self.wait_key, false);
  337.             self.mutex = None;
  338.             return Poll::Ready(locked);
  339.         }
  340.  
  341.         Poll::Pending
  342.     }
  343. }
  344.  
  345. impl<RW: ReadWrite, T: ?Sized> Drop for MutexLockFuture<'_, RW, T> {
  346.     fn drop(&mut self) {
  347.         if let Some(mutex) = self.mutex {
  348.             // This future was dropped before it acquired the mutex.
  349.             //
  350.             // Remove ourselves from the map, waking up another waiter if we
  351.             // had been awoken to acquire the lock.
  352.             mutex.remove_waker::<RW>(self.wait_key, true);
  353.         }
  354.     }
  355. }
  356.  
  357. /// An RAII guard returned by the `lock` and `try_lock` methods.
  358. /// When this structure is dropped (falls out of scope), the lock will be
  359. /// unlocked.
  360. pub struct MutexGuard<'a, RW: ReadWrite, T: ?Sized> {
  361.     mutex: &'a Mutex<T>,
  362.     _rw: PhantomData<RW>,
  363. }
  364.  
  365. impl<RW: ReadWrite, T: ?Sized + fmt::Debug> fmt::Debug for MutexGuard<'_, RW, T> {
  366.     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  367.         f.debug_struct(&format!("MutexGuard<{}>", RW::STR)[..])
  368.             .field("value", &*self)
  369.             .field("mutex", &self.mutex)
  370.             .finish()
  371.     }
  372. }
  373.  
  374. impl<RW: ReadWrite, T: ?Sized> Drop for MutexGuard<'_, RW, T> {
  375.     fn drop(&mut self) {
  376.         if RW::IS_WRITE {
  377.             let old_state = State(
  378.                 self.mutex
  379.                     .state
  380.                     .fetch_and(!State::IS_LOCKED, Ordering::AcqRel),
  381.             );
  382.             if old_state.has_waiters() {
  383.                 // Wake one write waiter
  384.                 let mut waiters = self.mutex.waiters.lock().unwrap();
  385.                 if let Some((_i, waiter)) = waiters.iter_mut().next() {
  386.                     waiter.wake();
  387.                 }
  388.             } else if old_state.has_read_waiters() {
  389.                 // Wake all read waiters
  390.                 let mut waiters = self.mutex.waiters.lock().unwrap();
  391.                 for w in waiters.iter_mut() {
  392.                     w.1.wake()
  393.                 }
  394.             }
  395.         } else {
  396.             let old_state = State(self.mutex.state.fetch_sub(1, Ordering::AcqRel));
  397.             if old_state.read_lock_count() > 1 {
  398.                 // We're not the last read lock holder
  399.             } else if old_state.has_waiters() {
  400.                 // Wake one write waiter
  401.                 let mut waiters = self.mutex.waiters.lock().unwrap();
  402.                 if let Some((_i, waiter)) = waiters.iter_mut().next() {
  403.                     waiter.wake();
  404.                 }
  405.             } else if old_state.has_read_waiters() {
  406.                 panic!("Race condition, read waiters were queued while the read lock was held")
  407.             }
  408.         }
  409.     }
  410. }
  411.  
  412. impl<RW: ReadWrite, T: ?Sized> Deref for MutexGuard<'_, RW, T> {
  413.     type Target = T;
  414.     fn deref(&self) -> &T {
  415.         unsafe { &*self.mutex.value.get() }
  416.     }
  417. }
  418. impl<T: ?Sized> DerefMut for MutexGuard<'_, Write, T> {
  419.     fn deref_mut(&mut self) -> &mut T {
  420.         unsafe { &mut *self.mutex.value.get() }
  421.     }
  422. }
  423.  
  424. // Mutexes can be moved freely between threads and acquired on any thread so long
  425. // as the inner value can be safely sent between threads.
  426. unsafe impl<T: ?Sized + Send> Send for Mutex<T> {}
  427. unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}
  428.  
  429. // It's safe to switch which thread the acquire is being attempted on so long as
  430. // `T` can be accessed on that thread.
  431. unsafe impl<RW: ReadWrite, T: ?Sized + Send> Send for MutexLockFuture<'_, RW, T> {}
  432. // doesn't have any interesting `&self` methods (only Debug)
  433. unsafe impl<RW: ReadWrite, T: ?Sized> Sync for MutexLockFuture<'_, RW, T> {}
  434.  
  435. // Safe to send since we don't track any thread-specific details-- the inner
  436. // lock is essentially spinlock-equivalent (attempt to flip an atomic bool)
  437. unsafe impl<RW: ReadWrite, T: ?Sized + Send> Send for MutexGuard<'_, RW, T> {}
  438. unsafe impl<RW: ReadWrite, T: ?Sized + Sync> Sync for MutexGuard<'_, RW, T> {}
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top