SHARE
TWEET

Untitled

a guest Jul 21st, 2019 91 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. //! Struct for merging multiple sorted channels into a single iterator
  2.  
  3. use crossbeam::channel::Receiver;
  4. use std::cmp::Ordering;
  5. use std::fmt::Debug;
  6. use tracing_proc_macros::trace as instrument;
  7.  
  8. /// Representation of a merged set of channels as an iterator
  9. ///
  10. /// Depends upon the assumption that all data in chans is already sorted.
  11. ///
  12. /// Waits on chans at start of each next call to ensure that we have one head_item per channel.
  13. ///
  14. /// Upon reading each head_item they are inserted into head_items using a binary_search and insert.
  15. ///
  16. /// Once we have as many head_items as chans we pop the head and save the index that the item came
  17. /// from. On the next iteration we wait on that channel before repeating the insert and pop.
  18. ///
  19. /// Once we exhaust a channel we swap the exhausted channel with the last one, pop it, and find the
  20. /// highest ID in head_items and replace it with the new ID which was assigned to the exhausted
  21. /// channel.
  22. ///
  23. /// Start yielding only None when chans is empty
  24. #[derive(Debug)]
  25. pub struct MergedChannels<T> {
  26.     /// Set of channels to merge input from
  27.     chans: Vec<Receiver<T>>,
  28.     /// Sorted list of head items already grabbed from other channels and the index of that channel
  29.     /// in chans
  30.     head_items: Vec<(T, usize)>,
  31.     /// the index of the source chan of the previously yielded head_item
  32.     last_picked: Option<usize>,
  33. }
  34.  
  35. impl<T> Iterator for MergedChannels<T>
  36. where
  37.     T: Ord + Debug,
  38. {
  39.     type Item = T;
  40.  
  41.     #[instrument]
  42.     fn next(&mut self) -> Option<Self::Item> {
  43.         if self.chans.is_empty() {
  44.             return None;
  45.         }
  46.  
  47.         if let Some(id) = self.last_picked {
  48.             self.receive_from(id);
  49.         } else {
  50.             self.receive_from_all();
  51.         }
  52.  
  53.         self.get_next_head_item()
  54.     }
  55. }
  56.  
  57. impl<T> MergedChannels<T>
  58. where
  59.     T: Ord + Debug,
  60. {
  61.     /// Construct a merged channels
  62.     #[instrument]
  63.     pub fn new(chans: Vec<Receiver<T>>) -> Self {
  64.         Self {
  65.             chans,
  66.             head_items: vec![],
  67.             last_picked: None,
  68.         }
  69.     }
  70.  
  71.     /// pop the lowest item in the vec and save the id of the channel that item came from
  72.     #[instrument]
  73.     fn get_next_head_item(&mut self) -> Option<T> {
  74.         self.head_items.pop().map(|(item, last_picked)| {
  75.             self.last_picked = Some(last_picked);
  76.  
  77.             item
  78.         })
  79.     }
  80.  
  81.     /// Receive the next item from chan id and insert that into head_items
  82.     #[instrument]
  83.     fn receive_from(&mut self, id: usize) {
  84.         match self.chans[id].recv() {
  85.             Ok(item) => self.sorted_insert((item, id)),
  86.             Err(e) => {
  87.                 debug!(message = "channel exhausted", ?id, ?e);
  88.                 self.remove_channel(id);
  89.             }
  90.         }
  91.     }
  92.  
  93.     /// Receive one item for each channel to fill up head_items
  94.     #[instrument]
  95.     fn receive_from_all(&mut self) {
  96.         for id in 0..self.chans.len() {
  97.             self.receive_from(id);
  98.         }
  99.     }
  100.  
  101.     /// Insert item into head_items in sorted order
  102.     #[instrument]
  103.     fn sorted_insert(&mut self, item: (T, usize)) {
  104.         let ind = match self
  105.             .head_items
  106.             .binary_search_by(|probe| match probe.cmp(&item) {
  107.                 Ordering::Less => Ordering::Greater,
  108.                 Ordering::Greater => Ordering::Less,
  109.                 item => item,
  110.             }) {
  111.             Ok(_id) => unreachable!(), // exact match exists
  112.             Err(id) => id,             // insert location to maintain sort order
  113.         };
  114.  
  115.         self.head_items.insert(ind, item);
  116.     }
  117.  
  118.     /// Remove a channel that is exhausted from the set of channels and adjust the id of the
  119.     /// head_item that swapped with the removed channel to indicate its new location in the chans
  120.     /// vector
  121.     #[instrument]
  122.     fn remove_channel(&mut self, id: usize) {
  123.         trace!(message = "removing id", ?id, ?self);
  124.         let _ = self.chans.swap_remove(id);
  125.  
  126.         let old_id = self.chans.len();
  127.  
  128.         if let Some(dirty_head_item) = self.head_items.iter_mut().find(|item| item.1 == old_id) {
  129.             dirty_head_item.1 = id;
  130.         };
  131.  
  132.         trace!(message = "removed id", ?id, ?self);
  133.     }
  134. }
  135.  
  136. #[cfg(test)]
  137. mod tests {
  138.     use super::*;
  139.  
  140.     #[test]
  141.     fn happy_path() {
  142.         crate::init_script("info");
  143.  
  144.         let (s1, r1) = crossbeam::channel::unbounded();
  145.         let (s2, r2) = crossbeam::channel::unbounded();
  146.  
  147.         let mut lines = ["hi", "okay", "abc"];
  148.         lines.sort();
  149.  
  150.         for line in lines.iter() {
  151.             s1.send(line.to_string()).unwrap();
  152.         }
  153.  
  154.         drop(s1);
  155.  
  156.         let mut lines2 = ["bcd", "hoho", "zyz"];
  157.         lines2.sort();
  158.  
  159.         for line in lines2.iter() {
  160.             s2.send(line.to_string()).unwrap();
  161.         }
  162.  
  163.         drop(s2);
  164.  
  165.         let m = MergedChannels::new(vec![r1, r2]);
  166.  
  167.         for (id, item) in m.enumerate() {
  168.             info!(%item, %id);
  169.         }
  170.     }
  171. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
Not a member of Pastebin yet?
Sign Up, it unlocks many cool features!
 
Top