Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- extern crate futures; // 0.1.23
- extern crate rand;
- extern crate tokio; // 0.1.7 // 0.5.4
- use futures::sync::mpsc::channel;
- use futures::{future, stream, Future, Sink, Stream};
- use rand::{thread_rng, Rng};
- use std::thread::sleep;
- use std::time::{Duration, SystemTime};
- pub fn select_n<S>(
- streams: S,
- buffer: usize,
- ) -> impl Stream<Item = <S::Item as Stream>::Item, Error = <S::Item as Stream>::Error>
- where
- S: IntoIterator,
- S::Item: Stream + Send + 'static,
- <S::Item as Stream>::Item: Send + 'static,
- <S::Item as Stream>::Error: Send + 'static,
- {
- future::lazy(move || {
- let (sender, receiver) = channel(buffer);
- for stream in streams {
- let sender = sender.clone();
- tokio::spawn(
- sender
- .sink_map_err(|sink_err| eprintln!("Can't send data: {}", sink_err))
- .send_all(stream.then(|res| Ok::<_, ()>(res)))
- .map(|(_sink, _stream)| ()),
- );
- }
- Ok(receiver.then(|wrapped_res| match wrapped_res {
- Ok(res) => res,
- Err(()) => panic!("Received error"),
- }))
- }).flatten_stream()
- }
- fn hard_work(val: u16) -> u16 {
- let mut rng = thread_rng();
- let sleep_for = Duration::from_millis(rng.gen_range(100, 500));
- let modifier: u16 = rng.gen();
- sleep(sleep_for);
- val + modifier
- }
- fn heavy_stream() -> Box<Stream<Item = u16, Error = u16> + Send> {
- Box::new(stream::repeat(0).take(5).map(hard_work))
- }
- fn very_parallel_wow() {
- let processing = select_n(vec![heavy_stream(), heavy_stream()], 1)
- .then(|res| Ok::<_, ()>(res))
- .for_each(|_res| {
- //println!("Received {:?}", res);
- Ok(())
- });
- tokio::run(processing);
- }
- fn not_so_parallel() {
- let processing = heavy_stream()
- .select(heavy_stream())
- .then(|res| Ok::<_, ()>(res))
- .for_each(|_res| {
- //println!("Received {:?}", res);
- Ok(())
- });
- tokio::run(processing);
- }
- fn main() {
- let before = SystemTime::now();
- very_parallel_wow();
- println!("parallel: {:?}", before.elapsed());
- let before = SystemTime::now();
- not_so_parallel();
- println!("not parallel: {:?}", before.elapsed());
- }
Add Comment
Please, Sign In to add comment