Guest User

Untitled

a guest
Jul 19th, 2018
89
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.72 KB | None | 0 0
  1. extern crate bytes;
  2. #[macro_use]
  3. extern crate futures; // 0.1.21
  4. extern crate tokio;
  5. extern crate tokio_fs;
  6.  
  7. use bytes::BytesMut;
  8. use futures::future::*;
  9. use tokio::prelude::*;
  10. use tokio_fs::File;
  11.  
  12. struct StreamFile {
  13. asyncread: tokio_fs::File,
  14. buffer: BytesMut,
  15. offset: usize,
  16. chunk_size: usize,
  17. }
  18.  
  19. impl StreamFile {
  20. fn new(asyncread: tokio_fs::File, chunk_size: usize) -> Self {
  21. StreamFile {
  22. asyncread: asyncread,
  23. buffer: BytesMut::new(),
  24. offset: 0,
  25. chunk_size: chunk_size,
  26. }
  27. }
  28.  
  29. fn fill_read_buf(&mut self) -> Result<Async<usize>, std::io::Error> {
  30. self.buffer.reserve(self.chunk_size);
  31. let n = try_ready!(self.asyncread.read_buf(&mut self.buffer));
  32. self.offset += n;
  33. Ok(Async::Ready(n))
  34. }
  35. }
  36.  
  37. impl Stream for StreamFile {
  38. type Item = (BytesMut, bool, usize);
  39. type Error = std::io::Error;
  40.  
  41. fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
  42. println!("Polling file {:?}", self.asyncread);
  43.  
  44. let n = try_ready!(self.fill_read_buf());
  45.  
  46. if n > 0 {
  47. return Ok(Async::Ready(Some((
  48. self.buffer.split_to(n),
  49. n != self.chunk_size,
  50. self.offset - n,
  51. ))));
  52. } else {
  53. println!("Done at offset {}", self.offset);
  54. Ok(Async::Ready(None))
  55. }
  56. }
  57. }
  58.  
  59.  
  60. fn main() {
  61. let run =
  62. stream::iter_ok(vec!["/proc/cpuinfo"; 4])
  63. .for_each(|path| {
  64. File::open(path)
  65. .map_err(|_| ())
  66. .and_then(|fh| {
  67. tokio::spawn(StreamFile::new(fh, 8192).map_err(|_| ()).for_each(|_| Ok(())))
  68. })
  69. });
  70.  
  71. tokio::run(run);
  72. }
Add Comment
Please, Sign In to add comment