Advertisement
Guest User

Untitled

a guest
Oct 9th, 2019
134
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 14.00 KB | None | 0 0
  1. //use futures_core::future::{FusedFuture, Future};
  2. use futures::future::{FusedFuture, Future};
  3.  
  4. //use futures_core::task::{Context, Poll, Waker};
  5. use futures::task::{Context, Poll, Waker};
  6.  
  7. use slab::Slab;
  8. use std::cell::UnsafeCell;
  9. use std::marker::PhantomData;
  10. use std::ops::{Deref, DerefMut};
  11. use std::pin::Pin;
  12. use std::sync::atomic::{AtomicUsize, Ordering};
  13. use std::sync::Mutex as StdMutex;
  14. use std::{fmt, mem};
  15.  
/// A futures-aware mutex.
pub struct Mutex<T: ?Sized> {
    // Packed lock word; see `State` for the bit layout (write-lock bit,
    // waiter-presence flags, and the reader count in the low 24 bits).
    state: AtomicUsize,
    // Tasks parked waiting for the write lock. The std mutex is only held
    // for short, non-async critical sections.
    waiters: StdMutex<Slab<Waiter>>,
    // Tasks parked waiting for a read lock.
    read_waiters: StdMutex<Slab<Waiter>>,
    // The protected value; access is mediated by `MutexGuard`.
    value: UnsafeCell<T>,
}
  23.  
  24. impl<T: ?Sized> fmt::Debug for Mutex<T> {
  25. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  26. let s = State(self.state.load(Ordering::SeqCst));
  27. f.debug_struct("Mutex")
  28. .field("is_locked", &s.is_locked())
  29. .field("has_waiters", &s.has_waiters())
  30. .field("has_read_waiters", &s.has_read_waiters())
  31. .field("is_corrupted", &s.is_corrupted())
  32. .field("read_lock_count", &s.read_lock_count())
  33. .finish()
  34. }
  35. }
  36.  
  37. impl<T> From<T> for Mutex<T> {
  38. fn from(t: T) -> Self {
  39. Self::new(t)
  40. }
  41. }
  42.  
  43. impl<T: Default> Default for Mutex<T> {
  44. fn default() -> Mutex<T> {
  45. Mutex::new(Default::default())
  46. }
  47. }
  48.  
// One parked task slot stored in a waiter slab. `Woken` records that a wakeup
// was delivered to this slot (see `Waiter::wake`), so a future that is dropped
// after being woken can pass the notification on instead of losing it.
pub enum Waiter {
    Waiting(Waker),
    Woken,
}
  53.  
  54. impl Waiter {
  55. fn register(&mut self, waker: &Waker) {
  56. match self {
  57. Waiter::Waiting(w) if waker.will_wake(w) => {}
  58. _ => *self = Waiter::Waiting(waker.clone()),
  59. }
  60. }
  61.  
  62. fn wake(&mut self) {
  63. match mem::replace(self, Waiter::Woken) {
  64. Waiter::Waiting(waker) => waker.wake(),
  65. Waiter::Woken => {}
  66. }
  67. }
  68. }
  69.  
// Generates a `const fn` predicate that tests whether a bit mask is set in
// `self.0`. The two-argument form takes an existing mask expression; the
// three-argument form additionally declares an associated `usize` constant
// for the mask and defines the predicate in terms of it.
macro_rules! bitflag {
    ($name:ident, $bit:expr) => {
        const fn $name(&self) -> bool {
            self.0 & $bit != 0
        }
    };
    ($name:ident, $flag:ident, $bit:expr) => {
        const $flag: usize = $bit;
        bitflag!($name, Self::$flag);
    }
}
// Decoded view of `Mutex::state`: bit 31 = write-locked, bit 30 = write
// waiters present, bit 29 = read waiters present, bits 0..24 = count of
// outstanding read locks, bits 24..29 reserved (must stay zero).
struct State(usize);
impl State {
    // set if it's write locked
    bitflag!(is_locked, IS_LOCKED, 1 << 31);
    // set if there are write waiters
    bitflag!(has_waiters, HAS_WAITERS, 1 << 30);
    // set if there are read waiters
    bitflag!(has_read_waiters, HAS_READ_WAITERS, 1 << 29);

    // the bits which are valid for read lock count (supports up to
    // 16777215 concurrent readers, i.e. 2^24 - 1)
    const MAX_READ_LOCKS: usize = (1 << 24) - 1;

    // bits which should always be zero (reserved for future use); a set bit
    // here means the state word has been corrupted, e.g. by reader-count
    // overflow carrying into the reserved range
    bitflag!(is_corrupted, (1 << 29) - 1 - Self::MAX_READ_LOCKS);

    // Number of read locks currently held (the low 24 bits).
    const fn read_lock_count(&self) -> usize {
        self.0 & Self::MAX_READ_LOCKS
    }
}
  100.  
  101. impl<T> Mutex<T> {
  102. /// Creates a new futures-aware mutex.
  103. pub fn new(t: T) -> Mutex<T> {
  104. Mutex {
  105. state: AtomicUsize::new(0),
  106. waiters: StdMutex::new(Slab::new()),
  107. read_waiters: StdMutex::new(Slab::new()),
  108. value: UnsafeCell::new(t),
  109. }
  110. }
  111.  
  112. /// Consumes this mutex, returning the underlying data.
  113. ///
  114. /// # Examples
  115. ///
  116. /// ```
  117. /// use futures::lock::Mutex;
  118. ///
  119. /// let mutex = Mutex::new(0);
  120. /// assert_eq!(mutex.into_inner(), 0);
  121. /// ```
  122. pub fn into_inner(self) -> T {
  123. self.value.into_inner()
  124. }
  125. }
  126.  
// A borrowed acquisition handle whose zero-sized `RW` parameter selects read
// or write mode; created by `Mutex::read` / `Mutex::write`.
pub struct Lock<'a, RW: ReadWrite, T: ?Sized> {
    pub mutex: &'a Mutex<T>,
    // Zero-sized mode selector; no `RW` value is ever stored.
    _rw: PhantomData<RW>,
}
  131.  
  132. impl<'a, RW: ReadWrite, T: ?Sized> Lock<'a, RW, T> {
  133. /// Attempt to acquire the lock immediately.
  134. ///
  135. /// If the lock is currently held, this will return `None`.
  136. pub fn try_lock(&self) -> Option<MutexGuard<'a, RW, T>> {
  137. if RW::IS_WRITE {
  138. let old_state = State(
  139. self.mutex
  140. .state
  141. .fetch_or(State::IS_LOCKED, Ordering::Acquire),
  142. );
  143. if old_state.is_locked() {
  144. return None;
  145. }
  146. } else {
  147. let old_state = State(self.mutex.state.fetch_add(1, Ordering::Acquire));
  148. if old_state.read_lock_count() == State::MAX_READ_LOCKS {
  149. panic!("Overflowed the number of read locks");
  150. }
  151. if old_state.is_locked() {
  152. self.mutex.state.fetch_sub(1, Ordering::Relaxed);
  153. return None;
  154. }
  155. }
  156. Some(MutexGuard {
  157. mutex: self.mutex,
  158. _rw: PhantomData,
  159. })
  160. }
  161.  
  162. /// Acquire the lock asynchronously.
  163. ///
  164. /// This method returns a future that will resolve once the lock has been
  165. /// successfully acquired.
  166. pub fn lock(&self) -> MutexLockFuture<'a, RW, T> {
  167. MutexLockFuture {
  168. mutex: Some(self.mutex),
  169. wait_key: WAIT_KEY_NONE,
  170. _rw: PhantomData,
  171. }
  172. }
  173. }
  174.  
  175. impl<'a, T: ?Sized> Mutex<T> {
  176. pub const fn read(&'a self) -> Lock<'a, Read, T> {
  177. Lock {
  178. mutex: self,
  179. _rw: PhantomData,
  180. }
  181. }
  182. pub const fn write(&'a self) -> Lock<'a, Write, T> {
  183. Lock {
  184. mutex: self,
  185. _rw: PhantomData,
  186. }
  187. }
  188. /// Returns a mutable reference to the underlying data.
  189. ///
  190. /// Since this call borrows the `Mutex` mutably, no actual locking needs to
  191. /// take place -- the mutable borrow statically guarantees no locks exist.
  192. ///
  193. /// # Examples
  194. ///
  195. /// ```
  196. /// # futures::executor::block_on(async {
  197. /// use futures::lock::Mutex;
  198. ///
  199. /// let mut mutex = Mutex::new(0);
  200. /// *mutex.get_mut() = 10;
  201. /// assert_eq!(*mutex.lock().await, 10);
  202. /// # });
  203. /// ```
  204. pub fn get_mut(&mut self) -> &mut T {
  205. // We know statically that there are no other references to `self`, so
  206. // there's no need to lock the inner mutex.
  207. unsafe { &mut *self.value.get() }
  208. }
  209.  
  210. fn remove_waker<RW:ReadWrite>(&self, wait_key: usize, wake_another: bool) {
  211. if wait_key == WAIT_KEY_NONE {
  212. return;
  213. }
  214. let mut waiters = RW::waiters(self).lock().unwrap();
  215. match waiters.remove(wait_key) {
  216. Waiter::Waiting(_) => {}
  217. Waiter::Woken => {
  218. // We were awoken, but then dropped before we could
  219. // wake up to acquire the lock. Wake up another
  220. // waiter.
  221. if RW::IS_WRITE && wake_another {
  222. if let Some((_i, waiter)) = waiters.iter_mut().next() {
  223. waiter.wake();
  224. }
  225. }
  226. }
  227. }
  228. if waiters.is_empty() {
  229. self.state.fetch_and(!State::HAS_WAITERS, Ordering::Relaxed); // released by mutex unlock
  230. // If we're in read mode, wake_another is true, AND we're the last one
  231. // Lets wake a write lock
  232. if !RW::IS_WRITE && wake_another {
  233. let mut write_waiters = self.waiters.lock().unwrap();
  234. if let Some((_i, waiter)) = write_waiters.iter_mut().next() {
  235. waiter.wake();
  236. }
  237. }
  238. }
  239. }
  240. }
  241.  
// Compile-time selector for the two lock modes. The `Mutex` internals are
// generic over this trait so read and write acquisition share one code path.
pub trait ReadWrite: Sized + Unpin {
    // True for the exclusive (write) mode.
    const IS_WRITE: bool;
    // Human-readable mode name, used in `Debug` output.
    const STR: &'static str;
    // The `State` bit advertising that waiters of this mode exist.
    const HAS_WAITERS_FLAG: usize;
    // Builds an acquisition handle of this mode for `m`.
    fn lock<'a, T: ?Sized>(m: &'a Mutex<T>) -> Lock<'a, Self, T>;
    // The waiter slab used by this mode.
    fn waiters<'a, T: ?Sized>(m: &Mutex<T>) -> &StdMutex<Slab<Waiter>>;
}
// Marker type selecting exclusive (write) mode.
pub struct Write;
// Marker type selecting shared (read) mode.
pub struct Read;
impl ReadWrite for Write {
    // Write locks are exclusive.
    const IS_WRITE: bool = true;
    const STR: &'static str = "Write";
    const HAS_WAITERS_FLAG: usize = State::HAS_WAITERS;
    // Acquire in write mode.
    fn lock<'a, T: ?Sized>(m: &'a Mutex<T>) -> Lock<'a, Self, T> {
        m.write()
    }
    // Writers park in the write-waiter slab.
    fn waiters<'a, T: ?Sized>(m: &Mutex<T>) -> &StdMutex<Slab<Waiter>> {
        &m.waiters
    }
}
  262. impl ReadWrite for Read {
  263. const IS_WRITE: bool = true;
  264. const STR: &'static str = "Read";
  265. const HAS_WAITERS_FLAG: usize = State::HAS_READ_WAITERS;
  266. fn lock<'a, T: ?Sized>(m: &'a Mutex<T>) -> Lock<'a, Self, T> {
  267. m.read()
  268. }
  269. fn waiters<'a, T: ?Sized>(m: &Mutex<T>) -> &StdMutex<Slab<Waiter>> {
  270. &m.read_waiters
  271. }
  272. }
  273.  
// Sentinel for when no slot in the `Slab` has been dedicated to this object.
const WAIT_KEY_NONE: usize = usize::max_value();

/// A future which resolves when the target mutex has been successfully acquired.
pub struct MutexLockFuture<'a, RW: ReadWrite, T: ?Sized> {
    // `None` indicates that the mutex was successfully acquired.
    mutex: Option<&'a Mutex<T>>,
    // Slab key of this future's registered waiter, or `WAIT_KEY_NONE` before
    // the first pending poll.
    wait_key: usize,
    // Zero-sized read/write mode selector.
    _rw: PhantomData<RW>,
}
  284.  
  285. impl<RW: ReadWrite, T: ?Sized> fmt::Debug for MutexLockFuture<'_, RW, T> {
  286. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  287. f.debug_struct(&format!("MutexLockFuture<{}>", RW::STR)[..])
  288. .field("was_acquired", &self.mutex.is_none())
  289. .field("mutex", &self.mutex)
  290. .field(
  291. "wait_key",
  292. &(if self.wait_key == WAIT_KEY_NONE {
  293. None
  294. } else {
  295. Some(self.wait_key)
  296. }),
  297. )
  298. .finish()
  299. }
  300. }
  301.  
  302. impl<'a, RW: ReadWrite, T: ?Sized> FusedFuture for MutexLockFuture<'a, RW, T> {
  303. fn is_terminated(&self) -> bool {
  304. self.mutex.is_none()
  305. }
  306. }
impl<'a, RW: ReadWrite, T: ?Sized> Future for MutexLockFuture<'a, RW, T> {
    type Output = MutexGuard<'a, RW, T>;
    // Tries the lock; if unavailable, registers (or refreshes) this task's
    // waker in the mode's waiter slab, then re-checks once to close the race
    // with a concurrent unlock. The statement order here is load-bearing.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mutex = self.mutex.expect("polled MutexLockFuture after completion");

        let lock = RW::lock(mutex);
        // Fast path: the lock is free right now.
        if let Some(locked) = lock.try_lock() {
            // Deregister our waiter (no-op if never registered); do not pass
            // the wakeup on, since we consumed it by acquiring the lock.
            mutex.remove_waker::<RW>(self.wait_key, false);
            self.mutex = None;
            return Poll::Ready(locked);
        }

        {
            let mut waiters = RW::waiters(mutex).lock().unwrap();
            if self.wait_key == WAIT_KEY_NONE {
                // First pending poll: claim a slab slot for our waker.
                self.wait_key = waiters.insert(Waiter::Waiting(cx.waker().clone()));
                if waiters.len() == 1 {
                    // We are the first waiter of this mode; advertise that in
                    // the state word so unlockers know to wake us.
                    mutex
                        .state
                        .fetch_or(RW::HAS_WAITERS_FLAG, Ordering::Relaxed); // released by mutex unlock
                }
            } else {
                // Already registered: just refresh the stored waker.
                waiters[self.wait_key].register(cx.waker());
            }
        }

        // Ensure that we haven't raced `MutexGuard::drop`'s unlock path by
        // attempting to acquire the lock again.
        if let Some(locked) = lock.try_lock() {
            mutex.remove_waker::<RW>(self.wait_key, false);
            self.mutex = None;
            return Poll::Ready(locked);
        }

        Poll::Pending
    }
}
  344.  
  345. impl<RW: ReadWrite, T: ?Sized> Drop for MutexLockFuture<'_, RW, T> {
  346. fn drop(&mut self) {
  347. if let Some(mutex) = self.mutex {
  348. // This future was dropped before it acquired the mutex.
  349. //
  350. // Remove ourselves from the map, waking up another waiter if we
  351. // had been awoken to acquire the lock.
  352. mutex.remove_waker::<RW>(self.wait_key, true);
  353. }
  354. }
  355. }
  356.  
/// An RAII guard returned by the `lock` and `try_lock` methods.
/// When this structure is dropped (falls out of scope), the lock will be
/// unlocked.
pub struct MutexGuard<'a, RW: ReadWrite, T: ?Sized> {
    // The mutex this guard holds, in the mode selected by `RW`.
    mutex: &'a Mutex<T>,
    // Zero-sized read/write mode selector.
    _rw: PhantomData<RW>,
}
  364.  
  365. impl<RW: ReadWrite, T: ?Sized + fmt::Debug> fmt::Debug for MutexGuard<'_, RW, T> {
  366. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  367. f.debug_struct(&format!("MutexGuard<{}>", RW::STR)[..])
  368. .field("value", &*self)
  369. .field("mutex", &self.mutex)
  370. .finish()
  371. }
  372. }
  373.  
  374. impl<RW: ReadWrite, T: ?Sized> Drop for MutexGuard<'_, RW, T> {
  375. fn drop(&mut self) {
  376. if RW::IS_WRITE {
  377. let old_state = State(
  378. self.mutex
  379. .state
  380. .fetch_and(!State::IS_LOCKED, Ordering::AcqRel),
  381. );
  382. if old_state.has_waiters() {
  383. // Wake one write waiter
  384. let mut waiters = self.mutex.waiters.lock().unwrap();
  385. if let Some((_i, waiter)) = waiters.iter_mut().next() {
  386. waiter.wake();
  387. }
  388. } else if old_state.has_read_waiters() {
  389. // Wake all read waiters
  390. let mut waiters = self.mutex.waiters.lock().unwrap();
  391. for w in waiters.iter_mut() {
  392. w.1.wake()
  393. }
  394. }
  395. } else {
  396. let old_state = State(self.mutex.state.fetch_sub(1, Ordering::AcqRel));
  397. if old_state.read_lock_count() > 1 {
  398. // We're not the last read lock holder
  399. } else if old_state.has_waiters() {
  400. // Wake one write waiter
  401. let mut waiters = self.mutex.waiters.lock().unwrap();
  402. if let Some((_i, waiter)) = waiters.iter_mut().next() {
  403. waiter.wake();
  404. }
  405. } else if old_state.has_read_waiters() {
  406. panic!("Race condition, read waiters were queued while the read lock was held")
  407. }
  408. }
  409. }
  410. }
  411.  
impl<RW: ReadWrite, T: ?Sized> Deref for MutexGuard<'_, RW, T> {
    type Target = T;
    fn deref(&self) -> &T {
        // SAFETY: the guard's existence proves the lock is held in `RW` mode,
        // so shared access to the value is mediated by the lock protocol.
        unsafe { &*self.mutex.value.get() }
    }
}
// Mutable access is offered only for write guards, which hold the lock
// exclusively (read guards never implement `DerefMut`).
impl<T: ?Sized> DerefMut for MutexGuard<'_, Write, T> {
    fn deref_mut(&mut self) -> &mut T {
        // SAFETY: a `Write` guard holds the exclusive lock, so no other
        // reference to the value can exist while this borrow lives.
        unsafe { &mut *self.mutex.value.get() }
    }
}
  423.  
// SAFETY: Mutexes can be moved freely between threads and acquired on any
// thread so long as the inner value can be safely sent between threads.
unsafe impl<T: ?Sized + Send> Send for Mutex<T> {}
// SAFETY: shared access to the inner value is mediated by the lock protocol,
// so `T: Send` suffices here (mirrors `std::sync::Mutex`'s bounds).
unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}

// SAFETY: It's safe to switch which thread the acquire is being attempted on
// so long as `T` can be accessed on that thread.
unsafe impl<RW: ReadWrite, T: ?Sized + Send> Send for MutexLockFuture<'_, RW, T> {}
// SAFETY: doesn't have any interesting `&self` methods (only Debug), so
// sharing the future across threads exposes no access to `T`.
unsafe impl<RW: ReadWrite, T: ?Sized> Sync for MutexLockFuture<'_, RW, T> {}

// SAFETY: Safe to send since we don't track any thread-specific details -- the
// inner lock is essentially spinlock-equivalent (attempt to flip an atomic bool).
unsafe impl<RW: ReadWrite, T: ?Sized + Send> Send for MutexGuard<'_, RW, T> {}
// SAFETY: sharing a guard only exposes `&T` (via `Deref`/`Debug`), hence the
// `T: Sync` bound.
unsafe impl<RW: ReadWrite, T: ?Sized + Sync> Sync for MutexGuard<'_, RW, T> {}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement