Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- let pipeline = framed_reader
- // Decode a message, start the chain
- .map(move |b| {
- println!("Decoding frame");
- let message: proto::Message = bincode::deserialize(&b).unwrap();
- match message {
- proto::Message::StartTransfer => (),
- _ => panic!("Message incorrect"),
- }
- })
- .map(move |_| {
- let mpsc_tx = mpsc_tx.clone();
- println!("Finding files");
- // This creates a stream of pathbufs
- StreamFilePaths::new(PathBuf::from("/root/test"))
- .for_each(move |path| {
- let mpsc_tx = mpsc_tx.clone();
- println!("Opening {:?}", path);
- // Open a pathbuf creating a Tokio::File
- File::open(path)
- .map(move |fh| {
- println!("Opened {:?}", fh);
- //This takes a Tokio::File and creates a stream of BytesMut
- //This area of the code does not run
- StreamFile::new(fh, 8192)
- .for_each(move |chunk| {
- let mpsc_tx = mpsc_tx.clone();
- mpsc_tx.send(chunk).map(|_| ()).map_err(|e| {
- std::io::Error::new(
- std::io::ErrorKind::Other,
- e
- )
- })
- })
- })
- // I think this might be the issue here, I have a future for the
- // above code - but I don't know how to join it back to the chain
- // so it gets run with the buffer_unordered below?
- .map(|fut| ())
- })
- .map_err(|err: std::io::Error| {
- panic!("Directory streaming error: {:?}", err);
- })
- })
- .map_err(|err: std::io::Error| {
- panic!("Frame error: {:?}", err);
- })
- .buffer_unordered(2)
- .for_each(|fut| Ok(()) );
Add Comment
Please, Sign In to add comment