Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <executors/executors.h>
- // Task
- Task::Task()
- : state_(Task::State::kCreated),
- has_dependencies_(false),
- num_unmet_dependencies_(0),
- has_triggers_(false),
- was_triggered_(false),
- has_time_trigger_(false),
- was_time_triggered_(false) {
- }
- void Task::addDependency(std::shared_ptr<Task> dep) {
- std::scoped_lock lock(mutex_, dep->mutex_);
- if (state_ != Task::State::kCreated) {
- throw std::runtime_error("Cannot configure the task");
- }
- has_dependencies_ = true;
- if (!dep->isFinished()) {
- ++num_unmet_dependencies_;
- } else {
- checkIfReadyToRunUnlocked();
- }
- dep->depends_on_this_.push_back(weak_from_this());
- }
- void Task::addTrigger(std::shared_ptr<Task> dep) {
- std::scoped_lock lock(mutex_, dep->mutex_);
- if (state_ != Task::State::kCreated) {
- throw std::runtime_error("Cannot configure the task");
- }
- has_triggers_ = true;
- if (dep->isFinished()) {
- was_triggered_ = true;
- checkIfReadyToRunUnlocked();
- }
- dep->triggered_by_this_.push_back(weak_from_this());
- }
- void Task::setTimeTrigger(std::chrono::system_clock::time_point at) {
- std::scoped_lock lock(mutex_);
- if (state_ != Task::State::kCreated || has_time_trigger_) {
- throw std::runtime_error("Cannot configure the task");
- }
- has_time_trigger_ = true;
- time_trigger_ = at;
- auto shared_executor = weak_executor_.lock();
- if (shared_executor) {
- shared_executor->addTimeTriggeredTask(shared_from_this());
- }
- }
- bool Task::isCompleted() const {
- return state_ == Task::State::kCompleted;
- }
- bool Task::isFailed() const {
- return state_ == Task::State::kFailed;
- }
- bool Task::isCanceled() const {
- return state_ == Task::State::kCanceled;
- }
- bool Task::isFinished() const {
- return isCompleted() || isFailed() || isCanceled();
- }
- std::exception_ptr Task::getError() const {
- return thrown_exception_;
- }
- void Task::cancel() {
- std::scoped_lock lock(mutex_);
- if (isCanceled()) {
- return;
- }
- state_ = Task::State::kCanceled;
- notifyDependants();
- auto shared_executor = weak_executor_.lock();
- if (!shared_executor) {
- return;
- }
- auto shared_this = shared_from_this();
- shared_executor->removeActiveTask(shared_this);
- if (has_time_trigger_) {
- shared_executor->removeTimeTriggeredTask(shared_this);
- }
- }
- void Task::wait() {
- std::unique_lock lock(mutex_);
- on_finished_.wait(lock, [this] { return isFinished(); });
- }
- void Task::checkIfReadyToRunUnlocked() {
- auto shared_executor = weak_executor_.lock();
- if (!shared_executor || state_ != Task::State::kCreated) {
- return;
- }
- if ((has_dependencies_ && num_unmet_dependencies_ == 0) || (has_triggers_ && was_triggered_) ||
- (has_time_trigger_ && was_time_triggered_) ||
- (!has_dependencies_ && !has_triggers_ && !has_time_trigger_)) {
- state_ = Task::State::kReadyToRun;
- shared_executor->addReadyToRunTask(shared_from_this());
- }
- }
- void Task::notifyDependants() {
- on_finished_.notify_all();
- for (const auto& weak_dep : depends_on_this_) {
- auto shared_dep = weak_dep.lock();
- if (!shared_dep) {
- continue;
- }
- std::scoped_lock lock(shared_dep->mutex_);
- --shared_dep->num_unmet_dependencies_;
- shared_dep->checkIfReadyToRunUnlocked();
- }
- for (const auto& weak_dep : triggered_by_this_) {
- auto shared_dep = weak_dep.lock();
- if (!shared_dep) {
- continue;
- }
- std::scoped_lock lock(shared_dep->mutex_);
- shared_dep->was_triggered_ = true;
- shared_dep->checkIfReadyToRunUnlocked();
- }
- }
- // Executor
- bool Executor::TimeTriggeredTaskCmp::operator()(const std::shared_ptr<Task>& lhs,
- const std::shared_ptr<Task>& rhs) const {
- if (lhs->time_trigger_ != rhs->time_trigger_) {
- return lhs->time_trigger_ < rhs->time_trigger_;
- }
- return lhs.owner_before(rhs);
- }
- Executor::Executor(int num_threads) : shutdown_flag_(false) {
- workers_.reserve(num_threads);
- while (num_threads--) {
- workers_.emplace_back([this] { doWorkerLoop(); });
- }
- }
- Executor::~Executor() {
- startShutdown();
- waitShutdown();
- }
- void Executor::submit(std::shared_ptr<Task> task) {
- std::scoped_lock task_lock(task->mutex_);
- if (task->weak_executor_.lock()) {
- throw std::runtime_error("The task was already submitted");
- }
- if (task->state_ != Task::State::kCreated) {
- return;
- }
- if (shutdown_flag_) {
- task->state_ = Task::State::kCanceled;
- } else {
- task->weak_executor_ = weak_from_this();
- addActiveTask(task);
- task->checkIfReadyToRunUnlocked();
- if (task->has_time_trigger_) {
- addTimeTriggeredTask(task);
- }
- }
- }
- void Executor::startShutdown() {
- std::scoped_lock lock(mutex_);
- shutdown_flag_ = true;
- on_has_ready_tasks_.notify_all();
- }
- void Executor::waitShutdown() {
- startShutdown();
- for (auto& worker : workers_) {
- worker.join();
- }
- workers_.clear();
- active_tasks_.clear();
- while (!ready_to_run_tasks_.empty()) {
- ready_to_run_tasks_.pop();
- }
- time_triggered_tasks_.clear();
- }
- void Executor::doWorkerLoop() {
- while (true) {
- processTimeTriggeredTasks();
- std::shared_ptr<Task> task;
- {
- std::unique_lock executor_lock(mutex_);
- if (time_triggered_tasks_.empty()) {
- on_has_ready_tasks_.wait(executor_lock, [this] {
- return !ready_to_run_tasks_.empty() || !time_triggered_tasks_.empty() ||
- shutdown_flag_;
- });
- } else {
- auto next_event = (*time_triggered_tasks_.begin())->time_trigger_;
- on_has_ready_tasks_.wait_until(executor_lock, next_event, [this, next_event] {
- return !ready_to_run_tasks_.empty() ||
- (!time_triggered_tasks_.empty() &&
- (*time_triggered_tasks_.begin())->time_trigger_ < next_event) ||
- shutdown_flag_;
- });
- }
- if (shutdown_flag_) {
- break;
- }
- if (ready_to_run_tasks_.empty()) {
- continue;
- }
- task = ready_to_run_tasks_.front();
- ready_to_run_tasks_.pop();
- }
- {
- std::unique_lock task_lock(task->mutex_);
- if (task->isCanceled()) {
- continue;
- }
- task->state_ = Task::State::kRunning;
- try {
- task_lock.unlock();
- task->run();
- task_lock.lock();
- } catch (...) {
- task_lock.lock();
- task->state_ = Task::State::kFailed;
- task->thrown_exception_ = std::current_exception();
- }
- if (task->state_ != Task::State::kFailed) {
- task->state_ = Task::State::kCompleted;
- }
- task->notifyDependants();
- removeActiveTask(task);
- }
- }
- }
- void Executor::processTimeTriggeredTasks() {
- std::vector<std::shared_ptr<Task>> tasks_to_trigger;
- {
- std::scoped_lock executor_lock(mutex_);
- auto now = std::chrono::system_clock::now();
- while (!time_triggered_tasks_.empty()) {
- auto task = *time_triggered_tasks_.begin();
- if (task->time_trigger_ > now) {
- break;
- }
- time_triggered_tasks_.erase(time_triggered_tasks_.begin());
- tasks_to_trigger.push_back(task);
- }
- }
- for (auto& task : tasks_to_trigger) {
- std::scoped_lock task_lock(task->mutex_);
- task->was_time_triggered_ = true;
- task->checkIfReadyToRunUnlocked();
- }
- }
- void Executor::addActiveTask(std::shared_ptr<Task> task) {
- std::scoped_lock lock(mutex_);
- active_tasks_.insert(task);
- }
- void Executor::removeActiveTask(std::shared_ptr<Task> task) {
- std::scoped_lock lock(mutex_);
- active_tasks_.erase(task);
- }
- void Executor::addReadyToRunTask(std::shared_ptr<Task> task) {
- std::scoped_lock lock(mutex_);
- ready_to_run_tasks_.push(task);
- on_has_ready_tasks_.notify_one();
- }
- void Executor::addTimeTriggeredTask(std::shared_ptr<Task> task) {
- std::scoped_lock lock(mutex_);
- time_triggered_tasks_.insert(task);
- on_has_ready_tasks_.notify_all();
- }
- void Executor::removeTimeTriggeredTask(std::shared_ptr<Task> task) {
- std::scoped_lock lock(mutex_);
- time_triggered_tasks_.erase(task);
- }
- std::shared_ptr<Executor> MakeThreadPoolExecutor(int num_threads) {
- return std::make_shared<Executor>(num_threads);
- }
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <exception>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <set>
#include <stdexcept>
#include <thread>
#include <unordered_set>
#include <vector>
- class Executor;
- class Task : public std::enable_shared_from_this<Task> {
- public:
- Task();
- virtual ~Task() {
- }
- virtual void run() = 0;
- void addDependency(std::shared_ptr<Task> dep);
- void addTrigger(std::shared_ptr<Task> dep);
- void setTimeTrigger(std::chrono::system_clock::time_point at);
- // Task::run() completed without throwing exception
- bool isCompleted() const;
- // Task::run() throwed exception
- bool isFailed() const;
- // Task was canceled
- bool isCanceled() const;
- // Task either completed, failed or was canceled
- bool isFinished() const;
- std::exception_ptr getError() const;
- void cancel();
- void wait();
- private:
- void checkIfReadyToRunUnlocked();
- void notifyDependants();
- private:
- enum class State { kCreated, kReadyToRun, kRunning, kCanceled, kFailed, kCompleted };
- std::mutex mutex_;
- std::condition_variable on_finished_;
- std::weak_ptr<Executor> weak_executor_;
- std::atomic<State> state_;
- std::exception_ptr thrown_exception_;
- bool has_dependencies_;
- int num_unmet_dependencies_;
- std::vector<std::weak_ptr<Task>> depends_on_this_;
- bool has_triggers_;
- bool was_triggered_;
- std::vector<std::weak_ptr<Task>> triggered_by_this_;
- bool has_time_trigger_;
- bool was_time_triggered_;
- std::chrono::system_clock::time_point time_trigger_;
- friend class Executor;
- };
// Forward declaration; the full definition follows the Executor class.
template <typename T>
class Future;

// Shared-ownership handle to a Future<T>.
template <typename T>
using FuturePtr = std::shared_ptr<Future<T>>;

// Used instead of void in generic code
struct Unit {};
- class Executor : public std::enable_shared_from_this<Executor> {
- public:
- explicit Executor(int num_threads);
- virtual ~Executor();
- virtual void submit(std::shared_ptr<Task> task);
- virtual void startShutdown();
- virtual void waitShutdown();
- template <class T>
- FuturePtr<T> invoke(std::function<T()> fn);
- template <class Y, class T>
- FuturePtr<Y> then(FuturePtr<T> input, std::function<Y()> fn);
- template <class T>
- FuturePtr<std::vector<T>> whenAll(std::vector<FuturePtr<T>> all);
- template <class T>
- FuturePtr<T> whenFirst(std::vector<FuturePtr<T>> all);
- template <class T>
- FuturePtr<std::vector<T>> whenAllBeforeDeadline(std::vector<FuturePtr<T>> all,
- std::chrono::system_clock::time_point deadline);
- private:
- void doWorkerLoop();
- void processTimeTriggeredTasks();
- void addActiveTask(std::shared_ptr<Task> task);
- void removeActiveTask(std::shared_ptr<Task> task);
- void addTimeTriggeredTask(std::shared_ptr<Task> task);
- void removeTimeTriggeredTask(std::shared_ptr<Task> task);
- void addReadyToRunTask(std::shared_ptr<Task> task);
- private:
- struct TimeTriggeredTaskCmp {
- bool operator()(const std::shared_ptr<Task>& lhs, const std::shared_ptr<Task>& rhs) const;
- };
- std::vector<std::thread> workers_;
- std::mutex mutex_;
- std::atomic<bool> shutdown_flag_;
- std::condition_variable on_has_ready_tasks_;
- std::unordered_set<std::shared_ptr<Task>> active_tasks_;
- std::queue<std::shared_ptr<Task>> ready_to_run_tasks_;
- std::set<std::shared_ptr<Task>, TimeTriggeredTaskCmp> time_triggered_tasks_;
- friend class Task;
- template <class T>
- friend class Future;
- };
- std::shared_ptr<Executor> MakeThreadPoolExecutor(int num_threads);
- template <class T>
- class Future : public Task {
- public:
- explicit Future(const std::function<T()>& fn) : function_(fn) {
- if (!fn) {
- throw std::runtime_error("The function must not be null");
- }
- }
- void run() {
- result_ = function_();
- }
- T get() {
- wait();
- if (isCanceled()) {
- throw std::runtime_error("The task was canceled");
- }
- if (isFailed()) {
- std::rethrow_exception(getError());
- }
- return result_;
- }
- private:
- std::function<T()> function_;
- T result_;
- };
- template <class T>
- FuturePtr<T> Executor::invoke(std::function<T()> fn) {
- auto future = std::make_shared<Future<T>>(fn);
- submit(future);
- return future;
- }
- template <class Y, class T>
- FuturePtr<Y> Executor::then(FuturePtr<T> input, std::function<Y()> fn) {
- auto future = std::make_shared<Future<Y>>(fn);
- future->addTrigger(input);
- submit(future);
- return future;
- }
- template <class T>
- FuturePtr<std::vector<T>> Executor::whenAll(std::vector<FuturePtr<T>> all) {
- auto future = std::make_shared<Future<std::vector<T>>>([&all] {
- std::vector<T> results;
- results.reserve(all.size());
- for (auto& dep : all) {
- results.push_back(dep->get());
- }
- return results;
- });
- for (auto& dep : all) {
- future->addDependency(dep);
- }
- submit(future);
- return future;
- }
- template <class T>
- FuturePtr<T> Executor::whenFirst(std::vector<FuturePtr<T>> all) {
- auto future = std::make_shared<Future<T>>([&all] {
- for (auto& dep : all) {
- if (dep->isFinished()) {
- return dep->get();
- }
- }
- throw std::runtime_error("Internal error");
- });
- for (auto& dep : all) {
- future->addTrigger(dep);
- }
- submit(future);
- return future;
- }
- template <class T>
- FuturePtr<std::vector<T>> Executor::whenAllBeforeDeadline(
- std::vector<FuturePtr<T>> all, std::chrono::system_clock::time_point deadline) {
- auto future = std::make_shared<Future<std::vector<T>>>([&all] {
- std::vector<T> results;
- for (auto& dep : all) {
- if (dep->isFinished()) {
- results.push_back(dep->get());
- }
- }
- return results;
- });
- future->setTimeTrigger(deadline);
- submit(future);
- return future;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement