Guest User

Untitled

a guest
Jul 12th, 2018
82
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.04 KB | None | 0 0
  1. let pipeline = framed_reader
  2. // Decode a message, start the chain
  3. .map(move |b| {
  4. println!("Decoding frame");
  5. let message: proto::Message = bincode::deserialize(&b).unwrap();
  6. match message {
  7. proto::Message::StartTransfer => (),
  8. _ => panic!("Message incorrect"),
  9. }
  10. })
  11. .map(move |_| {
  12. let mpsc_tx = mpsc_tx.clone();
  13. println!("Finding files");
  14.  
  15. // This creates a stream of pathbufs
  16. StreamFilePaths::new(PathBuf::from("/root/test"))
  17. .for_each(move |path| {
  18. let mpsc_tx = mpsc_tx.clone();
  19. println!("Opening {:?}", path);
  20.  
  21. // Open a pathbuf creating a Tokio::File
  22. File::open(path)
  23. .map(move |fh| {
  24. println!("Opened {:?}", fh);
  25.  
  26. //This takes a Tokio::File and creates a stream of BytesMut
  27. //This area of the code does not run
  28. StreamFile::new(fh, 8192)
  29. .for_each(move |chunk| {
  30. let mpsc_tx = mpsc_tx.clone();
  31. mpsc_tx.send(chunk).map(|_| ()).map_err(|e| {
  32. std::io::Error::new(
  33. std::io::ErrorKind::Other,
  34. e
  35. )
  36. })
  37. })
  38. })
  39. // I think this might be the issue here, I have a future for the
  40. // above code - but I don't know how to join it back to the chain
  41. // so it gets run with the buffer_unordered below?
  42. .map(|fut| ())
  43. })
  44. .map_err(|err: std::io::Error| {
  45. panic!("Directory streaming error: {:?}", err);
  46. })
  47. })
  48. .map_err(|err: std::io::Error| {
  49. panic!("Frame error: {:?}", err);
  50. })
  51. .buffer_unordered(2)
  52. .for_each(|fut| Ok(()) );
Add Comment
Please, Sign In to add comment