SHARE
TWEET

Untitled

a guest Jul 21st, 2019 72 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. // #![allow(unused)]
  2.  
  3. #![deny(unused_qualifications)]
  4. use crossbeam::channel::{Receiver, Sender};
  5.  
  6. /// Representation of a merged set of channels as an iterator
  7. ///
  8. /// Depends upon the assumption that all data in chans is already sorted.
  9. ///
  10. /// Waits on chans at start of each next call to ensure that we have one head_item per channel.
  11. ///
  12. /// Upon reading each head_item they are inserted into head_items using a binary_search and insert.
  13. ///
  14. /// Once we have as many head_items as chans we pop the head and save the index that the item came
  15. /// from. On the next iteration we wait on that channel before repeating the insert and pop.
  16. ///
  17. /// Once we exhaust a channel we swap the exhausted channel with the last one, pop it, and find the
  18. /// highest ID in head_items and replace it with the new ID which was assigned to the exhausted
  19. /// channel.
  20. ///
  21. /// Start yielding only None when chans is empty
  22. pub struct MergedChannels<T> {
  23.     /// Set of channels to merge input from
  24.     chans: Vec<Receiver<T>>,
  25.     /// Sorted list of head items already grabbed from other channels and the index of that channel
  26.     /// in chans
  27.     head_items: Vec<(T, usize)>,
  28.     /// the index of the source chan of the previously yielded head_item
  29.     last_picked: Option<usize>,
  30. }
  31.  
  32. impl<T> Iterator for MergedChannels<T>
  33. where
  34.     T: Ord,
  35. {
  36.     type Item = T;
  37.  
  38.     fn next(&mut self) -> Option<Self::Item> {
  39.         if self.chans.is_empty() {
  40.             return None;
  41.         }
  42.  
  43.         if let Some(id) = self.last_picked {
  44.             self.receive_from(id);
  45.         } else {
  46.             self.receive_from_all();
  47.         }
  48.  
  49.         Some(self.get_next_head_item())
  50.     }
  51. }
  52.  
  53. impl<T> MergedChannels<T>
  54. where
  55.     T: Ord,
  56. {
  57.     pub fn new(chans: Vec<Receiver<T>>) -> Self {
  58.         Self {
  59.             chans,
  60.             head_items: vec![],
  61.             last_picked: None,
  62.         }
  63.     }
  64.  
  65.     fn get_next_head_item(&mut self) -> T {
  66.         assert!(!self.head_items.is_empty());
  67.  
  68.         let (item, last_picked) = self.head_items.pop().unwrap();
  69.  
  70.         self.last_picked = Some(last_picked);
  71.  
  72.         item
  73.     }
  74.  
  75.     fn receive_from(&mut self, id: usize) {
  76.         match self.chans[id].recv() {
  77.             Ok(item) => self.sorted_insert((item, id)),
  78.             Err(e) => {
  79.                 // debug!(message = "channel exhausted", ?id, ?e);
  80.                 self.remove_channel(id);
  81.             }
  82.         }
  83.     }
  84.  
  85.     fn receive_from_all(&mut self) {
  86.         for id in 0..self.chans.len() {
  87.             self.receive_from(id);
  88.         }
  89.     }
  90.  
  91.     fn sorted_insert(&mut self, item: (T, usize)) {
  92.         let ind = match self.head_items.binary_search(&item) {
  93.             Ok(_id) => unreachable!(), // exact match exists
  94.             Err(id) => id,             // insert location to maintain sort order
  95.         };
  96.  
  97.         self.head_items.insert(ind, item);
  98.     }
  99.  
  100.     fn remove_channel(&mut self, id: usize) {
  101.         let _ = self.chans.swap_remove(id);
  102.         let (_, new_id) = self.head_items.swap_remove(id);
  103.         self.head_items[id].1 = new_id;
  104.     }
  105. }
  106.  
  107. #[cfg(test)]
  108. mod tests {
  109.     use super::*;
  110.  
  111.     #[test]
  112.     fn happy_path() {
  113.         let (s1, r1) = crossbeam::channel::unbounded();
  114.         let (s2, r2) = crossbeam::channel::unbounded();
  115.  
  116.         let mut lines = ["hi", "okay", "abc"];
  117.         lines.sort();
  118.  
  119.         for line in lines.iter() {
  120.             s1.send(line.to_string());
  121.         }
  122.  
  123.         let mut lines2 = ["bcd", "hoho", "zyz"];
  124.         lines.sort();
  125.  
  126.         for line in lines2.iter() {
  127.             s2.send(line.to_string());
  128.         }
  129.  
  130.         let mut m = MergedChannels::new(vec![r1, r2]);
  131.  
  132.         for item in m {
  133.             println!("{}", item);
  134.         }
  135.     }
  136. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top