Advertisement
Guest User

Untitled

a guest
Jul 9th, 2019
121
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Rust 2.74 KB | None | 0 0
  1. /// `stream`: The data source to listen upon
  2.         /// `outbound_tx`: The transmitter for sending information across the stream
  3.         /// `forward_inbound_tx`: The transmitter for sending information after reception from stream
  4.         /// `stop_cxn_tube_tx`: The transmitter for sending a kill signal to this listener, ending the underlying process
  5.         /// `stop_cxn_tube_rx`: The receiver for kill signals
  6.         /// `remote`: The async module used to execute the subroutines
  7.         pub fn wrap_stream<S: 'static + AsyncWrite + AsyncRead + Send>(stream: S, outbound_tx: UnboundedSender<OutboundItem>, outbound_rx: UnboundedReceiver<OutboundItem>, forward_inbound_tx: UnboundedSender<InboundItem>, stop_cxn_tube_tx: UnboundedSender<u8>, stop_cxn_tube_rx: UnboundedReceiver<u8>, remote: Remote) -> Result<Self, HyxeError> {
  8.  
  9.            //tokio_codec::LinesCodec::new()
  10.            let framed = tokio_codec::Framed::new(stream, TransmissionSafeWebCodec::new());
  11.            let (write, read) = framed.split();
  12.  
  13.  
  14.            let inbound = read.for_each(move |packet| {
  15.                println!("RECV_INBOUND: {:#?}", String::from_utf8(packet.clone()).unwrap());
  16.                forward_inbound_tx.unbounded_send(packet);
  17.                Ok(())
  18.            }).map_err(|err| {()}).then(|res| Ok(()));
  19.  
  20.            //write.send("[TEST] hello world".to_string().into_bytes());
  21.  
  22.            let outbound = write.send_all(outbound_rx).map_err(|err|{
  23.                println!("outbound err???");
  24.                ()
  25.            }).map(|res| {
  26.                println!("outbound SEND");
  27.            });
  28.  
  29.  
  30.            let stopper = stop_cxn_tube_rx.from_err::<HyxeError>().for_each(move |signal| {
  31.                println!("[AsyncStreamHandler] Shutting down stream");
  32.                HyxeError::throw("[AsyncStreamHandler] Shutting down stream")
  33.            }).map(|res| ()).map_err(|mut err| {err.printf();});
  34.  
  35.  
  36.            remote.spawn(move |h| {
  37.                inbound
  38.            });
  39.  
  40.            remote.spawn(move |h| {
  41.                outbound
  42.            });
  43.  
  44.            remote.spawn(move |h| {
  45.                stopper
  46.            });
  47.  
  48.            /*
  49.            let future = inbound.map_err(|err: ()| ()).join(outbound).map_err(|err| ()).map(|res| ());
  50.  
  51.            let future = stopper.join(future.from_err::<HyxeError>()).map_err(|mut err| err.printf()).map(|res : (Result<(), HyxeError>, ())| ());
  52.            //(Result<(), HyxeError>, ())
  53.            remote.execute(future.then(|res| {
  54.                println!("STREAM CLOSED");
  55.                Err(())
  56.            }));*/
  57.  
  58.            Ok(Self {
  59.                outbound_tx: outbound_tx.clone(),
  60.                stop_cxn_tube_tx: stop_cxn_tube_tx.clone()
  61.            })
  62.        }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement