Guest User

Untitled

a guest
Jun 22nd, 2018
93
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.34 KB | None | 0 0
  1. extern crate tokio;
  2.  
  3. use std::collections::HashMap;
  4. use std::io::BufReader;
  5. use std::sync::mpsc::{channel, Sender};
  6. use tokio::net::TcpListener;
  7. use tokio::prelude::{
  8. future::{loop_fn, Loop},
  9. Future, Stream,
  10. };
  11. use tokio::runtime::Runtime;
  12.  
  13. struct EventSourceHandler {
  14. events_queue: HashMap<usize, Vec<String>>,
  15. state: usize,
  16. }
  17.  
  18. impl EventSourceHandler {
  19. fn new() -> EventSourceHandler {
  20. EventSourceHandler {
  21. events_queue: HashMap::new(),
  22. state: 1,
  23. }
  24. }
  25. fn listen_handle(&mut self, tx: Sender<Vec<String>>) -> impl Future<Item = (), Error = ()> {
  26. let addr = "127.0.0.1:9090".parse().unwrap();
  27. let listener = TcpListener::bind(&addr).unwrap();
  28.  
  29. println!("Listening for events source on port 9090");
  30. let state = self.state;
  31. listener
  32. .incoming()
  33. .for_each(move |socket| {
  34. let _ = state;
  35. println!("accepted socket; addr={:?}", socket.peer_addr().unwrap());
  36. let event_stream_loop = loop_fn(BufReader::new(socket), |reader| {
  37. tokio::io::read_until(reader, b'\n', Vec::new())
  38. .and_then(|(reader, raw_event)| {
  39. let event_str = String::from_utf8(raw_event).unwrap();
  40. if event_str.is_empty() {
  41. return Ok(Loop::Break(()));
  42. }
  43. let event: Vec<String> =
  44. event_str.trim().split('|').map(|x| x.to_string()).collect();
  45. println!("events: {:?}", event);
  46. let seq: usize = event[0].parse().unwrap();
  47. Ok(Loop::Continue(reader))
  48. })
  49. .map_err(|err| {
  50. println!("server error {:?}", err);
  51. })
  52. });
  53. tokio::spawn(event_stream_loop);
  54. Ok(())
  55. })
  56. .map_err(|err| {
  57. println!("server error {:?}", err);
  58. })
  59. }
  60. }
  61.  
  62. fn main() {
  63. let (tx, rx) = channel();
  64. let mut events_handler = EventSourceHandler::new();
  65. let mut rt = Runtime::new().unwrap();
  66.  
  67. rt.spawn(events_handler.listen_handle(tx.clone()));
  68.  
  69. rt.shutdown_on_idle().wait().unwrap();
  70. }
Add Comment
Please, Sign In to add comment