Guest User

Untitled

a guest
Aug 19th, 2018
74
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.23 KB | None | 0 0
  1. extern crate futures; // 0.1.23
  2. extern crate rand;
  3. extern crate tokio; // 0.1.7 // 0.5.4
  4.  
  5. use futures::sync::mpsc::channel;
  6. use futures::{future, stream, Future, Sink, Stream};
  7. use rand::{thread_rng, Rng};
  8. use std::thread::sleep;
  9.  
  10. use std::time::{Duration, SystemTime};
  11.  
  12. pub fn select_n<S>(
  13. streams: S,
  14. buffer: usize,
  15. ) -> impl Stream<Item = <S::Item as Stream>::Item, Error = <S::Item as Stream>::Error>
  16. where
  17. S: IntoIterator,
  18. S::Item: Stream + Send + 'static,
  19. <S::Item as Stream>::Item: Send + 'static,
  20. <S::Item as Stream>::Error: Send + 'static,
  21. {
  22. future::lazy(move || {
  23. let (sender, receiver) = channel(buffer);
  24. for stream in streams {
  25. let sender = sender.clone();
  26. tokio::spawn(
  27. sender
  28. .sink_map_err(|sink_err| eprintln!("Can't send data: {}", sink_err))
  29. .send_all(stream.then(|res| Ok::<_, ()>(res)))
  30. .map(|(_sink, _stream)| ()),
  31. );
  32. }
  33. Ok(receiver.then(|wrapped_res| match wrapped_res {
  34. Ok(res) => res,
  35. Err(()) => panic!("Received error"),
  36. }))
  37. }).flatten_stream()
  38. }
  39.  
  40. fn hard_work(val: u16) -> u16 {
  41. let mut rng = thread_rng();
  42. let sleep_for = Duration::from_millis(rng.gen_range(100, 500));
  43. let modifier: u16 = rng.gen();
  44. sleep(sleep_for);
  45. val + modifier
  46. }
  47.  
  48. fn heavy_stream() -> Box<Stream<Item = u16, Error = u16> + Send> {
  49. Box::new(stream::repeat(0).take(5).map(hard_work))
  50. }
  51.  
  52. fn very_parallel_wow() {
  53. let processing = select_n(vec![heavy_stream(), heavy_stream()], 1)
  54. .then(|res| Ok::<_, ()>(res))
  55. .for_each(|_res| {
  56. //println!("Received {:?}", res);
  57. Ok(())
  58. });
  59. tokio::run(processing);
  60. }
  61.  
  62. fn not_so_parallel() {
  63. let processing = heavy_stream()
  64. .select(heavy_stream())
  65. .then(|res| Ok::<_, ()>(res))
  66. .for_each(|_res| {
  67. //println!("Received {:?}", res);
  68. Ok(())
  69. });
  70. tokio::run(processing);
  71. }
  72.  
  73. fn main() {
  74. let before = SystemTime::now();
  75. very_parallel_wow();
  76. println!("parallel: {:?}", before.elapsed());
  77.  
  78. let before = SystemTime::now();
  79. not_so_parallel();
  80. println!("not parallel: {:?}", before.elapsed());
  81. }
Add Comment
Please, Sign In to add comment