Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- extern crate tokio;
- use std::collections::HashMap;
- use std::io::BufReader;
- use std::sync::mpsc::{channel, Sender};
- use tokio::net::TcpListener;
- use tokio::prelude::{
- future::{loop_fn, Loop},
- Future, Stream,
- };
- use tokio::runtime::Runtime;
- struct EventSourceHandler {
- events_queue: HashMap<usize, Vec<String>>,
- state: usize,
- }
- impl EventSourceHandler {
- fn new() -> EventSourceHandler {
- EventSourceHandler {
- events_queue: HashMap::new(),
- state: 1,
- }
- }
- fn listen_handle(self, tx: Sender<Vec<String>>) -> impl Future<Item = (), Error = ()> {
- let addr = "127.0.0.1:9090".parse().unwrap();
- let listener = TcpListener::bind(&addr).unwrap();
- println!("Listening for events source on port 9090");
- listener
- .incoming()
- .for_each(move |socket| {
- let _ = self.state;
- println!("accepted socket; addr={:?}", socket.peer_addr().unwrap());
- let event_stream_loop = loop_fn(BufReader::new(socket), |reader| {
- tokio::io::read_until(reader, b'\n', Vec::new())
- .and_then(|(reader, raw_event)| {
- let event_str = String::from_utf8(raw_event).unwrap();
- if event_str.is_empty() {
- return Ok(Loop::Break(()));
- }
- let event: Vec<String> =
- event_str.trim().split('|').map(|x| x.to_string()).collect();
- println!("events: {:?}", event);
- let seq: usize = event[0].parse().unwrap();
- Ok(Loop::Continue(reader))
- })
- .map_err(|err| {
- println!("server error {:?}", err);
- })
- });
- tokio::spawn(event_stream_loop);
- Ok(())
- })
- .map_err(|err| {
- println!("server error {:?}", err);
- })
- }
- }
- fn main() {
- let (tx, rx) = channel();
- let events_handler = EventSourceHandler::new();
- let mut rt = Runtime::new().unwrap();
- rt.spawn(events_handler.listen_handle(tx.clone()));
- rt.shutdown_on_idle().wait().unwrap();
- }
Add Comment
Please, Sign In to add comment