Advertisement
Guest User

Untitled

a guest
Jun 26th, 2019
124
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.60 KB | None | 0 0
  1. //! A chat server that broadcasts a message to all connections.
  2. //!
  3. //! This is a line-based server which accepts connections, reads lines from
  4. //! those connections, and broadcasts the lines to all other connected clients.
  5. //!
  6. //! This example is similar to chat.rs, but uses combinators and a much more
  7. //! functional style.
  8. //!
  9. //! You can test this out by running:
  10. //!
  11. //! cargo run --example chat
  12. //!
  13. //! And then in another window run:
  14. //!
  15. //! cargo run --example connect 127.0.0.1:8080
  16. //!
  17. //! You can run the second command in multiple windows and then chat between the
  18. //! two, seeing the messages from the other client as they're received. For all
  19. //! connected clients they'll all join the same room and see everyone else's
  20. //! messages.
  21.  
  22. //#![deny(warnings, rust_2018_idioms)]
  23.  
  24. use futures;
  25. use std::collections::HashMap;
  26. use std::env;
  27. use std::io::BufReader;
  28. use std::iter;
  29. use std::sync::{Arc, Mutex};
  30. use tokio;
  31. use tokio::io;
  32. use tokio::net::TcpListener;
  33. use tokio::prelude::*;
  34. use tokio::codec::{Framed, LinesCodec};
  35.  
  36. fn main() -> Result<(), Box<dyn std::error::Error>> {
  37. // Create the TCP listener we'll accept connections on.
  38. let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
  39. let addr = addr.parse()?;
  40.  
  41. let socket = TcpListener::bind(&addr)?;
  42. println!("Listening on: {}", addr);
  43.  
  44. // This is running on the Tokio runtime, so it will be multi-threaded. The
  45. // `Arc<Mutex<...>>` allows state to be shared across the threads.
  46. let connections = Arc::new(Mutex::new(HashMap::new()));
  47.  
  48. // The server task asynchronously iterates over and processes each incoming
  49. // connection.
  50. let srv = socket
  51. .incoming()
  52. .map_err(|e| {
  53. println!("failed to accept socket; error = {:?}", e);
  54. e
  55. })
  56. .for_each(move |stream| {
  57. // The client's socket address
  58. let addr = stream.peer_addr()?;
  59.  
  60. println!("New Connection: {}", addr);
  61.  
  62. // Split the TcpStream into two separate handles. One handle for reading
  63. // and one handle for writing. This lets us use separate tasks for
  64. // reading and writing.
  65. let (writer, reader) = Framed::new(stream, LinesCodec::new()).split();
  66.  
  67. // Create a channel for our stream, which other sockets will use to
  68. // send us messages. Then register our address with the stream to send
  69. // data to us.
  70. let (tx, rx) = futures::sync::mpsc::unbounded();
  71. connections.lock().unwrap().insert(addr, tx);
  72.  
  73. // Define here what we do for the actual I/O. That is, read a bunch of
  74. // lines from the socket and dispatch them while we also write any lines
  75. // from other sockets.
  76. let connections_clone = connections.clone();
  77.  
  78. let socket_reader = reader.for_each(move |line| {
  79. println!("{}: {:?}", addr, line);
  80. let mut conns = connections_clone.lock().unwrap();
  81.  
  82. // For each open connection except the sender, send the
  83. // string via the channel.
  84. let iter = conns
  85. .iter_mut()
  86. .filter(|&(&k, _)| k != addr)
  87. .map(|(_, v)| v);
  88. for tx in iter {
  89. tx.unbounded_send(format!("{}: {}", addr, line)).unwrap();
  90. }
  91. Ok(())
  92. });
  93.  
  94. // Whenever we receive a string on the Receiver, we write it to
  95. // `WriteHalf<TcpStream>`.
  96. let socket_writer = writer.send_all(rx.map_err(|_| std::io::Error::from(std::io::ErrorKind::Other))); // this map_err is nonsense because rx has the error type () when actually it wants to say it can't ever fail
  97.  
  98. // Now that we've got futures representing each half of the socket, we
  99. // use the `select` combinator to wait for either half to be done to
  100. // tear down the other. Then we spawn off the result.
  101. let connections = connections.clone();
  102. let socket_reader = socket_reader.map_err(|_| ());
  103. let socket_writer = socket_writer.map_err(|_| ());
  104. let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ()));
  105.  
  106. // Spawn a task to process the connection
  107. tokio::spawn(connection.then(move |_| {
  108. connections.lock().unwrap().remove(&addr);
  109. println!("Connection {} closed.", addr);
  110. Ok(())
  111. }));
  112.  
  113. Ok(())
  114. })
  115. .map_err(|err| println!("error occurred: {:?}", err));
  116.  
  117. // execute server
  118. tokio::run(srv);
  119. Ok(())
  120. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement