Advertisement
NLinker

A piece on the rust server

Apr 15th, 2017
200
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Rust 2.91 KB | None | 0 0
  1. pub fn thread_fn_main(terminate: Arc<AtomicBool>,
  2.                       rx_main_to_ui: BroadcastFutReceiver<MsgMainToUi>,
  3.                       tx_ui_to_main: Sender<MsgUiToMain>
  4. ) {
  5.     let ui = Arc::new(Mutex::new(UiState::default()));
  6.  
  7.     // this thread keeps track of the ui state by integrating the msgs
  8.     let terminate_ui_state = terminate.clone();
  9.     let rx_main_to_ui_ui_state = rx_main_to_ui.add_stream();
  10.     let ui_ui_state = ui.clone();
  11.     let thread_ui_state = thread::Builder::new().name("ui_state".to_string()).spawn(move || {
  12.         for _ in FpsLoop::new(60) {
  13.             if terminate_ui_state.load(Ordering::Relaxed) { break }
  14.             while let Ok(msg) = rx_main_to_ui_ui_state.try_recv() {
  15.                 let mut ui = ui_ui_state.lock().unwrap();
  16.                 ui.apply_msg(msg);
  17.             }
  18.         }
  19.         info!("terminating ui_state");
  20.     }).unwrap();
  21.  
  22.     let port = ::std::env::var("WS_PORT").expect("WS_PORT must be set");
  23.     let addr = format!("127.0.0.1:{}", port).to_socket_addrs().unwrap().next().unwrap();
  24.     info!("listening on {}", addr);
  25.     let mut core = Core::new().unwrap();
  26.     let handle = core.handle();
  27.     let server = TcpListener::bind(&addr, &handle).unwrap();
  28.     let srv = server.incoming().for_each(|(stream, addr)| {
  29.         let rx_main_to_ui = rx_main_to_ui.add_stream();
  30.         let tx_ui_to_main = tx_ui_to_main.clone();
  31.         let handle_inner = handle.clone();
  32.         let ui_state_start = ui.lock().unwrap().serialize(); // current ui state for the new client
  33.         accept_async(stream).and_then(move |ws_stream| {
  34.             info!("new websocket connection: {}", addr);
  35.             let (sink, stream) = ws_stream.split();
  36.             let ws_reader = stream.for_each(move |msg| {
  37.                 match msg {
  38.                     Message::Text(msg) => match serde_json::from_str(&msg) {
  39.                         Ok(msg) => tx_ui_to_main.send(msg).unwrap(),
  40.                         Err(e) => error!("{}", e)
  41.                     },
  42.                     Message::Binary(_) => info!("got binary data")
  43.                 }
  44.                 Ok(())
  45.             });
  46.             use std::result::Result;
  47.             let ws_writer = iter(ui_state_start.into_iter().map(|x| -> Result<MsgMainToUi, ()> { Ok(x) })).fold(sink, |mut sink, msg| {
  48.                 send_json(&mut sink, msg).map(|_| sink).map_err(|_| ())
  49.             }).and_then(move |sink| {
  50.                 rx_main_to_ui.fold(sink, |mut sink, msg| {
  51.                     send_json(&mut sink, msg).unwrap();
  52.                     Ok(sink)
  53.                 })
  54.             });
  55.             let connection = ws_reader.map(|_| ()).map_err(|_| ())
  56.                      .select(ws_writer.map(|_| ()).map_err(|_| ()));
  57.             handle_inner.spawn(connection.then(move |_| {
  58.                 info!("Connection {} closed.", addr);
  59.                 Ok(())
  60.             }));
  61.             Ok(())
  62.         }).map_err(|e| {
  63.             error!("Error during the websocket handshake occurred: {}", e);
  64.             use std::io::{Error, ErrorKind};
  65.             Error::new(ErrorKind::Other, e)
  66.         })
  67.     });
  68.     core.run(srv).unwrap();
  69.     thread_ui_state.join().unwrap();
  70. }
  71.  
  72. fn send_json<S: Sink<SinkItem=Message, SinkError=Error>, T: Serialize>(sink: &mut S, payload: T) -> Result<AsyncSink<Message>> {
  73.     let json = serde_json::to_string(&payload).unwrap();
  74.     let msg = Message::Text(json);
  75.     sink.start_send(msg)
  76. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement