Guest User

Untitled

a guest
Jan 18th, 2018
99
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.10 KB | None | 0 0
  1. extern crate getopts;
  2. extern crate tokio;
  3. extern crate futures_timer;
  4. extern crate futures;
  5. #[macro_use]
  6. extern crate tokio_io;
  7.  
  8. use std::env;
  9. use std::net::SocketAddr;
  10. use std::time::Duration;
  11. use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT};
  12. use std::sync::atomic::Ordering::SeqCst;
  13. use std::thread;
  14. use std::io::{self, Write, Read};
  15.  
  16. use futures::executor::CurrentThread;
  17. use futures::prelude::*;
  18. use futures::stream;
  19. use futures_timer::{Sleep, Interval};
  20. use getopts::Options;
  21. use tokio::net::{TcpListener, TcpStream};
  22.  
  /// Run-time settings for one benchmark run, assembled in `main` from the
  /// command-line options and shared by the client and server modes.
  struct Config {
      /// Socket address given with `-a`: the server binds a listener to it, the
      /// client connects to it.
      addr: SocketAddr,
      /// Period between two statistics lines (`-i`, in milliseconds).
      print_interval: Duration,
      /// Pause inserted before I/O (`-d`, in milliseconds): the client sleeps
      /// after connecting, the server between reading and writing its byte.
      io_delay: Duration,
      /// Maximum number of client connections in flight at once (`-c`).
      concurrency: usize,
      /// Total number of connections the client opens (`-n`).
      num_connections: usize,
  }
  30.  
  // Process-wide counters: bumped by the connection futures and sampled by the
  // statistics thread. `AtomicUsize::new` is a `const fn`, so it can initialise
  // a `static` directly and replaces the deprecated `ATOMIC_USIZE_INIT`.

  /// Connections that are currently open.
  static CLIENTS: AtomicUsize = AtomicUsize::new(0);
  /// Connections that finished with an I/O error.
  static ERRORS: AtomicUsize = AtomicUsize::new(0);
  /// Bytes successfully read so far.
  static BYTES_READ: AtomicUsize = AtomicUsize::new(0);
  /// Bytes successfully written so far.
  static BYTES_WRITTEN: AtomicUsize = AtomicUsize::new(0);
  35.  
  36. fn main() {
  37. let args: Vec<String> = env::args().collect();
  38. let program = args[0].clone();
  39.  
  40. let mut opts = Options::new();
  41. opts.optflag("h", "help", "print this help menu");
  42. opts.optflag("", "server", "run the server");
  43. opts.reqopt("a", "addr", "address to bind/connect to", "ADDR");
  44. opts.optopt("i", "interval", "interval to print stats on (ms)", "MS");
  45. opts.optopt("d", "delay", "delay after connecting/accepting to do I/O (ms)", "MS");
  46. opts.optopt("c", "concurrency", "number of connections to send (client)", "CONNS");
  47. opts.optopt("n", "num-connections", "total number of connections to send (client)", "TOTAL");
  48. let matches = match opts.parse(&args[1..]) {
  49. Ok(m) => { m }
  50. Err(f) => panic!(f.to_string()),
  51. };
  52. if matches.opt_present("h") {
  53. return print_usage(&program, opts)
  54. }
  55. let addr = matches.opt_str("a").expect("must pass `-a` argument");
  56. let addr = addr.parse().expect("failed to parse `-a` as socket address");
  57.  
  58. let print_interval = matches.opt_str("i")
  59. .map(|s| s.parse().expect("failed to parse `-i` as a number"))
  60. .map(Duration::from_millis)
  61. .unwrap_or(Duration::from_secs(1));
  62.  
  63. let io_delay = matches.opt_str("d")
  64. .map(|s| s.parse().expect("failed to parse `-d` as a number"))
  65. .map(Duration::from_millis)
  66. .unwrap_or(Duration::from_millis(200));
  67.  
  68. let concurrency = matches.opt_str("c")
  69. .map(|s| s.parse().expect("failed to parse `-c` as a number"))
  70. .unwrap_or(1000);
  71.  
  72. let num_connections = matches.opt_str("n")
  73. .map(|s| s.parse().expect("failed to parse `-n` as a number"))
  74. .unwrap_or(10_000);
  75.  
  76. let config = Config {
  77. addr,
  78. print_interval,
  79. io_delay,
  80. concurrency,
  81. num_connections,
  82. };
  83.  
  84. let print = config.print_interval;
  85. thread::spawn(move || {
  86. let interval = Interval::new(print);
  87. let interval = stream::blocking(interval);
  88. let mut old_reads = 0;
  89. let mut old_writes = 0;
  90. for (i, _) in interval.enumerate() {
  91. let reads = BYTES_READ.load(SeqCst);
  92. let writes = BYTES_WRITTEN.load(SeqCst);
  93. print!("\
  94. {:<3} \
  95. clients: {:<6} \
  96. errors: {:<4} \
  97. reads: {:<6} \
  98. writes: {:<6} \
  99. reads/i: {:<4} \
  100. writes/i: {:<4} \
  101. \r\
  102. ",
  103. i,
  104. CLIENTS.load(SeqCst),
  105. ERRORS.load(SeqCst),
  106. reads,
  107. writes,
  108. reads - old_reads,
  109. writes - old_writes,
  110. );
  111. old_reads = reads;
  112. old_writes = writes;
  113. io::stdout().flush().unwrap();
  114. }
  115. });
  116.  
  117. if matches.opt_present("server") {
  118. server(&config);
  119. } else {
  120. client(&config);
  121. }
  122. }
  123.  
  124. fn print_usage(program: &str, opts: Options) {
  125. let brief = format!("Usage: {} [options]", program);
  126. print!("{}", opts.usage(&brief));
  127. }
  128.  
  129. fn server(config: &Config) {
  130. let listener = TcpListener::bind(&config.addr).expect("failed to bind listener");
  131.  
  132. println!("listening on {}", listener.local_addr().unwrap());
  133.  
  134. let delay = config.io_delay;
  135. let srv = listener.incoming()
  136. .map(|(s, _)| {
  137. CLIENTS.fetch_add(1, SeqCst);
  138. unlinger(&s);
  139. s
  140. })
  141. .for_each(move |socket| {
  142. let client = ReadByte(Some(socket))
  143. .and_then(move |(socket, byte)| {
  144. BYTES_READ.fetch_add(1, SeqCst);
  145. Sleep::new(delay)
  146. .and_then(move |_| WriteByte(Some(socket), byte))
  147. .map(|_| {
  148. BYTES_WRITTEN.fetch_add(1, SeqCst);
  149. })
  150. });
  151. CurrentThread::execute(client.then(|res| {
  152. CLIENTS.fetch_sub(1, SeqCst);
  153. if res.is_err() {
  154. ERRORS.fetch_add(1, SeqCst);
  155. }
  156. Ok(())
  157. }));
  158.  
  159. Ok(())
  160. });
  161.  
  162. CurrentThread::run(|_| {
  163. CurrentThread::execute(srv.map_err(|e| {
  164. panic!("listener error: {}", e);
  165. }));
  166. });
  167. }
  168.  
  169. fn client(config: &Config) {
  170. let addr = config.addr;
  171. let delay = config.io_delay;
  172. let clients = stream::iter_ok(0..config.num_connections)
  173. .map(move |_| {
  174. TcpStream::connect(&addr)
  175. .map(|s| {
  176. CLIENTS.fetch_add(1, SeqCst);
  177. unlinger(&s);
  178. s
  179. })
  180. .and_then(move |socket| Sleep::new(delay).map(|_| socket))
  181. .and_then(|s| WriteByte(Some(s), 1))
  182. .and_then(|s| {
  183. BYTES_WRITTEN.fetch_add(1, SeqCst);
  184. ReadByte(Some(s))
  185. })
  186. .map(|_| {
  187. BYTES_READ.fetch_add(1, SeqCst);
  188. })
  189. })
  190. .buffer_unordered(config.concurrency);
  191.  
  192. CurrentThread::run(|_| {
  193. CurrentThread::execute(clients.then(|res| {
  194. CLIENTS.fetch_sub(1, SeqCst);
  195. if res.is_err() {
  196. ERRORS.fetch_add(1, SeqCst);
  197. }
  198. Ok(())
  199. }).for_each(|_| Ok(())));
  200. });
  201. }
  202.  
  203. struct ReadByte(Option<TcpStream>);
  204.  
  205. impl Future for ReadByte {
  206. type Item = (TcpStream, u8);
  207. type Error = io::Error;
  208.  
  209. fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
  210. let mut buf = [0];
  211. let n = try_nb!(self.0.as_mut().unwrap().read(&mut buf));
  212. if n == 0 {
  213. Err(io::Error::new(io::ErrorKind::Other, "unexpected eof"))
  214. } else {
  215. Ok(Async::Ready((self.0.take().unwrap(), buf[0])))
  216. }
  217. }
  218. }
  219.  
  220. struct WriteByte(Option<TcpStream>, u8);
  221.  
  222. impl Future for WriteByte {
  223. type Item = TcpStream;
  224. type Error = io::Error;
  225.  
  226. fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
  227. let buf = [self.1];
  228. let n = try_nb!(self.0.as_mut().unwrap().write(&buf));
  229. if n == 0 {
  230. Err(io::Error::new(io::ErrorKind::Other, "unexpected eof"))
  231. } else {
  232. Ok(Async::Ready(self.0.take().unwrap()))
  233. }
  234. }
  235. }
  236.  
  237. fn unlinger(s: &TcpStream) {
  238. s.set_linger(Some(Duration::from_secs(0))).unwrap();
  239. }
Add Comment
Please, Sign In to add comment