Advertisement
Guest User

Untitled

a guest
Jul 21st, 2019
103
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.58 KB | None | 0 0
  1. // #![allow(unused)]
  2. use crossbeam::channel::{Receiver, Sender};
  3.  
  4. /// Representation of a merged set of channels as an iterator
  5. ///
  6. /// Depends upon the assumption that all data in chans is already sorted.
  7. ///
  8. /// Waits on chans at start of each next call to ensure that we have one head_item per channel.
  9. ///
  10. /// Upon reading each head_item they are inserted into head_items using a binary_search and insert.
  11. ///
  12. /// Once we have as many head_items as chans we pop the head and save the index that the item came
  13. /// from. On the next iteration we wait on that channel before repeating the insert and pop.
  14. ///
  15. /// Once we exhaust a channel we swap the exhausted channel with the last one, pop it, and find the
  16. /// highest ID in head_items and replace it with the new ID which was assigned to the exhausted
  17. /// channel.
  18. ///
  19. /// Start yielding only None when chans is empty
  20. pub struct MergedChannels<T> {
  21. /// Set of channels to merge input from
  22. chans: Vec<Receiver<T>>,
  23. /// Sorted list of head items already grabbed from other channels and the index of that channel
  24. /// in chans
  25. head_items: Vec<(T, usize)>,
  26. /// the index of the source chan of the previously yielded head_item
  27. last_picked: Option<usize>,
  28. }
  29.  
  30. impl<T> Iterator for MergedChannels<T>
  31. where
  32. T: std::cmp::Ord,
  33. {
  34. type Item = T;
  35.  
  36. fn next(&mut self) -> Option<Self::Item> {
  37. if self.chans.is_empty() {
  38. return None;
  39. }
  40.  
  41. if let Some(id) = self.last_picked {
  42. self.receive_from(id);
  43. } else {
  44. self.receive_from_all();
  45. }
  46.  
  47. Some(self.get_next_head_item())
  48. }
  49. }
  50.  
  51. impl<T> MergedChannels<T>
  52. where
  53. T: std::cmp::Ord,
  54. {
  55. pub fn new(chans: Vec<Receiver<T>>) -> Self {
  56. Self {
  57. chans,
  58. head_items: vec![],
  59. last_picked: None,
  60. }
  61. }
  62.  
  63. fn get_next_head_item(&mut self) -> T {
  64. assert!(!self.head_items.is_empty());
  65.  
  66. let (item, last_picked) = self.head_items.pop().unwrap();
  67.  
  68. self.last_picked = Some(last_picked);
  69.  
  70. item
  71. }
  72.  
  73. fn receive_from(&mut self, id: usize) {
  74. match self.chans[id].recv() {
  75. Ok(item) => self.sorted_insert((item, id)),
  76. Err(e) => {
  77. // debug!(message = "channel exhausted", ?id, ?e);
  78. self.remove_channel(id);
  79. }
  80. }
  81. }
  82.  
  83. fn receive_from_all(&mut self) {
  84. for id in 0..self.chans.len() {
  85. self.receive_from(id);
  86. }
  87. }
  88.  
  89. fn sorted_insert(&mut self, item: (T, usize)) {
  90. let ind = match self.head_items.binary_search(&item) {
  91. Ok(_id) => unreachable!(), // exact match exists
  92. Err(id) => id, // insert location to maintain sort order
  93. };
  94.  
  95. self.head_items.insert(ind, item);
  96. }
  97.  
  98. fn remove_channel(&mut self, id: usize) {
  99. let _ = self.chans.swap_remove(id);
  100. let (_, new_id) = self.head_items.swap_remove(id);
  101. self.head_items[id].1 = new_id;
  102. }
  103. }
  104.  
  105. #[cfg(test)]
  106. mod tests {
  107. use super::*;
  108.  
  109. #[test]
  110. fn happy_path() {
  111. let (s1, r1) = crossbeam::channel::unbounded();
  112. let (s2, r2) = crossbeam::channel::unbounded();
  113.  
  114. let mut lines = ["hi", "okay", "abc"];
  115. lines.sort();
  116.  
  117. for line in lines.iter() {
  118. s1.send(line.to_string());
  119. }
  120.  
  121. let mut lines2 = ["bcd", "hoho", "zyz"];
  122. lines.sort();
  123.  
  124. for line in lines2.iter() {
  125. s2.send(line.to_string());
  126. }
  127.  
  128. let mut m = MergedChannels::new(vec![r1, r2]);
  129.  
  130. for item in m {
  131. println!("{}", item);
  132. }
  133. }
  134. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement