Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- extern crate futures;
- extern crate tokio;
- use futures::sync::mpsc;
- use futures::Future;
- use futures::Stream;
- use futures::Sink;
- use tokio::io;
- use tokio::net::TcpListener;
- use std::collections::HashMap;
- use std::str;
- use std::string::String;
- use std::sync::Arc;
- use std::sync::Mutex;
- fn main() {
- let mut channels: HashMap<i32, mpsc::Sender<String>> = HashMap::new();
- let mut rt = tokio::runtime::Runtime::new().unwrap();
- for i in 0..3 {
- let (tx, rx) = mpsc::channel::<String>(10);
- channels.insert(i, tx);
- let rec_future = rx.for_each(|rec| {
- println!("Received in a worker: {:?}", rec);
- Ok(())
- });
- rt.spawn(rec_future);
- };
- let shared_channels = Arc::new(Mutex::new(channels));
- let addr = "0.0.0.0:6142".parse().unwrap();
- let listener = TcpListener::bind(&addr).unwrap();
- let server = listener
- .incoming()
- .map(move |socket| {
- let shared_channels_local = Arc::clone(&shared_channels);
- println!("accepted socket; addr={:?}", socket.peer_addr().unwrap());
- io::read_exact(socket, [0_u8; 10])
- .map(|(_, buf)| buf)
- .map(|buf| {
- let rec_str = str::from_utf8(&buf).unwrap();
- println!("Received: {:?}", &rec_str);
- String::from(rec_str)
- })
- .map( move |data| {
- let local_channels = shared_channels_local.lock().unwrap();
- let x = local_channels.get(&1).unwrap();
- println!("Start sending!");
- x.clone().send(data)
- })
- })
- .buffer_unordered(20)
- .for_each(|_| Ok(()))
- .map_err(|err| {
- println!("accept error = {:?}", err);
- });
- rt.spawn(server);
- rt.shutdown_on_idle().wait().unwrap();
- }
Add Comment
Please, Sign In to add comment