Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // #![allow(unused)]
- use crossbeam::channel::{Receiver, Sender};
- /// Representation of a merged set of channels as an iterator
- ///
- /// Depends upon the assumption that all data in chans is already sorted.
- ///
- /// Waits on chans at start of each next call to ensure that we have one head_item per channel.
- ///
- /// Upon reading each head_item they are inserted into head_items using a binary_search and insert.
- ///
- /// Once we have as many head_items as chans we pop the head and save the index that the item came
- /// from. On the next iteration we wait on that channel before repeating the insert and pop.
- ///
- /// Once we exhaust a channel we swap the exhausted channel with the last one, pop it, and find the
- /// highest ID in head_items and replace it with the new ID which was assigned to the exhausted
- /// channel.
- ///
- /// Start yielding only None when chans is empty
- pub struct MergedChannels<T> {
- /// Set of channels to merge input from
- chans: Vec<Receiver<T>>,
- /// Sorted list of head items already grabbed from other channels and the index of that channel
- /// in chans
- head_items: Vec<(T, usize)>,
- /// the index of the source chan of the previously yielded head_item
- last_picked: Option<usize>,
- }
- impl<T> Iterator for MergedChannels<T>
- where
- T: std::cmp::Ord,
- {
- type Item = T;
- fn next(&mut self) -> Option<Self::Item> {
- if self.chans.is_empty() {
- return None;
- }
- if let Some(id) = self.last_picked {
- self.receive_from(id);
- } else {
- self.receive_from_all();
- }
- Some(self.get_next_head_item())
- }
- }
- impl<T> MergedChannels<T>
- where
- T: std::cmp::Ord,
- {
- pub fn new(chans: Vec<Receiver<T>>) -> Self {
- Self {
- chans,
- head_items: vec![],
- last_picked: None,
- }
- }
- fn get_next_head_item(&mut self) -> T {
- assert!(!self.head_items.is_empty());
- let (item, last_picked) = self.head_items.pop().unwrap();
- self.last_picked = Some(last_picked);
- item
- }
- fn receive_from(&mut self, id: usize) {
- match self.chans[id].recv() {
- Ok(item) => self.sorted_insert((item, id)),
- Err(e) => {
- // debug!(message = "channel exhausted", ?id, ?e);
- self.remove_channel(id);
- }
- }
- }
- fn receive_from_all(&mut self) {
- for id in 0..self.chans.len() {
- self.receive_from(id);
- }
- }
- fn sorted_insert(&mut self, item: (T, usize)) {
- let ind = match self.head_items.binary_search(&item) {
- Ok(_id) => unreachable!(), // exact match exists
- Err(id) => id, // insert location to maintain sort order
- };
- self.head_items.insert(ind, item);
- }
- fn remove_channel(&mut self, id: usize) {
- let _ = self.chans.swap_remove(id);
- let (_, new_id) = self.head_items.swap_remove(id);
- self.head_items[id].1 = new_id;
- }
- }
- #[cfg(test)]
- mod tests {
- use super::*;
- #[test]
- fn happy_path() {
- let (s1, r1) = crossbeam::channel::unbounded();
- let (s2, r2) = crossbeam::channel::unbounded();
- let mut lines = ["hi", "okay", "abc"];
- lines.sort();
- for line in lines.iter() {
- s1.send(line.to_string());
- }
- let mut lines2 = ["bcd", "hoho", "zyz"];
- lines.sort();
- for line in lines2.iter() {
- s2.send(line.to_string());
- }
- let mut m = MergedChannels::new(vec![r1, r2]);
- for item in m {
- println!("{}", item);
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement