Guest User

Untitled

a guest
Jul 20th, 2018
103
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.51 KB | None | 0 0
  1. extern crate bytes;
  2. #[macro_use]
  3. extern crate futures; // 0.1.21
  4. extern crate tokio;
  5. extern crate tokio_fs;
  6. extern crate tokio_threadpool;
  7. extern crate walkdir;
  8.  
  9. use bytes::BytesMut;
  10. use futures::future::*;
  11. use std::path::{Path, PathBuf};
  12. use tokio::prelude::*;
  13. use tokio_fs::File;
  14. use tokio_threadpool::blocking;
  15. use walkdir::WalkDir;
  16.  
  17. struct StreamFilePaths {
  18. walkdir: walkdir::IntoIter,
  19. }
  20.  
  21. impl StreamFilePaths {
  22. fn new(pgdata: &str) -> Self {
  23. StreamFilePaths {
  24. walkdir: WalkDir::new(Path::new(pgdata).to_path_buf())
  25. .min_depth(1)
  26. .into_iter(),
  27. }
  28. }
  29. }
  30.  
  31. impl Stream for StreamFilePaths {
  32. type Item = PathBuf;
  33. type Error = std::io::Error;
  34.  
  35. fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
  36. match self.walkdir.next() {
  37. Some(d) => Ok(Async::Ready(Some(d?.path().to_path_buf()))),
  38. None => Ok(Async::Ready(None)),
  39. }
  40. }
  41. }
  42. struct StreamFile {
  43. asyncread: tokio_fs::File,
  44. buffer: BytesMut,
  45. offset: usize,
  46. chunk_size: usize,
  47. }
  48.  
  49. impl StreamFile {
  50. fn new(asyncread: tokio_fs::File, chunk_size: usize) -> Self {
  51. StreamFile {
  52. asyncread: asyncread,
  53. buffer: BytesMut::new(),
  54. offset: 0,
  55. chunk_size: chunk_size,
  56. }
  57. }
  58.  
  59. fn fill_read_buf(&mut self) -> Result<Async<usize>, std::io::Error> {
  60. self.buffer.reserve(self.chunk_size);
  61. let n = try_ready!(self.asyncread.read_buf(&mut self.buffer));
  62. self.offset += n;
  63. Ok(Async::Ready(n))
  64. }
  65. }
  66.  
  67. impl Stream for StreamFile {
  68. type Item = (BytesMut, bool, usize);
  69. type Error = std::io::Error;
  70.  
  71. fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
  72. let n = try_ready!(self.fill_read_buf());
  73.  
  74. if n > 0 {
  75. return Ok(Async::Ready(Some((
  76. self.buffer.split_to(n),
  77. n != self.chunk_size,
  78. self.offset - n,
  79. ))));
  80. } else {
  81. println!("Done at offset {}", self.offset);
  82. Ok(Async::Ready(None))
  83. }
  84. }
  85. }
  86.  
  87. fn main() {
  88. let run = StreamFilePaths::new("/var/log")
  89. .and_then(|path| File::open(path))
  90. .map(move |fh| {
  91. println!("Streaming file {:?}", fh);
  92. let sf = StreamFile::new(fh, 8192);
  93. sf
  94. .map(move |buf| buf)
  95. .collect()
  96. })
  97. .buffer_unordered(2)
  98. .collect()
  99. .map(|_| ())
  100. .map_err(|_| ());
  101.  
  102. tokio::run(run);
  103. }
Add Comment
Please, Sign In to add comment