Advertisement
Guest User

Untitled

a guest
Apr 26th, 2019
184
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 15.05 KB | None | 0 0
  1. #include <executors/executors.h>
  2.  
  3. // Task
  4.  
  5. Task::Task()
  6. : state_(Task::State::kCreated),
  7. has_dependencies_(false),
  8. num_unmet_dependencies_(0),
  9. has_triggers_(false),
  10. was_triggered_(false),
  11. has_time_trigger_(false),
  12. was_time_triggered_(false) {
  13. }
  14.  
  15. void Task::addDependency(std::shared_ptr<Task> dep) {
  16. std::scoped_lock lock(mutex_, dep->mutex_);
  17. if (state_ != Task::State::kCreated) {
  18. throw std::runtime_error("Cannot configure the task");
  19. }
  20. has_dependencies_ = true;
  21. if (!dep->isFinished()) {
  22. ++num_unmet_dependencies_;
  23. } else {
  24. checkIfReadyToRunUnlocked();
  25. }
  26. dep->depends_on_this_.push_back(weak_from_this());
  27. }
  28.  
  29. void Task::addTrigger(std::shared_ptr<Task> dep) {
  30. std::scoped_lock lock(mutex_, dep->mutex_);
  31. if (state_ != Task::State::kCreated) {
  32. throw std::runtime_error("Cannot configure the task");
  33. }
  34. has_triggers_ = true;
  35. if (dep->isFinished()) {
  36. was_triggered_ = true;
  37. checkIfReadyToRunUnlocked();
  38. }
  39. dep->triggered_by_this_.push_back(weak_from_this());
  40. }
  41.  
  42. void Task::setTimeTrigger(std::chrono::system_clock::time_point at) {
  43. std::scoped_lock lock(mutex_);
  44. if (state_ != Task::State::kCreated || has_time_trigger_) {
  45. throw std::runtime_error("Cannot configure the task");
  46. }
  47. has_time_trigger_ = true;
  48. time_trigger_ = at;
  49. auto shared_executor = weak_executor_.lock();
  50. if (shared_executor) {
  51. shared_executor->addTimeTriggeredTask(shared_from_this());
  52. }
  53. }
  54.  
  55. bool Task::isCompleted() const {
  56. return state_ == Task::State::kCompleted;
  57. }
  58.  
  59. bool Task::isFailed() const {
  60. return state_ == Task::State::kFailed;
  61. }
  62.  
  63. bool Task::isCanceled() const {
  64. return state_ == Task::State::kCanceled;
  65. }
  66.  
  67. bool Task::isFinished() const {
  68. return isCompleted() || isFailed() || isCanceled();
  69. }
  70.  
  71. std::exception_ptr Task::getError() const {
  72. return thrown_exception_;
  73. }
  74.  
  75. void Task::cancel() {
  76. std::scoped_lock lock(mutex_);
  77. if (isCanceled()) {
  78. return;
  79. }
  80. state_ = Task::State::kCanceled;
  81. notifyDependants();
  82. auto shared_executor = weak_executor_.lock();
  83. if (!shared_executor) {
  84. return;
  85. }
  86. auto shared_this = shared_from_this();
  87. shared_executor->removeActiveTask(shared_this);
  88. if (has_time_trigger_) {
  89. shared_executor->removeTimeTriggeredTask(shared_this);
  90. }
  91. }
  92.  
  93. void Task::wait() {
  94. std::unique_lock lock(mutex_);
  95. on_finished_.wait(lock, [this] { return isFinished(); });
  96. }
  97.  
  98. void Task::checkIfReadyToRunUnlocked() {
  99. auto shared_executor = weak_executor_.lock();
  100. if (!shared_executor || state_ != Task::State::kCreated) {
  101. return;
  102. }
  103. if ((has_dependencies_ && num_unmet_dependencies_ == 0) || (has_triggers_ && was_triggered_) ||
  104. (has_time_trigger_ && was_time_triggered_) ||
  105. (!has_dependencies_ && !has_triggers_ && !has_time_trigger_)) {
  106. state_ = Task::State::kReadyToRun;
  107. shared_executor->addReadyToRunTask(shared_from_this());
  108. }
  109. }
  110.  
  111. void Task::notifyDependants() {
  112. on_finished_.notify_all();
  113. for (const auto& weak_dep : depends_on_this_) {
  114. auto shared_dep = weak_dep.lock();
  115. if (!shared_dep) {
  116. continue;
  117. }
  118. std::scoped_lock lock(shared_dep->mutex_);
  119. --shared_dep->num_unmet_dependencies_;
  120. shared_dep->checkIfReadyToRunUnlocked();
  121. }
  122. for (const auto& weak_dep : triggered_by_this_) {
  123. auto shared_dep = weak_dep.lock();
  124. if (!shared_dep) {
  125. continue;
  126. }
  127. std::scoped_lock lock(shared_dep->mutex_);
  128. shared_dep->was_triggered_ = true;
  129. shared_dep->checkIfReadyToRunUnlocked();
  130. }
  131. }
  132.  
  133. // Executor
  134.  
  135. bool Executor::TimeTriggeredTaskCmp::operator()(const std::shared_ptr<Task>& lhs,
  136. const std::shared_ptr<Task>& rhs) const {
  137. if (lhs->time_trigger_ != rhs->time_trigger_) {
  138. return lhs->time_trigger_ < rhs->time_trigger_;
  139. }
  140. return lhs.owner_before(rhs);
  141. }
  142.  
  143. Executor::Executor(int num_threads) : shutdown_flag_(false) {
  144. workers_.reserve(num_threads);
  145. while (num_threads--) {
  146. workers_.emplace_back([this] { doWorkerLoop(); });
  147. }
  148. }
  149.  
  150. Executor::~Executor() {
  151. startShutdown();
  152. waitShutdown();
  153. }
  154.  
  155. void Executor::submit(std::shared_ptr<Task> task) {
  156. std::scoped_lock task_lock(task->mutex_);
  157. if (task->weak_executor_.lock()) {
  158. throw std::runtime_error("The task was already submitted");
  159. }
  160. if (task->state_ != Task::State::kCreated) {
  161. return;
  162. }
  163. if (shutdown_flag_) {
  164. task->state_ = Task::State::kCanceled;
  165. } else {
  166. task->weak_executor_ = weak_from_this();
  167. addActiveTask(task);
  168. task->checkIfReadyToRunUnlocked();
  169. if (task->has_time_trigger_) {
  170. addTimeTriggeredTask(task);
  171. }
  172. }
  173. }
  174.  
  175. void Executor::startShutdown() {
  176. std::scoped_lock lock(mutex_);
  177. shutdown_flag_ = true;
  178. on_has_ready_tasks_.notify_all();
  179. }
  180.  
  181. void Executor::waitShutdown() {
  182. startShutdown();
  183. for (auto& worker : workers_) {
  184. worker.join();
  185. }
  186. workers_.clear();
  187. active_tasks_.clear();
  188. while (!ready_to_run_tasks_.empty()) {
  189. ready_to_run_tasks_.pop();
  190. }
  191. time_triggered_tasks_.clear();
  192. }
  193.  
  194. void Executor::doWorkerLoop() {
  195. while (true) {
  196. processTimeTriggeredTasks();
  197. std::shared_ptr<Task> task;
  198. {
  199. std::unique_lock executor_lock(mutex_);
  200. if (time_triggered_tasks_.empty()) {
  201. on_has_ready_tasks_.wait(executor_lock, [this] {
  202. return !ready_to_run_tasks_.empty() || !time_triggered_tasks_.empty() ||
  203. shutdown_flag_;
  204. });
  205. } else {
  206. auto next_event = (*time_triggered_tasks_.begin())->time_trigger_;
  207. on_has_ready_tasks_.wait_until(executor_lock, next_event, [this, next_event] {
  208. return !ready_to_run_tasks_.empty() ||
  209. (!time_triggered_tasks_.empty() &&
  210. (*time_triggered_tasks_.begin())->time_trigger_ < next_event) ||
  211. shutdown_flag_;
  212. });
  213. }
  214. if (shutdown_flag_) {
  215. break;
  216. }
  217. if (ready_to_run_tasks_.empty()) {
  218. continue;
  219. }
  220. task = ready_to_run_tasks_.front();
  221. ready_to_run_tasks_.pop();
  222. }
  223. {
  224. std::unique_lock task_lock(task->mutex_);
  225. if (task->isCanceled()) {
  226. continue;
  227. }
  228. task->state_ = Task::State::kRunning;
  229. try {
  230. task_lock.unlock();
  231. task->run();
  232. task_lock.lock();
  233. } catch (...) {
  234. task_lock.lock();
  235. task->state_ = Task::State::kFailed;
  236. task->thrown_exception_ = std::current_exception();
  237. }
  238. if (task->state_ != Task::State::kFailed) {
  239. task->state_ = Task::State::kCompleted;
  240. }
  241. task->notifyDependants();
  242. removeActiveTask(task);
  243. }
  244. }
  245. }
  246.  
  247. void Executor::processTimeTriggeredTasks() {
  248. std::vector<std::shared_ptr<Task>> tasks_to_trigger;
  249. {
  250. std::scoped_lock executor_lock(mutex_);
  251. auto now = std::chrono::system_clock::now();
  252. while (!time_triggered_tasks_.empty()) {
  253. auto task = *time_triggered_tasks_.begin();
  254. if (task->time_trigger_ > now) {
  255. break;
  256. }
  257. time_triggered_tasks_.erase(time_triggered_tasks_.begin());
  258. tasks_to_trigger.push_back(task);
  259. }
  260. }
  261. for (auto& task : tasks_to_trigger) {
  262. std::scoped_lock task_lock(task->mutex_);
  263. task->was_time_triggered_ = true;
  264. task->checkIfReadyToRunUnlocked();
  265. }
  266. }
  267.  
  268. void Executor::addActiveTask(std::shared_ptr<Task> task) {
  269. std::scoped_lock lock(mutex_);
  270. active_tasks_.insert(task);
  271. }
  272.  
  273. void Executor::removeActiveTask(std::shared_ptr<Task> task) {
  274. std::scoped_lock lock(mutex_);
  275. active_tasks_.erase(task);
  276. }
  277.  
  278. void Executor::addReadyToRunTask(std::shared_ptr<Task> task) {
  279. std::scoped_lock lock(mutex_);
  280. ready_to_run_tasks_.push(task);
  281. on_has_ready_tasks_.notify_one();
  282. }
  283.  
  284. void Executor::addTimeTriggeredTask(std::shared_ptr<Task> task) {
  285. std::scoped_lock lock(mutex_);
  286. time_triggered_tasks_.insert(task);
  287. on_has_ready_tasks_.notify_all();
  288. }
  289.  
  290. void Executor::removeTimeTriggeredTask(std::shared_ptr<Task> task) {
  291. std::scoped_lock lock(mutex_);
  292. time_triggered_tasks_.erase(task);
  293. }
  294.  
  295. std::shared_ptr<Executor> MakeThreadPoolExecutor(int num_threads) {
  296. return std::make_shared<Executor>(num_threads);
  297. }
  298.  
  299.  
  300.  
  301.  
  302.  
  303. #include <atomic>
  304. #include <chrono>
  305. #include <condition_variable>
  306. #include <functional>
  307. #include <memory>
  308. #include <mutex>
  309. #include <queue>
  310. #include <set>
  311. #include <thread>
  312. #include <unordered_set>
  313. #include <vector>
  314.  
  315. class Executor;
  316.  
  317. class Task : public std::enable_shared_from_this<Task> {
  318. public:
  319. Task();
  320. virtual ~Task() {
  321. }
  322.  
  323. virtual void run() = 0;
  324.  
  325. void addDependency(std::shared_ptr<Task> dep);
  326. void addTrigger(std::shared_ptr<Task> dep);
  327. void setTimeTrigger(std::chrono::system_clock::time_point at);
  328.  
  329. // Task::run() completed without throwing exception
  330. bool isCompleted() const;
  331.  
  332. // Task::run() throwed exception
  333. bool isFailed() const;
  334.  
  335. // Task was canceled
  336. bool isCanceled() const;
  337.  
  338. // Task either completed, failed or was canceled
  339. bool isFinished() const;
  340.  
  341. std::exception_ptr getError() const;
  342.  
  343. void cancel();
  344.  
  345. void wait();
  346.  
  347. private:
  348. void checkIfReadyToRunUnlocked();
  349. void notifyDependants();
  350.  
  351. private:
  352. enum class State { kCreated, kReadyToRun, kRunning, kCanceled, kFailed, kCompleted };
  353.  
  354. std::mutex mutex_;
  355. std::condition_variable on_finished_;
  356. std::weak_ptr<Executor> weak_executor_;
  357. std::atomic<State> state_;
  358. std::exception_ptr thrown_exception_;
  359. bool has_dependencies_;
  360. int num_unmet_dependencies_;
  361. std::vector<std::weak_ptr<Task>> depends_on_this_;
  362. bool has_triggers_;
  363. bool was_triggered_;
  364. std::vector<std::weak_ptr<Task>> triggered_by_this_;
  365. bool has_time_trigger_;
  366. bool was_time_triggered_;
  367. std::chrono::system_clock::time_point time_trigger_;
  368.  
  369. friend class Executor;
  370. };
  371.  
  372. template <class T>
  373. class Future;
  374.  
  375. template <class T>
  376. using FuturePtr = std::shared_ptr<Future<T>>;
  377.  
  378. // Used instead of void in generic code
  379. struct Unit {};
  380.  
  381. class Executor : public std::enable_shared_from_this<Executor> {
  382. public:
  383. explicit Executor(int num_threads);
  384. virtual ~Executor();
  385.  
  386. virtual void submit(std::shared_ptr<Task> task);
  387.  
  388. virtual void startShutdown();
  389. virtual void waitShutdown();
  390.  
  391. template <class T>
  392. FuturePtr<T> invoke(std::function<T()> fn);
  393.  
  394. template <class Y, class T>
  395. FuturePtr<Y> then(FuturePtr<T> input, std::function<Y()> fn);
  396.  
  397. template <class T>
  398. FuturePtr<std::vector<T>> whenAll(std::vector<FuturePtr<T>> all);
  399.  
  400. template <class T>
  401. FuturePtr<T> whenFirst(std::vector<FuturePtr<T>> all);
  402.  
  403. template <class T>
  404. FuturePtr<std::vector<T>> whenAllBeforeDeadline(std::vector<FuturePtr<T>> all,
  405. std::chrono::system_clock::time_point deadline);
  406.  
  407. private:
  408. void doWorkerLoop();
  409. void processTimeTriggeredTasks();
  410. void addActiveTask(std::shared_ptr<Task> task);
  411. void removeActiveTask(std::shared_ptr<Task> task);
  412. void addTimeTriggeredTask(std::shared_ptr<Task> task);
  413. void removeTimeTriggeredTask(std::shared_ptr<Task> task);
  414. void addReadyToRunTask(std::shared_ptr<Task> task);
  415.  
  416. private:
  417. struct TimeTriggeredTaskCmp {
  418. bool operator()(const std::shared_ptr<Task>& lhs, const std::shared_ptr<Task>& rhs) const;
  419. };
  420.  
  421. std::vector<std::thread> workers_;
  422. std::mutex mutex_;
  423. std::atomic<bool> shutdown_flag_;
  424. std::condition_variable on_has_ready_tasks_;
  425. std::unordered_set<std::shared_ptr<Task>> active_tasks_;
  426. std::queue<std::shared_ptr<Task>> ready_to_run_tasks_;
  427. std::set<std::shared_ptr<Task>, TimeTriggeredTaskCmp> time_triggered_tasks_;
  428.  
  429. friend class Task;
  430.  
  431. template <class T>
  432. friend class Future;
  433. };
  434.  
  435. std::shared_ptr<Executor> MakeThreadPoolExecutor(int num_threads);
  436.  
  437. template <class T>
  438. class Future : public Task {
  439. public:
  440. explicit Future(const std::function<T()>& fn) : function_(fn) {
  441. if (!fn) {
  442. throw std::runtime_error("The function must not be null");
  443. }
  444. }
  445.  
  446. void run() {
  447. result_ = function_();
  448. }
  449.  
  450. T get() {
  451. wait();
  452. if (isCanceled()) {
  453. throw std::runtime_error("The task was canceled");
  454. }
  455. if (isFailed()) {
  456. std::rethrow_exception(getError());
  457. }
  458. return result_;
  459. }
  460.  
  461. private:
  462. std::function<T()> function_;
  463. T result_;
  464. };
  465.  
  466. template <class T>
  467. FuturePtr<T> Executor::invoke(std::function<T()> fn) {
  468. auto future = std::make_shared<Future<T>>(fn);
  469. submit(future);
  470. return future;
  471. }
  472.  
  473. template <class Y, class T>
  474. FuturePtr<Y> Executor::then(FuturePtr<T> input, std::function<Y()> fn) {
  475. auto future = std::make_shared<Future<Y>>(fn);
  476. future->addTrigger(input);
  477. submit(future);
  478. return future;
  479. }
  480.  
  481. template <class T>
  482. FuturePtr<std::vector<T>> Executor::whenAll(std::vector<FuturePtr<T>> all) {
  483. auto future = std::make_shared<Future<std::vector<T>>>([&all] {
  484. std::vector<T> results;
  485. results.reserve(all.size());
  486. for (auto& dep : all) {
  487. results.push_back(dep->get());
  488. }
  489. return results;
  490. });
  491. for (auto& dep : all) {
  492. future->addDependency(dep);
  493. }
  494. submit(future);
  495. return future;
  496. }
  497.  
  498. template <class T>
  499. FuturePtr<T> Executor::whenFirst(std::vector<FuturePtr<T>> all) {
  500. auto future = std::make_shared<Future<T>>([&all] {
  501. for (auto& dep : all) {
  502. if (dep->isFinished()) {
  503. return dep->get();
  504. }
  505. }
  506. throw std::runtime_error("Internal error");
  507. });
  508. for (auto& dep : all) {
  509. future->addTrigger(dep);
  510. }
  511. submit(future);
  512. return future;
  513. }
  514.  
  515. template <class T>
  516. FuturePtr<std::vector<T>> Executor::whenAllBeforeDeadline(
  517. std::vector<FuturePtr<T>> all, std::chrono::system_clock::time_point deadline) {
  518. auto future = std::make_shared<Future<std::vector<T>>>([&all] {
  519. std::vector<T> results;
  520. for (auto& dep : all) {
  521. if (dep->isFinished()) {
  522. results.push_back(dep->get());
  523. }
  524. }
  525. return results;
  526. });
  527. future->setTimeTrigger(deadline);
  528. submit(future);
  529. return future;
  530. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement