Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /// `stream`: The data source to listen upon
- /// `outbound_tx`: The transmitter for sending information across the stream
- /// `forward_inbound_tx`: The transmitter for sending information after reception from stream
- /// `stop_cxn_tube_tx`: The transmitter for sending a kill signal to this listener, ending the underlying process
- /// `stop_cxn_tube_rx`: The receiver for kill signals
- /// `remote`: The async module used to execute the subroutines
- pub fn wrap_stream<S: 'static + AsyncWrite + AsyncRead + Send>(stream: S, outbound_tx: UnboundedSender<OutboundItem>, outbound_rx: UnboundedReceiver<OutboundItem>, forward_inbound_tx: UnboundedSender<InboundItem>, stop_cxn_tube_tx: UnboundedSender<u8>, stop_cxn_tube_rx: UnboundedReceiver<u8>, remote: Remote) -> Result<Self, HyxeError> {
- //tokio_codec::LinesCodec::new()
- let framed = tokio_codec::Framed::new(stream, TransmissionSafeWebCodec::new());
- let (write, read) = framed.split();
- let inbound = read.for_each(move |packet| {
- println!("RECV_INBOUND: {:#?}", String::from_utf8(packet.clone()).unwrap());
- forward_inbound_tx.unbounded_send(packet);
- Ok(())
- }).map_err(|err| {()}).then(|res| Ok(()));
- //write.send("[TEST] hello world".to_string().into_bytes());
- let outbound = write.send_all(outbound_rx).map_err(|err|{
- println!("outbound err???");
- ()
- }).map(|res| {
- println!("outbound SEND");
- });
- let stopper = stop_cxn_tube_rx.from_err::<HyxeError>().for_each(move |signal| {
- println!("[AsyncStreamHandler] Shutting down stream");
- HyxeError::throw("[AsyncStreamHandler] Shutting down stream")
- }).map(|res| ()).map_err(|mut err| {err.printf();});
- remote.spawn(move |h| {
- inbound
- });
- remote.spawn(move |h| {
- outbound
- });
- remote.spawn(move |h| {
- stopper
- });
- /*
- let future = inbound.map_err(|err: ()| ()).join(outbound).map_err(|err| ()).map(|res| ());
- let future = stopper.join(future.from_err::<HyxeError>()).map_err(|mut err| err.printf()).map(|res : (Result<(), HyxeError>, ())| ());
- //(Result<(), HyxeError>, ())
- remote.execute(future.then(|res| {
- println!("STREAM CLOSED");
- Err(())
- }));*/
- Ok(Self {
- outbound_tx: outbound_tx.clone(),
- stop_cxn_tube_tx: stop_cxn_tube_tx.clone()
- })
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement