Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- extern crate tokio; // 0.1.15
- extern crate futures; // 0.1.25
- use futures::{Future, Stream};
- use futures::future::Either;
- // пустая фьюча, задача прощелкнуть основной цикл сообщений
- // для этого один раз возвращаем Async::NotReady, а на следующем цикле уже Async::Ready()
- pub struct LoopTick { call: bool }
- impl LoopTick {
- pub fn new() -> LoopTick { LoopTick { call: false } }
- }
- impl Future for LoopTick {
- type Item = ();
- type Error = ();
- fn poll(&mut self) -> tokio::prelude::Poll<Self::Item, Self::Error> {
- if self.call { // не первое попадание в poll, завершаем фьючу
- Ok(tokio::prelude::Async::Ready(()))
- } else { // первое попадание в poll (call = false) возвращаем NotReady чтобы вернуть управление основному циклу сообщений
- self.call = true;
- Ok(tokio::prelude::Async::NotReady)
- }
- }
- }
- fn main() {
- let task = futures::future::ok(());
- // Таск спамит в консоль "interval N" каждую секунду
- let interval_task = tokio::timer::Interval::new(std::time::Instant::now(), std::time::Duration::from_secs(1))
- .map_err(|_| unreachable!())
- .fold(0, |i, _| { println!("interval {}", i); Ok(i+1) });
- let task = task.join(interval_task);
- // Таск спамит в консоль "repeat N" непрерывно
- let repeat_task = futures::stream::repeat::<_, ()>(0)
- .fold((0, current_secs()), |(i, mut last_time), _| {
- // спамим текущее значение i раз в секунду
- let current = current_secs();
- if current - last_time >= 1 { println!("repeat {}", i); last_time = current };
- // наш таск который должны выполнить "прощелкивая" основной цикл обработки сообщений
- let task = futures::future::ok(()).and_then(move |_| Ok((i + 1, last_time)));
- #[allow(dead_code)]
- enum LoopMode {
- PureRepeat, // обычный чистый repeat, блокирует основной цикл сообщений
- WithDelay, // нулевая задержка через Delay, прощелкивает цикл, но делает это неэффективно
- WithLoopTick // кастомная фьюча прощелкивает цикл куда быстрее
- };
- let mode = LoopMode::WithLoopTick; // Изменить для переключения режима
- match mode {
- LoopMode::PureRepeat => {
- // в этом варианте не используется нулевая задержка и, видимо,
- // основной цикл сообщений единожды войдя в код repeat_task будет бесконечно его выполнять
- // в связи с этим в консоли мы не увидим работу таска interval_task "interval N"
- // также код будет выполняться быстро, значение i очень быстро растет
- Either::A ( task )
- },
- LoopMode::WithDelay => {
- // в этом варианте используется нулевая задержка через Delay
- // каждый раз когда мы ее вызываем выполнение кода таска прерывается и управление возвращается основному циклу сообщений
- // таким образом в консоли мы увидим работу таска interval_task "interval N"
- // код этого таска с нулевой задержкой будет вызываться сильно медленнее чем без нее, значение i растет очень медленно
- let task_with_delay = tokio::timer::Delay::new(std::time::Instant::now() + std::time::Duration::from_secs(0))
- .map_err(|_| unimplemented!())
- .and_then(move |_| task );
- Either::B ( Either::A (task_with_delay) )
- },
- LoopMode::WithLoopTick => {
- // в этом варианте используется кастомная фьюча, значение i растет быстро, цикл прощелкивается аналогично Delay
- let task_with_delay = LoopTick::new()
- .and_then(move |_| task );
- Either::B ( Either::B (task_with_delay) )
- }
- }
- });
- let task = task.join(repeat_task);
- // запускаем таски в одном потоке
- let task = task.map(|_| ()).map_err(|_| ());
- let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap();
- #[allow(unused_must_use)] { rt.block_on(task); }
- }
- fn current_secs() -> u64 {
- use std::time::{SystemTime, UNIX_EPOCH};
- let start = SystemTime::now();
- let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap();
- since_the_epoch.as_secs()
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement