daily pastebin goal
77%
SHARE
TWEET

Untitled

a guest Aug 19th, 2018 52 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. extern crate futures; // 0.1.23
  2. extern crate rand; // 0.5.4
  3. extern crate tokio; // 0.1.7
  4.  
  5. use futures::sync::mpsc::channel;
  6. use futures::{future, stream, Future, Sink, Stream};
  7. use rand::{thread_rng, Rng};
  8. use std::thread::sleep;
  9.  
  10. use std::time::{Duration, SystemTime};
  11.  
  12. pub fn select_n<S>(
  13.     streams: S,
  14.     buffer: usize,
  15. ) -> impl Stream<Item = <S::Item as Stream>::Item, Error = <S::Item as Stream>::Error>
  16. where
  17.     S: IntoIterator,
  18.     S::Item: Stream + Send + 'static,
  19.     <S::Item as Stream>::Item: Send + 'static,
  20.     <S::Item as Stream>::Error: Send + 'static,
  21. {
  22.     future::lazy(move || {
  23.         let (sender, receiver) = channel(buffer);
  24.         for stream in streams {
  25.             let sender = sender.clone();
  26.             tokio::spawn(
  27.                 sender
  28.                     .sink_map_err(|sink_err| eprintln!("Can't send data: {}", sink_err))
  29.                     .send_all(stream.then(|res| Ok::<_, ()>(res)))
  30.                     .map(|(_sink, _stream)| ()),
  31.             );
  32.         }
  33.         Ok(receiver.then(|wrapped_res| match wrapped_res {
  34.             Ok(res) => res,
  35.             Err(()) => panic!("Received error"),
  36.         }))
  37.     }).flatten_stream()
  38. }
  39.  
  40. fn hard_work(val: u16) -> u16 {
  41.     let mut rng = thread_rng();
  42.     let sleep_for = Duration::from_millis(rng.gen_range(100, 500));
  43.     let modifier: u16 = rng.gen();
  44.     sleep(sleep_for);
  45.     val + modifier
  46. }
  47.  
  48. fn heavy_stream() -> Box<Stream<Item = u16, Error = u16> + Send> {
  49.     Box::new(stream::repeat(0).take(5).map(hard_work))
  50. }
  51.  
  52. fn very_parallel_wow() {
  53.     let processing = select_n(vec![heavy_stream(), heavy_stream()], 1)
  54.         .then(|res| Ok::<_, ()>(res))
  55.         .for_each(|_res| {
  56.             //println!("Received {:?}", res);
  57.             Ok(())
  58.         });
  59.     tokio::run(processing);
  60. }
  61.  
  62. fn not_so_parallel() {
  63.     let processing = heavy_stream()
  64.         .select(heavy_stream())
  65.         .then(|res| Ok::<_, ()>(res))
  66.         .for_each(|_res| {
  67.             //println!("Received {:?}", res);
  68.             Ok(())
  69.         });
  70.     tokio::run(processing);
  71. }
  72.  
  73. fn main() {
  74.     let before = SystemTime::now();
  75.     very_parallel_wow();
  76.     println!("parallel: {:?}", before.elapsed());
  77.  
  78.     let before = SystemTime::now();
  79.     not_so_parallel();
  80.     println!("not parallel: {:?}", before.elapsed());
  81. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top