Guest User

Untitled

a guest
Nov 24th, 2017
76
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.67 KB | None | 0 0
  1. #[macro_use]
  2. extern crate log;
  3. extern crate env_logger;
  4.  
  5. extern crate futures;
  6. extern crate tokio_io;
  7. extern crate tokio_core;
  8. extern crate tokio_proto;
  9. extern crate tokio_service;
  10. extern crate bytes;
  11.  
  12. use tokio_io::{AsyncRead, AsyncWrite};
  13. use tokio_io::codec;
  14. use tokio_proto::pipeline;
  15. use tokio_core::net;
  16. use tokio_core::reactor;
  17. use tokio_service::{Service, NewService};
  18. use futures::{future, Future};
  19. use bytes::{BufMut, BytesMut};
  20. use std::{io, str};
  21. use std::net::SocketAddr;
  22.  
  23.  
  24. pub struct Validate<T> {
  25. inner: T,
  26. }
  27.  
  28. impl<T> Validate<T> {
  29. pub fn new(inner: T)-> Validate<T> {
  30. Validate {inner: inner }
  31. }
  32. }
  33.  
  34. // validates correctness of requests
  35. impl<T> Service for Validate<T>
  36. where T: Service<Request=String, Response=String, Error=io::Error>,
  37. T::Future: 'static,
  38. {
  39. type Request = String;
  40. type Response = String;
  41. type Error = io::Error;
  42.  
  43. type Future = Box<Future<Item=String, Error=io::Error>>;
  44.  
  45. fn call(&self, req: String)->Self::Future {
  46. if req.chars().find(|&c| c == '\n').is_some() {
  47. let err = io::Error::new(io::ErrorKind::InvalidInput, "message contained new line");
  48. return Box::new(future::done(Err(err)))
  49. }
  50.  
  51. Box::new(self.inner
  52. .call(req)
  53. .and_then(|resp| {
  54. if resp.chars().find(|&c| c == '\n').is_some() {
  55. Err(io::Error::new(io::ErrorKind::InvalidInput, "message contained new line"))
  56. } else { Ok(resp) }
  57. })
  58. )
  59. }
  60. }
  61.  
  62.  
  63. impl<T> NewService for Validate<T>
  64. where T: NewService<Request=String, Response=String, Error=io::Error>,
  65. <T::Instance as Service>::Future: 'static,
  66. {
  67. type Request = String;
  68. type Response = String;
  69. type Error = io::Error;
  70. type Instance = Validate<T::Instance>;
  71.  
  72. fn new_service(&self)-> io::Result<Self::Instance> {
  73. debug!("<Validate as NewService>::new_service");
  74. let inner = self.inner.new_service()?;
  75. Ok(Validate{inner: inner})
  76. }
  77. }
  78.  
  79. pub struct LineCodec;
  80.  
  81. // server decode the received data at once
  82. impl codec::Decoder for LineCodec {
  83. type Item = String;
  84. type Error = io::Error;
  85.  
  86. fn decode(&mut self, buf: &mut BytesMut)-> Result<Option<String>, io::Error> {
  87. if let Some(n) = buf.as_ref().iter().position(|b| *b == b'\n') {
  88. let line = buf.split_to(n);
  89.  
  90. // remove the '\n' from buf
  91. buf.split_to(1);
  92.  
  93. return match str::from_utf8(&line.as_ref()) {
  94. Ok(s) => Ok(Some(s.to_string())),
  95. Err(_) => Err(io::Error::new(io::ErrorKind::Other, "invalid string")),
  96. }
  97. }
  98. debug!("call decode({:?}) => complete", buf);
  99. Ok(None)
  100. }
  101. }
  102.  
  103. impl codec::Encoder for LineCodec {
  104. type Item = String;
  105. type Error = io::Error;
  106.  
  107. fn encode(&mut self, msg: String, buf: &mut BytesMut)-> io::Result<()> {
  108. buf.reserve(msg.len() + 1);
  109.  
  110. buf.extend(msg.as_bytes());
  111. buf.put_u8(b'\n');
  112. debug!("call encode => complete({:?})", buf);
  113. Ok(())
  114. }
  115. }
  116.  
  117. struct LineProto;
  118.  
  119. impl<T: AsyncRead+AsyncWrite + 'static> pipeline::ClientProto<T> for LineProto {
  120. type Request = String;
  121. type Response = String;
  122.  
  123. type Transport = codec::Framed<T, LineCodec>;
  124. type BindTransport = Result<Self::Transport, io::Error>;
  125.  
  126. fn bind_transport(&self, io: T)-> Self::BindTransport {
  127. debug!("<LineProto as ClientProto>::bind_transport");
  128. Ok(io.framed(LineCodec))
  129. }
  130. }
  131.  
  132. impl<T: AsyncRead + AsyncWrite + 'static> pipeline::ServerProto<T> for LineProto {
  133. type Request = String;
  134. type Response = String;
  135.  
  136. type Transport = codec::Framed<T, LineCodec>;
  137. type BindTransport = Result<Self::Transport, io::Error>;
  138.  
  139. fn bind_transport(&self, io: T)-> Self::BindTransport {
  140. debug!("<LineProto as ServerProto>::bind_transport");
  141. Ok(io.framed(LineCodec))
  142. }
  143. }
  144.  
  145. pub struct Client {
  146. inner: Validate<pipeline::ClientService<net::TcpStream, LineProto>>,
  147. }
  148.  
  149. impl Client {
  150. pub fn connect(addr: &SocketAddr, handle: &reactor::Handle)-> Box<Future<Item=Client, Error=io::Error>> {
  151. let ret =
  152. tokio_proto::TcpClient::new(LineProto)
  153. .connect(addr, handle)
  154. .map(|client_service| {
  155. let validate = Validate {inner: client_service};
  156. Client {inner: validate}
  157. });
  158. Box::new(ret)
  159. }
  160. }
  161.  
  162. impl Service for Client {
  163. type Request = String;
  164. type Response = String;
  165. type Error = io::Error;
  166.  
  167. type Future = Box<Future<Item=String, Error=io::Error>>;
  168.  
  169. fn call(&self, req: String)-> Self::Future {
  170. debug!("<Client as Service>::call({:?})", req);
  171. self.inner.call(req)
  172. }
  173. }
  174.  
  175.  
  176. pub struct ExampleService;
  177.  
  178.  
  179. impl Service for ExampleService {
  180. type Request = String;
  181. type Response = String;
  182. type Error = io::Error;
  183. type Future = future::FutureResult<Self::Response, Self::Error>;
  184.  
  185. // echo service
  186. fn call(&self, req: String) -> Self::Future {
  187. debug!("<ExampleService as Service>::call({:?})", req);
  188. debug!("** SERVER: {:?}", req);
  189. future::ok(req)
  190. }
  191. }
  192.  
  193. pub struct ExampleServer;
  194.  
  195. impl NewService for ExampleServer {
  196. type Request = String;
  197. type Response = String;
  198. type Error = io::Error;
  199. type Instance = ExampleService;
  200.  
  201. fn new_service(&self)->io::Result<Self::Instance> {
  202. debug!("start new_service");
  203. Ok(ExampleService)
  204. }
  205. }
  206.  
  207.  
  208. pub fn serve<T>(addr: SocketAddr, new_service: T)
  209. where T: NewService<Request=String, Response=String, Error=io::Error> + Send + Sync + 'static,
  210. {
  211. let new_service = Validate{inner: new_service};
  212.  
  213. tokio_proto::TcpServer::new(LineProto, addr)
  214. .serve(new_service);
  215. }
  216.  
  217. ///RUST_LOG="simple_line=debug" cargo run --bin simple_line
  218. fn main() {
  219. env_logger::init().unwrap();
  220. use std::thread;
  221. use std::time::Duration;
  222. let mut core = tokio_core::reactor::Core::new().unwrap();
  223.  
  224. let addr = "127.0.0.1:8080".parse().unwrap();
  225.  
  226. // thread::spawn(move || {serve(addr, Ok(ExampleService)}) instead.
  227. thread::spawn(move || {
  228. debug!("start spawn server");
  229. serve(addr, ExampleServer)
  230. });
  231.  
  232. // need to wait for the server to connect
  233. thread::sleep(Duration::from_millis(100));
  234.  
  235. let handle = core.handle();
  236.  
  237.  
  238. /*
  239. DEBUG:simple_line: start spawn server
  240. DEBUG:simple_line: start core.run
  241. DEBUG:simple_line: <LineProto as ClientProto>::bind_transport
  242. DEBUG:simple_line: <Validate as NewService>::new_service
  243. DEBUG:simple_line: start new_service
  244. DEBUG:simple_line: <LineProto as ServerProto>::bind_transport
  245. DEBUG:simple_line: <Client as Service>::call("Hello world")
  246. DEBUG:simple_line: call encode => complete(b"Hello\x20world\n")
  247. DEBUG:simple_line: <ExampleService as Service>::call("Hello world")
  248. DEBUG:simple_line: ** SERVER: "Hello world"
  249. DEBUG:simple_line: call decode(b"") => complete
  250. DEBUG:simple_line: call encode => complete(b"Hello\x20world\n")
  251. DEBUG:simple_line: call decode(b"") => complete
  252. DEBUG:simple_line: ** SERVER=>CLIENT: "Hello world"
  253.  
  254. Process finished with exit code 0
  255. */
  256.  
  257. // Runs a future until completion, driving the event loop while we're otherwise waiting for the future to complete.
  258. // This function will begin executing the event loop and will finish once the provided future is resolved.
  259. debug!("start core.run");
  260. core.run(
  261. Client::connect(&addr, &handle)
  262. .and_then(|client| {
  263. client
  264. .call("Hello world".to_string())
  265. .and_then(move |response| {
  266. debug!("** SERVER=>CLIENT: {:?}", response);
  267. Ok(())
  268. })
  269. })
  270. ).unwrap();
  271. }
Add Comment
Please, Sign In to add comment