Guest User

Untitled

a guest
Jun 22nd, 2018
82
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.31 KB | None | 0 0
  1. extern crate tokio;
  2.  
  3. use std::collections::HashMap;
  4. use std::io::BufReader;
  5. use std::sync::mpsc::{channel, Sender};
  6. use tokio::net::TcpListener;
  7. use tokio::prelude::{
  8. future::{loop_fn, Loop},
  9. Future, Stream,
  10. };
  11. use tokio::runtime::Runtime;
  12.  
  13. struct EventSourceHandler {
  14. events_queue: HashMap<usize, Vec<String>>,
  15. state: usize,
  16. }
  17.  
  18. impl EventSourceHandler {
  19. fn new() -> EventSourceHandler {
  20. EventSourceHandler {
  21. events_queue: HashMap::new(),
  22. state: 1,
  23. }
  24. }
  25. fn listen_handle(self, tx: Sender<Vec<String>>) -> impl Future<Item = (), Error = ()> {
  26. let addr = "127.0.0.1:9090".parse().unwrap();
  27. let listener = TcpListener::bind(&addr).unwrap();
  28.  
  29. println!("Listening for events source on port 9090");
  30. listener
  31. .incoming()
  32. .for_each(move |socket| {
  33. let _ = self.state;
  34. println!("accepted socket; addr={:?}", socket.peer_addr().unwrap());
  35. let event_stream_loop = loop_fn(BufReader::new(socket), |reader| {
  36. tokio::io::read_until(reader, b'\n', Vec::new())
  37. .and_then(|(reader, raw_event)| {
  38. let event_str = String::from_utf8(raw_event).unwrap();
  39. if event_str.is_empty() {
  40. return Ok(Loop::Break(()));
  41. }
  42. let event: Vec<String> =
  43. event_str.trim().split('|').map(|x| x.to_string()).collect();
  44. println!("events: {:?}", event);
  45. let seq: usize = event[0].parse().unwrap();
  46. Ok(Loop::Continue(reader))
  47. })
  48. .map_err(|err| {
  49. println!("server error {:?}", err);
  50. })
  51. });
  52. tokio::spawn(event_stream_loop);
  53. Ok(())
  54. })
  55. .map_err(|err| {
  56. println!("server error {:?}", err);
  57. })
  58. }
  59. }
  60.  
  61. fn main() {
  62. let (tx, rx) = channel();
  63. let events_handler = EventSourceHandler::new();
  64. let mut rt = Runtime::new().unwrap();
  65.  
  66. rt.spawn(events_handler.listen_handle(tx.clone()));
  67.  
  68. rt.shutdown_on_idle().wait().unwrap();
  69. }
Add Comment
Please, Sign In to add comment