Advertisement
Guest User

Untitled

a guest
Apr 20th, 2019
103
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.44 KB | None | 0 0
  1. extern crate tokio; // 0.1.15
  2. extern crate futures; // 0.1.25
  3.  
  4. use futures::{Future, Stream};
  5. use futures::future::Either;
  6.  
  7. // пустая фьюча, задача прощелкнуть основной цикл сообщений
  8. // для этого один раз возвращаем Async::NotReady, а на следующем цикле уже Async::Ready()
  9. pub struct LoopTick { call: bool }
  10. impl LoopTick {
  11. pub fn new() -> LoopTick { LoopTick { call: false } }
  12. }
  13. impl Future for LoopTick {
  14. type Item = ();
  15. type Error = ();
  16. fn poll(&mut self) -> tokio::prelude::Poll<Self::Item, Self::Error> {
  17. if self.call { // не первое попадание в poll, завершаем фьючу
  18. Ok(tokio::prelude::Async::Ready(()))
  19. } else { // первое попадание в poll (call = false) возвращаем NotReady чтобы вернуть управление основному циклу сообщений
  20. self.call = true;
  21. Ok(tokio::prelude::Async::NotReady)
  22. }
  23. }
  24. }
  25.  
  26. fn main() {
  27. let task = futures::future::ok(());
  28.  
  29. // Таск спамит в консоль "interval N" каждую секунду
  30. let interval_task = tokio::timer::Interval::new(std::time::Instant::now(), std::time::Duration::from_secs(1))
  31. .map_err(|_| unreachable!())
  32. .fold(0, |i, _| { println!("interval {}", i); Ok(i+1) });
  33. let task = task.join(interval_task);
  34.  
  35. // Таск спамит в консоль "repeat N" непрерывно
  36. let repeat_task = futures::stream::repeat::<_, ()>(0)
  37. .fold((0, current_secs()), |(i, mut last_time), _| {
  38. // спамим текущее значение i раз в секунду
  39. let current = current_secs();
  40. if current - last_time >= 1 { println!("repeat {}", i); last_time = current };
  41.  
  42. // наш таск который должны выполнить "прощелкивая" основной цикл обработки сообщений
  43. let task = futures::future::ok(()).and_then(move |_| Ok((i + 1, last_time)));
  44.  
  45. #[allow(dead_code)]
  46. enum LoopMode {
  47. PureRepeat, // обычный чистый repeat, блокирует основной цикл сообщений
  48. WithDelay, // нулевая задержка через Delay, прощелкивает цикл, но делает это неэффективно
  49. WithLoopTick // кастомная фьюча прощелкивает цикл куда быстрее
  50. };
  51. let mode = LoopMode::WithLoopTick; // Изменить для переключения режима
  52. match mode {
  53. LoopMode::PureRepeat => {
  54. // в этом варианте не используется нулевая задержка и, видимо,
  55. // основной цикл сообщений единожды войдя в код repeat_task будет бесконечно его выполнять
  56. // в связи с этим в консоли мы не увидим работу таска interval_task "interval N"
  57. // также код будет выполняться быстро, значение i очень быстро растет
  58. Either::A ( task )
  59. },
  60. LoopMode::WithDelay => {
  61. // в этом варианте используется нулевая задержка через Delay
  62. // каждый раз когда мы ее вызываем выполнение кода таска прерывается и управление возвращается основному циклу сообщений
  63. // таким образом в консоли мы увидим работу таска interval_task "interval N"
  64. // код этого таска с нулевой задержкой будет вызываться сильно медленнее чем без нее, значение i растет очень медленно
  65. let task_with_delay = tokio::timer::Delay::new(std::time::Instant::now() + std::time::Duration::from_secs(0))
  66. .map_err(|_| unimplemented!())
  67. .and_then(move |_| task );
  68. Either::B ( Either::A (task_with_delay) )
  69. },
  70. LoopMode::WithLoopTick => {
  71. // в этом варианте используется кастомная фьюча, значение i растет быстро, цикл прощелкивается аналогично Delay
  72. let task_with_delay = LoopTick::new()
  73. .and_then(move |_| task );
  74. Either::B ( Either::B (task_with_delay) )
  75. }
  76. }
  77. });
  78. let task = task.join(repeat_task);
  79.  
  80. // запускаем таски в одном потоке
  81. let task = task.map(|_| ()).map_err(|_| ());
  82. let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap();
  83. #[allow(unused_must_use)] { rt.block_on(task); }
  84. }
  85.  
  86.  
  87. fn current_secs() -> u64 {
  88. use std::time::{SystemTime, UNIX_EPOCH};
  89. let start = SystemTime::now();
  90. let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap();
  91. since_the_epoch.as_secs()
  92. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement