Advertisement
Guest User

Untitled

a guest
Apr 20th, 2019
113
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.46 KB | None | 0 0
  1. extern crate tokio; // 0.1.15
  2. extern crate futures; // 0.1.25
  3.  
  4. use futures::{Future, Stream};
  5. use futures::future::Either;
  6.  
  7. // пустая фьюча, задача прощелкнуть основной цикл сообщений
  8. // для этого один раз возвращаем Async::NotReady, а на следующем цикле уже Async::Ready()
  9. pub struct LoopTick { call: bool }
  10. impl LoopTick {
  11. pub fn new() -> LoopTick { LoopTick { call: false } }
  12. }
  13. impl Future for LoopTick {
  14. type Item = ();
  15. type Error = ();
  16. fn poll(&mut self) -> tokio::prelude::Poll<Self::Item, Self::Error> {
  17. if self.call { // не первое попадание в poll, завершаем фьючу
  18. futures::task::current().notify();
  19. Ok(tokio::prelude::Async::Ready(()))
  20. } else { // первое попадание в poll (call = false) возвращаем NotReady чтобы вернуть управление основному циклу сообщений
  21. self.call = true;
  22. Ok(tokio::prelude::Async::NotReady)
  23. }
  24. }
  25. }
  26.  
  27. fn main() {
  28. let task = futures::future::ok(());
  29.  
  30. // Таск спамит в консоль "interval N" каждую секунду
  31. let interval_task = tokio::timer::Interval::new(std::time::Instant::now(), std::time::Duration::from_secs(1))
  32. .map_err(|_| unreachable!())
  33. .fold(0, |i, _| { println!("interval {}", i); Ok(i+1) });
  34. let task = task.join(interval_task);
  35.  
  36. // Таск спамит в консоль "repeat N" непрерывно
  37. let repeat_task = futures::stream::repeat::<_, ()>(0)
  38. .fold((0, current_secs()), |(i, mut last_time), _| {
  39. // спамим текущее значение i раз в секунду
  40. let current = current_secs();
  41. if current - last_time >= 1 { println!("repeat {}", i); last_time = current };
  42.  
  43. // наш таск который должны выполнить "прощелкивая" основной цикл обработки сообщений
  44. let task = futures::future::ok( (i + 1, last_time) );
  45.  
  46. #[allow(dead_code)]
  47. enum LoopMode {
  48. PureRepeat, // обычный чистый repeat, блокирует основной цикл сообщений
  49. WithDelay, // нулевая задержка через Delay, прощелкивает цикл, но делает это неэффективно
  50. WithLoopTick // кастомная фьюча прощелкивает цикл куда быстрее
  51. };
  52. let mode = LoopMode::WithLoopTick; // Изменить для переключения режима
  53. match mode {
  54. LoopMode::PureRepeat => {
  55. // в этом варианте не используется нулевая задержка и, видимо,
  56. // основной цикл сообщений единожды войдя в код repeat_task будет бесконечно его выполнять
  57. // в связи с этим в консоли мы не увидим работу таска interval_task "interval N"
  58. // также код будет выполняться быстро, значение i очень быстро растет
  59. Either::A ( task )
  60. },
  61. LoopMode::WithDelay => {
  62. // в этом варианте используется нулевая задержка через Delay
  63. // каждый раз когда мы ее вызываем выполнение кода таска прерывается и управление возвращается основному циклу сообщений
  64. // таким образом в консоли мы увидим работу таска interval_task "interval N"
  65. // код этого таска с нулевой задержкой будет вызываться сильно медленнее чем без нее, значение i растет очень медленно
  66. let task_with_delay = tokio::timer::Delay::new(std::time::Instant::now() + std::time::Duration::from_secs(0))
  67. .map_err(|_| unimplemented!())
  68. .and_then(move |_| task );
  69. Either::B ( Either::A (task_with_delay) )
  70. },
  71. LoopMode::WithLoopTick => {
  72. // в этом варианте используется кастомная фьюча, значение i растет быстро, цикл прощелкивается аналогично Delay
  73. let task_with_delay = LoopTick::new()
  74. .and_then(move |_| task );
  75. Either::B ( Either::B (task_with_delay) )
  76. }
  77. }
  78. });
  79. let task = task.join(repeat_task);
  80.  
  81. // запускаем таски в одном потоке
  82. let task = task.map(|_| ()).map_err(|_| ());
  83. let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap();
  84. #[allow(unused_must_use)] { rt.block_on(task); }
  85. }
  86.  
  87.  
  88. fn current_secs() -> u64 {
  89. use std::time::{SystemTime, UNIX_EPOCH};
  90. let start = SystemTime::now();
  91. let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap();
  92. since_the_epoch.as_secs()
  93. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement