Advertisement
Guest User

Untitled

a guest
Jul 21st, 2019
99
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.59 KB | None | 0 0
  1. // #![allow(unused)]
  2.  
  3. #![deny(unused_qualifications)]
  4. use crossbeam::channel::{Receiver, Sender};
  5.  
  6. /// Representation of a merged set of channels as an iterator
  7. ///
  8. /// Depends upon the assumption that all data in chans is already sorted.
  9. ///
  10. /// Waits on chans at start of each next call to ensure that we have one head_item per channel.
  11. ///
  12. /// Upon reading each head_item they are inserted into head_items using a binary_search and insert.
  13. ///
  14. /// Once we have as many head_items as chans we pop the head and save the index that the item came
  15. /// from. On the next iteration we wait on that channel before repeating the insert and pop.
  16. ///
  17. /// Once we exhaust a channel we swap the exhausted channel with the last one, pop it, and find the
  18. /// highest ID in head_items and replace it with the new ID which was assigned to the exhausted
  19. /// channel.
  20. ///
  21. /// Start yielding only None when chans is empty
  22. pub struct MergedChannels<T> {
  23. /// Set of channels to merge input from
  24. chans: Vec<Receiver<T>>,
  25. /// Sorted list of head items already grabbed from other channels and the index of that channel
  26. /// in chans
  27. head_items: Vec<(T, usize)>,
  28. /// the index of the source chan of the previously yielded head_item
  29. last_picked: Option<usize>,
  30. }
  31.  
  32. impl<T> Iterator for MergedChannels<T>
  33. where
  34. T: Ord,
  35. {
  36. type Item = T;
  37.  
  38. fn next(&mut self) -> Option<Self::Item> {
  39. if self.chans.is_empty() {
  40. return None;
  41. }
  42.  
  43. if let Some(id) = self.last_picked {
  44. self.receive_from(id);
  45. } else {
  46. self.receive_from_all();
  47. }
  48.  
  49. Some(self.get_next_head_item())
  50. }
  51. }
  52.  
  53. impl<T> MergedChannels<T>
  54. where
  55. T: Ord,
  56. {
  57. pub fn new(chans: Vec<Receiver<T>>) -> Self {
  58. Self {
  59. chans,
  60. head_items: vec![],
  61. last_picked: None,
  62. }
  63. }
  64.  
  65. fn get_next_head_item(&mut self) -> T {
  66. assert!(!self.head_items.is_empty());
  67.  
  68. let (item, last_picked) = self.head_items.pop().unwrap();
  69.  
  70. self.last_picked = Some(last_picked);
  71.  
  72. item
  73. }
  74.  
  75. fn receive_from(&mut self, id: usize) {
  76. match self.chans[id].recv() {
  77. Ok(item) => self.sorted_insert((item, id)),
  78. Err(e) => {
  79. // debug!(message = "channel exhausted", ?id, ?e);
  80. self.remove_channel(id);
  81. }
  82. }
  83. }
  84.  
  85. fn receive_from_all(&mut self) {
  86. for id in 0..self.chans.len() {
  87. self.receive_from(id);
  88. }
  89. }
  90.  
  91. fn sorted_insert(&mut self, item: (T, usize)) {
  92. let ind = match self.head_items.binary_search(&item) {
  93. Ok(_id) => unreachable!(), // exact match exists
  94. Err(id) => id, // insert location to maintain sort order
  95. };
  96.  
  97. self.head_items.insert(ind, item);
  98. }
  99.  
  100. fn remove_channel(&mut self, id: usize) {
  101. let _ = self.chans.swap_remove(id);
  102. let (_, new_id) = self.head_items.swap_remove(id);
  103. self.head_items[id].1 = new_id;
  104. }
  105. }
  106.  
  107. #[cfg(test)]
  108. mod tests {
  109. use super::*;
  110.  
  111. #[test]
  112. fn happy_path() {
  113. let (s1, r1) = crossbeam::channel::unbounded();
  114. let (s2, r2) = crossbeam::channel::unbounded();
  115.  
  116. let mut lines = ["hi", "okay", "abc"];
  117. lines.sort();
  118.  
  119. for line in lines.iter() {
  120. s1.send(line.to_string());
  121. }
  122.  
  123. let mut lines2 = ["bcd", "hoho", "zyz"];
  124. lines.sort();
  125.  
  126. for line in lines2.iter() {
  127. s2.send(line.to_string());
  128. }
  129.  
  130. let mut m = MergedChannels::new(vec![r1, r2]);
  131.  
  132. for item in m {
  133. println!("{}", item);
  134. }
  135. }
  136. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement