Advertisement
Guest User

Untitled

a guest
Sep 9th, 2019
209
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Rust 2.55 KB | None | 0 0
  1. diff --git a/src/libstd/sync/mpsc/mod.rs b/src/libstd/sync/mpsc/mod.rs
  2. index 69ecd20106..7c8054e993 100644
  3. --- a/src/libstd/sync/mpsc/mod.rs
  4. +++ b/src/libstd/sync/mpsc/mod.rs
  5. @@ -1418,6 +1418,10 @@ impl<T> Receiver<T> {
  6.              if let Some(new_port) = port_or_empty {
  7.                  unsafe {
  8.                      mem::swap(self.inner_mut(), new_port.inner_mut());
  9. +                    match self.inner() {
  10. +                        Flavor::Shared(ref p) => { p.post_rx_upgrade(); }
  11. +                        _ => {}
  12. +                    }
  13.                  }
  14.              }
  15.  
  16. diff --git a/src/libstd/sync/mpsc/shared.rs b/src/libstd/sync/mpsc/shared.rs
  17. index dbcdcdac93..d3be94df74 100644
  18. --- a/src/libstd/sync/mpsc/shared.rs
  19. +++ b/src/libstd/sync/mpsc/shared.rs
  20. @@ -49,6 +49,7 @@ pub struct Packet<T> {
  21.      // this lock protects various portions of this implementation during
  22.      // select()
  23.      select_lock: Mutex<()>,
  24. +    deferred_downgrade: UnsafeCell<bool>,
  25.  }
  26.  
  27.  pub enum Failure {
  28. @@ -75,6 +76,7 @@ impl<T> Packet<T> {
  29.              port_dropped: AtomicBool::new(false),
  30.              sender_drain: AtomicIsize::new(0),
  31.              select_lock: Mutex::new(()),
  32. +            deferred_downgrade: UnsafeCell::new(false),
  33.          }
  34.      }
  35.  
  36. @@ -216,6 +218,10 @@ impl<T> Packet<T> {
  37.      }
  38.  
  39.      pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> {
  40. +        if unsafe{ *self.deferred_downgrade.get() } {
  41. +            self.downgrade();
  42. +        }
  43. +
  44.          // This code is essentially the exact same as that found in the stream
  45.          // case (see stream.rs)
  46.          match self.try_recv() {
  47. @@ -387,6 +393,23 @@ impl<T> Packet<T> {
  48.          }
  49.      }
  50.  
  51. +    // Postpone a reversal of some changes done in inherit_borrowers() prior to next
  52. +    // revc() - These did not apply since the recv timed out with an upgrade, and did
  53. +    // not produce data
  54. +    pub fn post_rx_upgrade(&self) {
  55. +        unsafe{ *self.deferred_downgrade.get() = true };
  56. +    }
  57. +
  58. +    fn downgrade(&self) {
  59. +        let _cnt = self.cnt.fetch_add(1,Ordering::SeqCst);
  60. +        unsafe { *self.steals.get() += 1 };  // Bumps since abort_select() was not called
  61. +        let ptr = self.to_wake.swap(0, Ordering::SeqCst);
  62. +        if ptr!=0 {
  63. +            let _token = unsafe{ SignalToken::cast_from_usize(ptr) };
  64. +        }
  65. +        unsafe{ *self.deferred_downgrade.get() = false };
  66. +    }
  67. +
  68.      // Consumes ownership of the 'to_wake' field.
  69.      fn take_to_wake(&self) -> SignalToken {
  70.          let ptr = self.to_wake.load(Ordering::SeqCst);
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement