Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- diff --git a/src/libstd/sync/mpsc/mod.rs b/src/libstd/sync/mpsc/mod.rs
- index 69ecd20106..7c8054e993 100644
- --- a/src/libstd/sync/mpsc/mod.rs
- +++ b/src/libstd/sync/mpsc/mod.rs
- @@ -1418,6 +1418,10 @@ impl<T> Receiver<T> {
- if let Some(new_port) = port_or_empty {
- unsafe {
- mem::swap(self.inner_mut(), new_port.inner_mut());
- + match self.inner() {
- + Flavor::Shared(ref p) => { p.post_rx_upgrade(); }
- + _ => {}
- + }
- }
- }
- diff --git a/src/libstd/sync/mpsc/shared.rs b/src/libstd/sync/mpsc/shared.rs
- index dbcdcdac93..d3be94df74 100644
- --- a/src/libstd/sync/mpsc/shared.rs
- +++ b/src/libstd/sync/mpsc/shared.rs
- @@ -49,6 +49,7 @@ pub struct Packet<T> {
- // this lock protects various portions of this implementation during
- // select()
- select_lock: Mutex<()>,
- + deferred_downgrade: UnsafeCell<bool>,
- }
- pub enum Failure {
- @@ -75,6 +76,7 @@ impl<T> Packet<T> {
- port_dropped: AtomicBool::new(false),
- sender_drain: AtomicIsize::new(0),
- select_lock: Mutex::new(()),
- + deferred_downgrade: UnsafeCell::new(false),
- }
- }
- @@ -216,6 +218,10 @@ impl<T> Packet<T> {
- }
- pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> {
- + if unsafe{ *self.deferred_downgrade.get() } {
- + self.downgrade();
- + }
- +
- // This code is essentially the exact same as that found in the stream
- // case (see stream.rs)
- match self.try_recv() {
- @@ -387,6 +393,23 @@ impl<T> Packet<T> {
- }
- }
- + // Postpone a reversal of some changes done in inherit_borrowers() prior to next
- + // recv() - These did not apply since the recv timed out with an upgrade, and did
- + // not produce data
- + pub fn post_rx_upgrade(&self) {
- + unsafe{ *self.deferred_downgrade.get() = true };
- + }
- +
- + fn downgrade(&self) {
- + let _cnt = self.cnt.fetch_add(1,Ordering::SeqCst);
- + unsafe { *self.steals.get() += 1 }; // Bumps since abort_select() was not called
- + let ptr = self.to_wake.swap(0, Ordering::SeqCst);
- + if ptr!=0 {
- + let _token = unsafe{ SignalToken::cast_from_usize(ptr) };
- + }
- + unsafe{ *self.deferred_downgrade.get() = false };
- + }
- +
- // Consumes ownership of the 'to_wake' field.
- fn take_to_wake(&self) -> SignalToken {
- let ptr = self.to_wake.load(Ordering::SeqCst);
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement