Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #[macro_use]
- extern crate log;
- extern crate env_logger;
- extern crate futures;
- extern crate tokio_io;
- extern crate tokio_core;
- extern crate tokio_proto;
- extern crate tokio_service;
- extern crate bytes;
- use tokio_io::{AsyncRead, AsyncWrite};
- use tokio_io::codec;
- use tokio_proto::pipeline;
- use tokio_core::net;
- use tokio_core::reactor;
- use tokio_service::{Service, NewService};
- use futures::{future, Future};
- use bytes::{BufMut, BytesMut};
- use std::{io, str};
- use std::net::SocketAddr;
- pub struct Validate<T> {
- inner: T,
- }
- impl<T> Validate<T> {
- pub fn new(inner: T)-> Validate<T> {
- Validate {inner: inner }
- }
- }
- // validates correctness of requests
- impl<T> Service for Validate<T>
- where T: Service<Request=String, Response=String, Error=io::Error>,
- T::Future: 'static,
- {
- type Request = String;
- type Response = String;
- type Error = io::Error;
- type Future = Box<Future<Item=String, Error=io::Error>>;
- fn call(&self, req: String)->Self::Future {
- if req.chars().find(|&c| c == '\n').is_some() {
- let err = io::Error::new(io::ErrorKind::InvalidInput, "message contained new line");
- return Box::new(future::done(Err(err)))
- }
- Box::new(self.inner
- .call(req)
- .and_then(|resp| {
- if resp.chars().find(|&c| c == '\n').is_some() {
- Err(io::Error::new(io::ErrorKind::InvalidInput, "message contained new line"))
- } else { Ok(resp) }
- })
- )
- }
- }
- impl<T> NewService for Validate<T>
- where T: NewService<Request=String, Response=String, Error=io::Error>,
- <T::Instance as Service>::Future: 'static,
- {
- type Request = String;
- type Response = String;
- type Error = io::Error;
- type Instance = Validate<T::Instance>;
- fn new_service(&self)-> io::Result<Self::Instance> {
- debug!("<Validate as NewService>::new_service");
- let inner = self.inner.new_service()?;
- Ok(Validate{inner: inner})
- }
- }
- pub struct LineCodec;
- // server decode the received data at once
- impl codec::Decoder for LineCodec {
- type Item = String;
- type Error = io::Error;
- fn decode(&mut self, buf: &mut BytesMut)-> Result<Option<String>, io::Error> {
- if let Some(n) = buf.as_ref().iter().position(|b| *b == b'\n') {
- let line = buf.split_to(n);
- // remove the '\n' from buf
- buf.split_to(1);
- return match str::from_utf8(&line.as_ref()) {
- Ok(s) => Ok(Some(s.to_string())),
- Err(_) => Err(io::Error::new(io::ErrorKind::Other, "invalid string")),
- }
- }
- debug!("call decode({:?}) => complete", buf);
- Ok(None)
- }
- }
- impl codec::Encoder for LineCodec {
- type Item = String;
- type Error = io::Error;
- fn encode(&mut self, msg: String, buf: &mut BytesMut)-> io::Result<()> {
- buf.reserve(msg.len() + 1);
- buf.extend(msg.as_bytes());
- buf.put_u8(b'\n');
- debug!("call encode => complete({:?})", buf);
- Ok(())
- }
- }
- struct LineProto;
- impl<T: AsyncRead+AsyncWrite + 'static> pipeline::ClientProto<T> for LineProto {
- type Request = String;
- type Response = String;
- type Transport = codec::Framed<T, LineCodec>;
- type BindTransport = Result<Self::Transport, io::Error>;
- fn bind_transport(&self, io: T)-> Self::BindTransport {
- debug!("<LineProto as ClientProto>::bind_transport");
- Ok(io.framed(LineCodec))
- }
- }
- impl<T: AsyncRead + AsyncWrite + 'static> pipeline::ServerProto<T> for LineProto {
- type Request = String;
- type Response = String;
- type Transport = codec::Framed<T, LineCodec>;
- type BindTransport = Result<Self::Transport, io::Error>;
- fn bind_transport(&self, io: T)-> Self::BindTransport {
- debug!("<LineProto as ServerProto>::bind_transport");
- Ok(io.framed(LineCodec))
- }
- }
- pub struct Client {
- inner: Validate<pipeline::ClientService<net::TcpStream, LineProto>>,
- }
- impl Client {
- pub fn connect(addr: &SocketAddr, handle: &reactor::Handle)-> Box<Future<Item=Client, Error=io::Error>> {
- let ret =
- tokio_proto::TcpClient::new(LineProto)
- .connect(addr, handle)
- .map(|client_service| {
- let validate = Validate {inner: client_service};
- Client {inner: validate}
- });
- Box::new(ret)
- }
- }
- impl Service for Client {
- type Request = String;
- type Response = String;
- type Error = io::Error;
- type Future = Box<Future<Item=String, Error=io::Error>>;
- fn call(&self, req: String)-> Self::Future {
- debug!("<Client as Service>::call({:?})", req);
- self.inner.call(req)
- }
- }
- pub struct ExampleService;
- impl Service for ExampleService {
- type Request = String;
- type Response = String;
- type Error = io::Error;
- type Future = future::FutureResult<Self::Response, Self::Error>;
- // echo service
- fn call(&self, req: String) -> Self::Future {
- debug!("<ExampleService as Service>::call({:?})", req);
- debug!("** SERVER: {:?}", req);
- future::ok(req)
- }
- }
- pub struct ExampleServer;
- impl NewService for ExampleServer {
- type Request = String;
- type Response = String;
- type Error = io::Error;
- type Instance = ExampleService;
- fn new_service(&self)->io::Result<Self::Instance> {
- debug!("start new_service");
- Ok(ExampleService)
- }
- }
- pub fn serve<T>(addr: SocketAddr, new_service: T)
- where T: NewService<Request=String, Response=String, Error=io::Error> + Send + Sync + 'static,
- {
- let new_service = Validate{inner: new_service};
- tokio_proto::TcpServer::new(LineProto, addr)
- .serve(new_service);
- }
- ///RUST_LOG="simple_line=debug" cargo run --bin simple_line
- fn main() {
- env_logger::init().unwrap();
- use std::thread;
- use std::time::Duration;
- let mut core = tokio_core::reactor::Core::new().unwrap();
- let addr = "127.0.0.1:8080".parse().unwrap();
- // thread::spawn(move || {serve(addr, Ok(ExampleService)}) instead.
- thread::spawn(move || {
- debug!("start spawn server");
- serve(addr, ExampleServer)
- });
- // need to wait for the server to connect
- thread::sleep(Duration::from_millis(100));
- let handle = core.handle();
- /*
- DEBUG:simple_line: start spawn server
- DEBUG:simple_line: start core.run
- DEBUG:simple_line: <LineProto as ClientProto>::bind_transport
- DEBUG:simple_line: <Validate as NewService>::new_service
- DEBUG:simple_line: start new_service
- DEBUG:simple_line: <LineProto as ServerProto>::bind_transport
- DEBUG:simple_line: <Client as Service>::call("Hello world")
- DEBUG:simple_line: call encode => complete(b"Hello\x20world\n")
- DEBUG:simple_line: <ExampleService as Service>::call("Hello world")
- DEBUG:simple_line: ** SERVER: "Hello world"
- DEBUG:simple_line: call decode(b"") => complete
- DEBUG:simple_line: call encode => complete(b"Hello\x20world\n")
- DEBUG:simple_line: call decode(b"") => complete
- DEBUG:simple_line: ** SERVER=>CLIENT: "Hello world"
- Process finished with exit code 0
- */
- // Runs a future until completion, driving the event loop while we're otherwise waiting for the future to complete.
- // This function will begin executing the event loop and will finish once the provided future is resolved.
- debug!("start core.run");
- core.run(
- Client::connect(&addr, &handle)
- .and_then(|client| {
- client
- .call("Hello world".to_string())
- .and_then(move |response| {
- debug!("** SERVER=>CLIENT: {:?}", response);
- Ok(())
- })
- })
- ).unwrap();
- }
Add Comment
Please, Sign In to add comment