Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- extern crate bytes;
- #[macro_use]
- extern crate futures; // 0.1.21
- extern crate tokio;
- extern crate tokio_fs;
- use bytes::BytesMut;
- use futures::future::*;
- use tokio::prelude::*;
- use tokio_fs::File;
- struct StreamFile {
- asyncread: tokio_fs::File,
- buffer: BytesMut,
- offset: usize,
- chunk_size: usize,
- }
- impl StreamFile {
- fn new(asyncread: tokio_fs::File, chunk_size: usize) -> Self {
- StreamFile {
- asyncread: asyncread,
- buffer: BytesMut::new(),
- offset: 0,
- chunk_size: chunk_size,
- }
- }
- fn fill_read_buf(&mut self) -> Result<Async<usize>, std::io::Error> {
- self.buffer.reserve(self.chunk_size);
- let n = try_ready!(self.asyncread.read_buf(&mut self.buffer));
- self.offset += n;
- Ok(Async::Ready(n))
- }
- }
- impl Stream for StreamFile {
- type Item = (BytesMut, bool, usize);
- type Error = std::io::Error;
- fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
- println!("Polling file {:?}", self.asyncread);
- let n = try_ready!(self.fill_read_buf());
- if n > 0 {
- return Ok(Async::Ready(Some((
- self.buffer.split_to(n),
- n != self.chunk_size,
- self.offset - n,
- ))));
- } else {
- println!("Done at offset {}", self.offset);
- Ok(Async::Ready(None))
- }
- }
- }
- fn main() {
- let run =
- stream::iter_ok(vec!["/proc/cpuinfo"; 4])
- .for_each(|path| {
- File::open(path)
- .map_err(|_| ())
- .and_then(|fh| {
- tokio::spawn(StreamFile::new(fh, 8192).map_err(|_| ()).for_each(|_| Ok(())))
- })
- });
- tokio::run(run);
- }
Add Comment
Please, Sign In to add comment