Guest User

Untitled

a guest
Aug 20th, 2018
76
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.92 KB | None | 0 0
  1. ![feature(async_await, await_macro, futures_api, pin, arbitrary_self_types)]
  2.  
  3. use std::future::{Future, FutureObj};
  4. use std::mem::PinMut;
  5. use std::sync::{Arc, Mutex};
  6. use std::sync::mpsc::{sync_channel, SyncSender, SendError, Receiver};
  7. use std::task::{
  8. self,
  9. local_waker_from_nonlocal,
  10. Context,
  11. Poll,
  12. Spawn,
  13. SpawnErrorKind,
  14. SpawnObjError,
  15. Wake,
  16. };
  17.  
  18. mod secret;
  19. use self::secret::almost_ready;
  20.  
  21. /// Task executor that receives tasks off of a channel and runs them.
  22. struct Executor {
  23. task_receiver: Receiver<Arc<Task>>,
  24. }
  25.  
  26. impl Executor {
  27. fn run(&self) {
  28. // FIXME: implement the running of the executor.
  29. //
  30. // This method should pull tasks off of the existing task
  31. // queue and run them to completion.
  32. //
  33. // In order to poll futures, you'll need to construct a
  34. // `task::Context` from a `LocalWaker` and a `Spawn`.
  35. // You can get a value of type `LocalWaker` by calling
  36. // `local_waker_from_nonlocal` on an `Arc<W>` where `W: Wake`.
  37. //
  38. // To poll the future you'll need to do:
  39. // `PinMut::new(future).poll(cx)` where cx is `&mut Context`
  40. //
  41. while let Ok(cur_task) = self.task_receiver.recv().unwrap() {
  42. let mut local_spawner = &cur_task.spawner;
  43. let local_waker = local_waker_from_nonlocal(cur_task.clone());
  44. let mut cx = Context::new(&local_waker, &mut local_spawner);
  45. let mut future = cur_task.future.lock().unwrap().take();
  46. if let None = future {
  47. return;
  48. }
  49.  
  50. match PinMut::new(&mut future.unwrap()).poll(&mut cx) {
  51. Poll::Ready(ret) => println!("ready future {:?}", ret),
  52. Poll::Pending => {
  53. println!("pending future");
  54. local_waker.wake();
  55. },
  56. }
  57. }
  58. }
  59. }
  60.  
  61. /// Task executor that spawns tasks onto a channel.
  62. #[derive(Clone)]
  63. struct Spawner {
  64. task_sender: SyncSender<Arc<Task>>,
  65. }
  66.  
  67. impl Spawner {
  68. // Spawn a future as a new top-level task.
  69. fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
  70. let future_obj = FutureObj::new(Box::new(future));
  71. (&mut &*self).spawn_obj(future_obj)
  72. .expect("unable to spawn");
  73. }
  74. }
  75.  
  76. // Implement the `Spawn` trait for `&Spawner` rather than `Spawner` since
  77. // we don't require a mutable reference to `Spawner`.
  78. impl<'a> Spawn for &'a Spawner {
  79. fn spawn_obj(&mut self, future: FutureObj<'static, ()>)
  80. -> Result<(), SpawnObjError>
  81. {
  82. let task = Arc::new(Task {
  83. future: Mutex::new(Some(future)),
  84. spawner: self.clone(),
  85. });
  86.  
  87. self.task_sender.send(task).map_err(|SendError(task)| {
  88. SpawnObjError {
  89. kind: SpawnErrorKind::shutdown(),
  90. future: task.future.lock().unwrap().take().unwrap(),
  91. }
  92. })
  93. }
  94. }
  95.  
  96. struct Task {
  97. // In-progress future that should be pushed to completion
  98. future: Mutex<Option<FutureObj<'static, ()>>>,
  99. // Handle to spawn tasks onto the task queue
  100. spawner: Spawner,
  101. }
  102.  
  103. impl Wake for Task {
  104. fn wake(arc_self: &Arc<Self>) {
  105. // FIXME: implement `Wake` by putting the task back onto the task queue
  106. arc_self.spawner.task_sender.send(arc_self.clone()).unwrap();
  107. }
  108. }
  109.  
  110. fn new_executor_and_spawner() -> (Executor, Spawner) {
  111. // Maximum number of tasks to allow queueing in the channel at once.
  112. // This is just to make `sync_channel` happy, and wouldn't be present in
  113. // a real executor.
  114. const MAX_QUEUED_TASKS: usize = 10000;
  115. let (task_sender, task_receiver) = sync_channel(MAX_QUEUED_TASKS);
  116. (Executor { task_receiver }, Spawner { task_sender })
  117. }
  118.  
  119. fn main() {
  120. let (executor, spawner) = new_executor_and_spawner();
  121. spawner.spawn(async {
  122. println!("howdy!");
  123. let x = await!(almost_ready(5));
  124. println!("done: {:?}", x);
  125. });
  126. executor.run();
  127. }
Add Comment
Please, Sign In to add comment