Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- extern crate bytes;
- #[macro_use]
- extern crate futures; // 0.1.21
- extern crate tokio;
- extern crate tokio_fs;
- extern crate tokio_threadpool;
- extern crate walkdir;
- use bytes::BytesMut;
- use futures::future::*;
- use std::path::{Path, PathBuf};
- use tokio::prelude::*;
- use tokio_fs::File;
- use tokio_threadpool::blocking;
- use walkdir::WalkDir;
- struct StreamFilePaths {
- walkdir: walkdir::IntoIter,
- }
- impl StreamFilePaths {
- fn new(pgdata: &str) -> Self {
- StreamFilePaths {
- walkdir: WalkDir::new(Path::new(pgdata).to_path_buf())
- .min_depth(1)
- .into_iter(),
- }
- }
- }
- impl Stream for StreamFilePaths {
- type Item = PathBuf;
- type Error = std::io::Error;
- fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
- match self.walkdir.next() {
- Some(d) => Ok(Async::Ready(Some(d?.path().to_path_buf()))),
- None => Ok(Async::Ready(None)),
- }
- }
- }
- struct StreamFile {
- asyncread: tokio_fs::File,
- buffer: BytesMut,
- offset: usize,
- chunk_size: usize,
- }
- impl StreamFile {
- fn new(asyncread: tokio_fs::File, chunk_size: usize) -> Self {
- StreamFile {
- asyncread: asyncread,
- buffer: BytesMut::new(),
- offset: 0,
- chunk_size: chunk_size,
- }
- }
- fn fill_read_buf(&mut self) -> Result<Async<usize>, std::io::Error> {
- self.buffer.reserve(self.chunk_size);
- let n = try_ready!(self.asyncread.read_buf(&mut self.buffer));
- self.offset += n;
- Ok(Async::Ready(n))
- }
- }
- impl Stream for StreamFile {
- type Item = (BytesMut, bool, usize);
- type Error = std::io::Error;
- fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
- let n = try_ready!(self.fill_read_buf());
- if n > 0 {
- return Ok(Async::Ready(Some((
- self.buffer.split_to(n),
- n != self.chunk_size,
- self.offset - n,
- ))));
- } else {
- println!("Done at offset {}", self.offset);
- Ok(Async::Ready(None))
- }
- }
- }
- fn main() {
- let run = StreamFilePaths::new("/var/log")
- .and_then(|path| File::open(path))
- .map(move |fh| {
- println!("Streaming file {:?}", fh);
- let sf = StreamFile::new(fh, 8192);
- sf
- .map(move |buf| buf)
- .collect()
- })
- .buffer_unordered(2)
- .collect()
- .map(|_| ())
- .map_err(|_| ());
- tokio::run(run);
- }
Add Comment
Please, Sign In to add comment