SHARE
TWEET

Untitled

a guest Jul 21st, 2019 71 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. // #![allow(unused)]
  2. use crossbeam::channel::{Receiver, Sender};
  3.  
  4. /// Representation of a merged set of channels as an iterator
  5. ///
  6. /// Depends upon the assumption that all data in chans is already sorted.
  7. ///
  8. /// Waits on chans at start of each next call to ensure that we have one head_item per channel.
  9. ///
  10. /// Upon reading each head_item they are inserted into head_items using a binary_search and insert.
  11. ///
  12. /// Once we have as many head_items as chans we pop the head and save the index that the item came
  13. /// from. On the next iteration we wait on that channel before repeating the insert and pop.
  14. ///
  15. /// Once we exhaust a channel we swap the exhausted channel with the last one, pop it, and find the
  16. /// highest ID in head_items and replace it with the new ID which was assigned to the exhausted
  17. /// channel.
  18. ///
  19. /// Start yielding only None when chans is empty
  20. pub struct MergedChannels<T> {
  21.     /// Set of channels to merge input from
  22.     chans: Vec<Receiver<T>>,
  23.     /// Sorted list of head items already grabbed from other channels and the index of that channel
  24.     /// in chans
  25.     head_items: Vec<(T, usize)>,
  26.     /// the index of the source chan of the previously yielded head_item
  27.     last_picked: Option<usize>,
  28. }
  29.  
  30. impl<T> Iterator for MergedChannels<T>
  31. where
  32.     T: std::cmp::Ord,
  33. {
  34.     type Item = T;
  35.  
  36.     fn next(&mut self) -> Option<Self::Item> {
  37.         if self.chans.is_empty() {
  38.             return None;
  39.         }
  40.  
  41.         if let Some(id) = self.last_picked {
  42.             self.receive_from(id);
  43.         } else {
  44.             self.receive_from_all();
  45.         }
  46.  
  47.         Some(self.get_next_head_item())
  48.     }
  49. }
  50.  
  51. impl<T> MergedChannels<T>
  52. where
  53.     T: std::cmp::Ord,
  54. {
  55.     pub fn new(chans: Vec<Receiver<T>>) -> Self {
  56.         Self {
  57.             chans,
  58.             head_items: vec![],
  59.             last_picked: None,
  60.         }
  61.     }
  62.  
  63.     fn get_next_head_item(&mut self) -> T {
  64.         assert!(!self.head_items.is_empty());
  65.  
  66.         let (item, last_picked) = self.head_items.pop().unwrap();
  67.  
  68.         self.last_picked = Some(last_picked);
  69.  
  70.         item
  71.     }
  72.  
  73.     fn receive_from(&mut self, id: usize) {
  74.         match self.chans[id].recv() {
  75.             Ok(item) => self.sorted_insert((item, id)),
  76.             Err(e) => {
  77.                 // debug!(message = "channel exhausted", ?id, ?e);
  78.                 self.remove_channel(id);
  79.             }
  80.         }
  81.     }
  82.  
  83.     fn receive_from_all(&mut self) {
  84.         for id in 0..self.chans.len() {
  85.             self.receive_from(id);
  86.         }
  87.     }
  88.  
  89.     fn sorted_insert(&mut self, item: (T, usize)) {
  90.         let ind = match self.head_items.binary_search(&item) {
  91.             Ok(_id) => unreachable!(), // exact match exists
  92.             Err(id) => id,             // insert location to maintain sort order
  93.         };
  94.  
  95.         self.head_items.insert(ind, item);
  96.     }
  97.  
  98.     fn remove_channel(&mut self, id: usize) {
  99.         let _ = self.chans.swap_remove(id);
  100.         let (_, new_id) = self.head_items.swap_remove(id);
  101.         self.head_items[id].1 = new_id;
  102.     }
  103. }
  104.  
  105. #[cfg(test)]
  106. mod tests {
  107.     use super::*;
  108.  
  109.     #[test]
  110.     fn happy_path() {
  111.         let (s1, r1) = crossbeam::channel::unbounded();
  112.         let (s2, r2) = crossbeam::channel::unbounded();
  113.  
  114.         let mut lines = ["hi", "okay", "abc"];
  115.         lines.sort();
  116.  
  117.         for line in lines.iter() {
  118.             s1.send(line.to_string());
  119.         }
  120.  
  121.         let mut lines2 = ["bcd", "hoho", "zyz"];
  122.         lines.sort();
  123.  
  124.         for line in lines2.iter() {
  125.             s2.send(line.to_string());
  126.         }
  127.  
  128.         let mut m = MergedChannels::new(vec![r1, r2]);
  129.  
  130.         for item in m {
  131.             println!("{}", item);
  132.         }
  133.     }
  134. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top