SHARE
TWEET

Untitled

a guest Apr 26th, 2019 54 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. #include <executors/executors.h>
  2.  
  3. // Task
  4.  
  5. Task::Task()
  6.     : state_(Task::State::kCreated),
  7.       has_dependencies_(false),
  8.       num_unmet_dependencies_(0),
  9.       has_triggers_(false),
  10.       was_triggered_(false),
  11.       has_time_trigger_(false),
  12.       was_time_triggered_(false) {
  13. }
  14.  
  15. void Task::addDependency(std::shared_ptr<Task> dep) {
  16.     std::scoped_lock lock(mutex_, dep->mutex_);
  17.     if (state_ != Task::State::kCreated) {
  18.         throw std::runtime_error("Cannot configure the task");
  19.     }
  20.     has_dependencies_ = true;
  21.     if (!dep->isFinished()) {
  22.         ++num_unmet_dependencies_;
  23.     } else {
  24.         checkIfReadyToRunUnlocked();
  25.     }
  26.     dep->depends_on_this_.push_back(weak_from_this());
  27. }
  28.  
  29. void Task::addTrigger(std::shared_ptr<Task> dep) {
  30.     std::scoped_lock lock(mutex_, dep->mutex_);
  31.     if (state_ != Task::State::kCreated) {
  32.         throw std::runtime_error("Cannot configure the task");
  33.     }
  34.     has_triggers_ = true;
  35.     if (dep->isFinished()) {
  36.         was_triggered_ = true;
  37.         checkIfReadyToRunUnlocked();
  38.     }
  39.     dep->triggered_by_this_.push_back(weak_from_this());
  40. }
  41.  
  42. void Task::setTimeTrigger(std::chrono::system_clock::time_point at) {
  43.     std::scoped_lock lock(mutex_);
  44.     if (state_ != Task::State::kCreated || has_time_trigger_) {
  45.         throw std::runtime_error("Cannot configure the task");
  46.     }
  47.     has_time_trigger_ = true;
  48.     time_trigger_ = at;
  49.     auto shared_executor = weak_executor_.lock();
  50.     if (shared_executor) {
  51.         shared_executor->addTimeTriggeredTask(shared_from_this());
  52.     }
  53. }
  54.  
  55. bool Task::isCompleted() const {
  56.     return state_ == Task::State::kCompleted;
  57. }
  58.  
  59. bool Task::isFailed() const {
  60.     return state_ == Task::State::kFailed;
  61. }
  62.  
  63. bool Task::isCanceled() const {
  64.     return state_ == Task::State::kCanceled;
  65. }
  66.  
  67. bool Task::isFinished() const {
  68.     return isCompleted() || isFailed() || isCanceled();
  69. }
  70.  
  71. std::exception_ptr Task::getError() const {
  72.     return thrown_exception_;
  73. }
  74.  
  75. void Task::cancel() {
  76.     std::scoped_lock lock(mutex_);
  77.     if (isCanceled()) {
  78.         return;
  79.     }
  80.     state_ = Task::State::kCanceled;
  81.     notifyDependants();
  82.     auto shared_executor = weak_executor_.lock();
  83.     if (!shared_executor) {
  84.         return;
  85.     }
  86.     auto shared_this = shared_from_this();
  87.     shared_executor->removeActiveTask(shared_this);
  88.     if (has_time_trigger_) {
  89.         shared_executor->removeTimeTriggeredTask(shared_this);
  90.     }
  91. }
  92.  
  93. void Task::wait() {
  94.     std::unique_lock lock(mutex_);
  95.     on_finished_.wait(lock, [this] { return isFinished(); });
  96. }
  97.  
  98. void Task::checkIfReadyToRunUnlocked() {
  99.     auto shared_executor = weak_executor_.lock();
  100.     if (!shared_executor || state_ != Task::State::kCreated) {
  101.         return;
  102.     }
  103.     if ((has_dependencies_ && num_unmet_dependencies_ == 0) || (has_triggers_ && was_triggered_) ||
  104.         (has_time_trigger_ && was_time_triggered_) ||
  105.         (!has_dependencies_ && !has_triggers_ && !has_time_trigger_)) {
  106.         state_ = Task::State::kReadyToRun;
  107.         shared_executor->addReadyToRunTask(shared_from_this());
  108.     }
  109. }
  110.  
  111. void Task::notifyDependants() {
  112.     on_finished_.notify_all();
  113.     for (const auto& weak_dep : depends_on_this_) {
  114.         auto shared_dep = weak_dep.lock();
  115.         if (!shared_dep) {
  116.             continue;
  117.         }
  118.         std::scoped_lock lock(shared_dep->mutex_);
  119.         --shared_dep->num_unmet_dependencies_;
  120.         shared_dep->checkIfReadyToRunUnlocked();
  121.     }
  122.     for (const auto& weak_dep : triggered_by_this_) {
  123.         auto shared_dep = weak_dep.lock();
  124.         if (!shared_dep) {
  125.             continue;
  126.         }
  127.         std::scoped_lock lock(shared_dep->mutex_);
  128.         shared_dep->was_triggered_ = true;
  129.         shared_dep->checkIfReadyToRunUnlocked();
  130.     }
  131. }
  132.  
  133. // Executor
  134.  
  135. bool Executor::TimeTriggeredTaskCmp::operator()(const std::shared_ptr<Task>& lhs,
  136.                                                 const std::shared_ptr<Task>& rhs) const {
  137.     if (lhs->time_trigger_ != rhs->time_trigger_) {
  138.         return lhs->time_trigger_ < rhs->time_trigger_;
  139.     }
  140.     return lhs.owner_before(rhs);
  141. }
  142.  
  143. Executor::Executor(int num_threads) : shutdown_flag_(false) {
  144.     workers_.reserve(num_threads);
  145.     while (num_threads--) {
  146.         workers_.emplace_back([this] { doWorkerLoop(); });
  147.     }
  148. }
  149.  
  150. Executor::~Executor() {
  151.     startShutdown();
  152.     waitShutdown();
  153. }
  154.  
  155. void Executor::submit(std::shared_ptr<Task> task) {
  156.     std::scoped_lock task_lock(task->mutex_);
  157.     if (task->weak_executor_.lock()) {
  158.         throw std::runtime_error("The task was already submitted");
  159.     }
  160.     if (task->state_ != Task::State::kCreated) {
  161.         return;
  162.     }
  163.     if (shutdown_flag_) {
  164.         task->state_ = Task::State::kCanceled;
  165.     } else {
  166.         task->weak_executor_ = weak_from_this();
  167.         addActiveTask(task);
  168.         task->checkIfReadyToRunUnlocked();
  169.         if (task->has_time_trigger_) {
  170.             addTimeTriggeredTask(task);
  171.         }
  172.     }
  173. }
  174.  
  175. void Executor::startShutdown() {
  176.     std::scoped_lock lock(mutex_);
  177.     shutdown_flag_ = true;
  178.     on_has_ready_tasks_.notify_all();
  179. }
  180.  
  181. void Executor::waitShutdown() {
  182.     startShutdown();
  183.     for (auto& worker : workers_) {
  184.         worker.join();
  185.     }
  186.     workers_.clear();
  187.     active_tasks_.clear();
  188.     while (!ready_to_run_tasks_.empty()) {
  189.         ready_to_run_tasks_.pop();
  190.     }
  191.     time_triggered_tasks_.clear();
  192. }
  193.  
  194. void Executor::doWorkerLoop() {
  195.     while (true) {
  196.         processTimeTriggeredTasks();
  197.         std::shared_ptr<Task> task;
  198.         {
  199.             std::unique_lock executor_lock(mutex_);
  200.             if (time_triggered_tasks_.empty()) {
  201.                 on_has_ready_tasks_.wait(executor_lock, [this] {
  202.                     return !ready_to_run_tasks_.empty() || !time_triggered_tasks_.empty() ||
  203.                            shutdown_flag_;
  204.                 });
  205.             } else {
  206.                 auto next_event = (*time_triggered_tasks_.begin())->time_trigger_;
  207.                 on_has_ready_tasks_.wait_until(executor_lock, next_event, [this, next_event] {
  208.                     return !ready_to_run_tasks_.empty() ||
  209.                            (!time_triggered_tasks_.empty() &&
  210.                             (*time_triggered_tasks_.begin())->time_trigger_ < next_event) ||
  211.                            shutdown_flag_;
  212.                 });
  213.             }
  214.             if (shutdown_flag_) {
  215.                 break;
  216.             }
  217.             if (ready_to_run_tasks_.empty()) {
  218.                 continue;
  219.             }
  220.             task = ready_to_run_tasks_.front();
  221.             ready_to_run_tasks_.pop();
  222.         }
  223.         {
  224.             std::unique_lock task_lock(task->mutex_);
  225.             if (task->isCanceled()) {
  226.                 continue;
  227.             }
  228.             task->state_ = Task::State::kRunning;
  229.             try {
  230.                 task_lock.unlock();
  231.                 task->run();
  232.                 task_lock.lock();
  233.             } catch (...) {
  234.                 task_lock.lock();
  235.                 task->state_ = Task::State::kFailed;
  236.                 task->thrown_exception_ = std::current_exception();
  237.             }
  238.             if (task->state_ != Task::State::kFailed) {
  239.                 task->state_ = Task::State::kCompleted;
  240.             }
  241.             task->notifyDependants();
  242.             removeActiveTask(task);
  243.         }
  244.     }
  245. }
  246.  
  247. void Executor::processTimeTriggeredTasks() {
  248.     std::vector<std::shared_ptr<Task>> tasks_to_trigger;
  249.     {
  250.         std::scoped_lock executor_lock(mutex_);
  251.         auto now = std::chrono::system_clock::now();
  252.         while (!time_triggered_tasks_.empty()) {
  253.             auto task = *time_triggered_tasks_.begin();
  254.             if (task->time_trigger_ > now) {
  255.                 break;
  256.             }
  257.             time_triggered_tasks_.erase(time_triggered_tasks_.begin());
  258.             tasks_to_trigger.push_back(task);
  259.         }
  260.     }
  261.     for (auto& task : tasks_to_trigger) {
  262.         std::scoped_lock task_lock(task->mutex_);
  263.         task->was_time_triggered_ = true;
  264.         task->checkIfReadyToRunUnlocked();
  265.     }
  266. }
  267.  
  268. void Executor::addActiveTask(std::shared_ptr<Task> task) {
  269.     std::scoped_lock lock(mutex_);
  270.     active_tasks_.insert(task);
  271. }
  272.  
  273. void Executor::removeActiveTask(std::shared_ptr<Task> task) {
  274.     std::scoped_lock lock(mutex_);
  275.     active_tasks_.erase(task);
  276. }
  277.  
  278. void Executor::addReadyToRunTask(std::shared_ptr<Task> task) {
  279.     std::scoped_lock lock(mutex_);
  280.     ready_to_run_tasks_.push(task);
  281.     on_has_ready_tasks_.notify_one();
  282. }
  283.  
  284. void Executor::addTimeTriggeredTask(std::shared_ptr<Task> task) {
  285.     std::scoped_lock lock(mutex_);
  286.     time_triggered_tasks_.insert(task);
  287.     on_has_ready_tasks_.notify_all();
  288. }
  289.  
  290. void Executor::removeTimeTriggeredTask(std::shared_ptr<Task> task) {
  291.     std::scoped_lock lock(mutex_);
  292.     time_triggered_tasks_.erase(task);
  293. }
  294.  
  295. std::shared_ptr<Executor> MakeThreadPoolExecutor(int num_threads) {
  296.     return std::make_shared<Executor>(num_threads);
  297. }
  298.  
  299.  
  300.  
  301.  
  302.  
  303. #include <atomic>
  304. #include <chrono>
  305. #include <condition_variable>
  306. #include <functional>
  307. #include <memory>
  308. #include <mutex>
  309. #include <queue>
  310. #include <set>
  311. #include <thread>
  312. #include <unordered_set>
  313. #include <vector>
  314.  
  315. class Executor;
  316.  
  317. class Task : public std::enable_shared_from_this<Task> {
  318. public:
  319.     Task();
  320.     virtual ~Task() {
  321.     }
  322.  
  323.     virtual void run() = 0;
  324.  
  325.     void addDependency(std::shared_ptr<Task> dep);
  326.     void addTrigger(std::shared_ptr<Task> dep);
  327.     void setTimeTrigger(std::chrono::system_clock::time_point at);
  328.  
  329.     // Task::run() completed without throwing exception
  330.     bool isCompleted() const;
  331.  
  332.     // Task::run() throwed exception
  333.     bool isFailed() const;
  334.  
  335.     // Task was canceled
  336.     bool isCanceled() const;
  337.  
  338.     // Task either completed, failed or was canceled
  339.     bool isFinished() const;
  340.  
  341.     std::exception_ptr getError() const;
  342.  
  343.     void cancel();
  344.  
  345.     void wait();
  346.  
  347. private:
  348.     void checkIfReadyToRunUnlocked();
  349.     void notifyDependants();
  350.  
  351. private:
  352.     enum class State { kCreated, kReadyToRun, kRunning, kCanceled, kFailed, kCompleted };
  353.  
  354.     std::mutex mutex_;
  355.     std::condition_variable on_finished_;
  356.     std::weak_ptr<Executor> weak_executor_;
  357.     std::atomic<State> state_;
  358.     std::exception_ptr thrown_exception_;
  359.     bool has_dependencies_;
  360.     int num_unmet_dependencies_;
  361.     std::vector<std::weak_ptr<Task>> depends_on_this_;
  362.     bool has_triggers_;
  363.     bool was_triggered_;
  364.     std::vector<std::weak_ptr<Task>> triggered_by_this_;
  365.     bool has_time_trigger_;
  366.     bool was_time_triggered_;
  367.     std::chrono::system_clock::time_point time_trigger_;
  368.  
  369.     friend class Executor;
  370. };
  371.  
  372. template <class T>
  373. class Future;
  374.  
  375. template <class T>
  376. using FuturePtr = std::shared_ptr<Future<T>>;
  377.  
  378. // Used instead of void in generic code
  379. struct Unit {};
  380.  
  381. class Executor : public std::enable_shared_from_this<Executor> {
  382. public:
  383.     explicit Executor(int num_threads);
  384.     virtual ~Executor();
  385.  
  386.     virtual void submit(std::shared_ptr<Task> task);
  387.  
  388.     virtual void startShutdown();
  389.     virtual void waitShutdown();
  390.  
  391.     template <class T>
  392.     FuturePtr<T> invoke(std::function<T()> fn);
  393.  
  394.     template <class Y, class T>
  395.     FuturePtr<Y> then(FuturePtr<T> input, std::function<Y()> fn);
  396.  
  397.     template <class T>
  398.     FuturePtr<std::vector<T>> whenAll(std::vector<FuturePtr<T>> all);
  399.  
  400.     template <class T>
  401.     FuturePtr<T> whenFirst(std::vector<FuturePtr<T>> all);
  402.  
  403.     template <class T>
  404.     FuturePtr<std::vector<T>> whenAllBeforeDeadline(std::vector<FuturePtr<T>> all,
  405.                                                     std::chrono::system_clock::time_point deadline);
  406.  
  407. private:
  408.     void doWorkerLoop();
  409.     void processTimeTriggeredTasks();
  410.     void addActiveTask(std::shared_ptr<Task> task);
  411.     void removeActiveTask(std::shared_ptr<Task> task);
  412.     void addTimeTriggeredTask(std::shared_ptr<Task> task);
  413.     void removeTimeTriggeredTask(std::shared_ptr<Task> task);
  414.     void addReadyToRunTask(std::shared_ptr<Task> task);
  415.  
  416. private:
  417.     struct TimeTriggeredTaskCmp {
  418.         bool operator()(const std::shared_ptr<Task>& lhs, const std::shared_ptr<Task>& rhs) const;
  419.     };
  420.  
  421.     std::vector<std::thread> workers_;
  422.     std::mutex mutex_;
  423.     std::atomic<bool> shutdown_flag_;
  424.     std::condition_variable on_has_ready_tasks_;
  425.     std::unordered_set<std::shared_ptr<Task>> active_tasks_;
  426.     std::queue<std::shared_ptr<Task>> ready_to_run_tasks_;
  427.     std::set<std::shared_ptr<Task>, TimeTriggeredTaskCmp> time_triggered_tasks_;
  428.  
  429.     friend class Task;
  430.  
  431.     template <class T>
  432.     friend class Future;
  433. };
  434.  
  435. std::shared_ptr<Executor> MakeThreadPoolExecutor(int num_threads);
  436.  
  437. template <class T>
  438. class Future : public Task {
  439. public:
  440.     explicit Future(const std::function<T()>& fn) : function_(fn) {
  441.         if (!fn) {
  442.             throw std::runtime_error("The function must not be null");
  443.         }
  444.     }
  445.  
  446.     void run() {
  447.         result_ = function_();
  448.     }
  449.  
  450.     T get() {
  451.         wait();
  452.         if (isCanceled()) {
  453.             throw std::runtime_error("The task was canceled");
  454.         }
  455.         if (isFailed()) {
  456.             std::rethrow_exception(getError());
  457.         }
  458.         return result_;
  459.     }
  460.  
  461. private:
  462.     std::function<T()> function_;
  463.     T result_;
  464. };
  465.  
  466. template <class T>
  467. FuturePtr<T> Executor::invoke(std::function<T()> fn) {
  468.     auto future = std::make_shared<Future<T>>(fn);
  469.     submit(future);
  470.     return future;
  471. }
  472.  
  473. template <class Y, class T>
  474. FuturePtr<Y> Executor::then(FuturePtr<T> input, std::function<Y()> fn) {
  475.     auto future = std::make_shared<Future<Y>>(fn);
  476.     future->addTrigger(input);
  477.     submit(future);
  478.     return future;
  479. }
  480.  
  481. template <class T>
  482. FuturePtr<std::vector<T>> Executor::whenAll(std::vector<FuturePtr<T>> all) {
  483.     auto future = std::make_shared<Future<std::vector<T>>>([&all] {
  484.         std::vector<T> results;
  485.         results.reserve(all.size());
  486.         for (auto& dep : all) {
  487.             results.push_back(dep->get());
  488.         }
  489.         return results;
  490.     });
  491.     for (auto& dep : all) {
  492.         future->addDependency(dep);
  493.     }
  494.     submit(future);
  495.     return future;
  496. }
  497.  
  498. template <class T>
  499. FuturePtr<T> Executor::whenFirst(std::vector<FuturePtr<T>> all) {
  500.     auto future = std::make_shared<Future<T>>([&all] {
  501.         for (auto& dep : all) {
  502.             if (dep->isFinished()) {
  503.                 return dep->get();
  504.             }
  505.         }
  506.         throw std::runtime_error("Internal error");
  507.     });
  508.     for (auto& dep : all) {
  509.         future->addTrigger(dep);
  510.     }
  511.     submit(future);
  512.     return future;
  513. }
  514.  
  515. template <class T>
  516. FuturePtr<std::vector<T>> Executor::whenAllBeforeDeadline(
  517.     std::vector<FuturePtr<T>> all, std::chrono::system_clock::time_point deadline) {
  518.     auto future = std::make_shared<Future<std::vector<T>>>([&all] {
  519.         std::vector<T> results;
  520.         for (auto& dep : all) {
  521.             if (dep->isFinished()) {
  522.                 results.push_back(dep->get());
  523.             }
  524.         }
  525.         return results;
  526.     });
  527.     future->setTimeTrigger(deadline);
  528.     submit(future);
  529.     return future;
  530. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
Not a member of Pastebin yet?
Sign Up, it unlocks many cool features!
 
Top