Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- //! A chat server that broadcasts a message to all connections.
- //!
- //! This is a line-based server which accepts connections, reads lines from
- //! those connections, and broadcasts the lines to all other connected clients.
- //!
- //! This example is similar to chat.rs, but uses combinators and a much more
- //! functional style.
- //!
- //! You can test this out by running:
- //!
- //! cargo run --example chat
- //!
- //! And then in another window run:
- //!
- //! cargo run --example connect 127.0.0.1:8080
- //!
- //! You can run the second command in multiple windows and then chat between the
- //! two, seeing the messages from the other client as they're received. For all
- //! connected clients they'll all join the same room and see everyone else's
- //! messages.
- //#![deny(warnings, rust_2018_idioms)]
- use futures;
- use std::collections::HashMap;
- use std::env;
- use std::io::BufReader;
- use std::iter;
- use std::sync::{Arc, Mutex};
- use tokio;
- use tokio::io;
- use tokio::net::TcpListener;
- use tokio::prelude::*;
- use tokio::codec::{Framed, LinesCodec};
- fn main() -> Result<(), Box<dyn std::error::Error>> {
- // Create the TCP listener we'll accept connections on.
- let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
- let addr = addr.parse()?;
- let socket = TcpListener::bind(&addr)?;
- println!("Listening on: {}", addr);
- // This is running on the Tokio runtime, so it will be multi-threaded. The
- // `Arc<Mutex<...>>` allows state to be shared across the threads.
- let connections = Arc::new(Mutex::new(HashMap::new()));
- // The server task asynchronously iterates over and processes each incoming
- // connection.
- let srv = socket
- .incoming()
- .map_err(|e| {
- println!("failed to accept socket; error = {:?}", e);
- e
- })
- .for_each(move |stream| {
- // The client's socket address
- let addr = stream.peer_addr()?;
- println!("New Connection: {}", addr);
- // Split the TcpStream into two separate handles. One handle for reading
- // and one handle for writing. This lets us use separate tasks for
- // reading and writing.
- let (writer, reader) = Framed::new(stream, LinesCodec::new()).split();
- // Create a channel for our stream, which other sockets will use to
- // send us messages. Then register our address with the stream to send
- // data to us.
- let (tx, rx) = futures::sync::mpsc::unbounded();
- connections.lock().unwrap().insert(addr, tx);
- // Define here what we do for the actual I/O. That is, read a bunch of
- // lines from the socket and dispatch them while we also write any lines
- // from other sockets.
- let connections_clone = connections.clone();
- let socket_reader = reader.for_each(move |line| {
- println!("{}: {:?}", addr, line);
- let mut conns = connections_clone.lock().unwrap();
- // For each open connection except the sender, send the
- // string via the channel.
- let iter = conns
- .iter_mut()
- .filter(|&(&k, _)| k != addr)
- .map(|(_, v)| v);
- for tx in iter {
- tx.unbounded_send(format!("{}: {}", addr, line)).unwrap();
- }
- Ok(())
- });
- // Whenever we receive a string on the Receiver, we write it to
- // `WriteHalf<TcpStream>`.
- let socket_writer = writer.send_all(rx.map_err(|_| std::io::Error::from(std::io::ErrorKind::Other))); // this map_err is nonsense because rx has the error type () when actually it wants to say it can't ever fail
- // Now that we've got futures representing each half of the socket, we
- // use the `select` combinator to wait for either half to be done to
- // tear down the other. Then we spawn off the result.
- let connections = connections.clone();
- let socket_reader = socket_reader.map_err(|_| ());
- let socket_writer = socket_writer.map_err(|_| ());
- let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ()));
- // Spawn a task to process the connection
- tokio::spawn(connection.then(move |_| {
- connections.lock().unwrap().remove(&addr);
- println!("Connection {} closed.", addr);
- Ok(())
- }));
- Ok(())
- })
- .map_err(|err| println!("error occurred: {:?}", err));
- // execute server
- tokio::run(srv);
- Ok(())
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement