Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
// A unit of work tracked by a Worker: the callable to run, the period
// between runs, the timestamp of the last run and the event id under which
// the task is queued.
//
// The constructor is private; createWorkerTask() is the only friend and
// therefore the only way to create an instance.
class WorkTask
{
public:
    // Stores the id this task is tracked under. A value of 0 means that no
    // id has been assigned yet (that is the in-class default).
    void SetEventId(uint32_t id)
    {
        eventId_ = id;
    }

    // Returns the id previously stored with SetEventId(), or 0 if none.
    uint32_t GetEventId() const
    {
        return eventId_;
    }

    // Records the timestamp of the most recent run.
    void SetLastExecution(int64_t time)
    {
        lastExecution_ = time;
    }

    // Returns the timestamp stored by SetLastExecution(); 0 until first set.
    int64_t GetLastExecution() const
    {
        return lastExecution_;
    }

    // Returns the period given at construction.
    int64_t GetPeriod() const
    {
        return period_;
    }

    // Returns the reutilize flag given at construction.
    bool IsReutilizable() const
    {
        return reutilize_;
    }

    // Returns a copy of the stored callable. Marked const because it does
    // not modify the task, so it is usable through const pointers as well.
    std::function<void(void)> GetFunc() const
    {
        return func_;
    }

private:
    // Initialisers are listed in member declaration order (the order in
    // which they actually run), and the callable is moved rather than
    // copied: `f` is a named rvalue reference and thus an lvalue here.
    WorkTask(uint32_t period, std::function<void(void)>&& f, bool reutilize) :
        reutilize_(reutilize), period_(period), func_(std::move(f)) {}

    bool reutilize_;
    int64_t period_;
    int64_t lastExecution_ = 0;
    uint32_t eventId_ = 0;
    std::function<void(void)> func_;

    friend WorkTask* createWorkerTask(uint32_t, std::function<void(void)>, bool);
};
// Factory for WorkTask: builds a task from a period (`delay`), a callable and
// a reutilize flag, which defaults to true when omitted. The definition
// allocates the task with plain `new`, so the returned pointer has to be
// deleted by whoever ends up owning it.
- WorkTask* createWorkerTask(uint32_t delay, std::function<void(void)> f, bool reutilize = true);
// Ordering predicate: true when lhs has a strictly smaller period than rhs.
- bool periodSort(const WorkTask* lhs, const WorkTask* rhs);
- class Worker : public ThreadHolder<Worker>
- {
- public:
- Worker(Dispatcher& dispatcher)
- : dispatcher_(dispatcher)
- {
- }
- uint32_t AddWork(WorkTask* task);
- bool StopWork(uint32_t eventId);
- void Shutdown();
- void ThreadMain();
- private:
- std::thread thread_;
- std::mutex eventLock_;
- std::condition_variable eventSignal_;
- Dispatcher& dispatcher_;
- uint32_t lastEventId_{ 0 };
- std::list<WorkTask*> workList_;
- std::unordered_set<uint32_t> eventIds_;
- };
- WorkTask* createWorkerTask(uint32_t delay, std::function<void(void)> f, bool reutilize)
- {
- return new WorkTask(delay, std::move(f), reutilize);
- }
- bool periodSort(const WorkTask* lhs, const WorkTask* rhs)
- {
- return lhs->GetPeriod() < rhs->GetPeriod();
- }
- uint32_t Worker::AddWork(WorkTask* task)
- {
- eventLock_.lock();
- if (getState() != THREAD_STATE_RUNNING)
- {
- eventLock_.unlock();
- delete task;
- return 0;
- }
- if (task->GetEventId() == 0)
- {
- if (++lastEventId_ == 0)
- {
- lastEventId_ = 1;
- }
- task->SetEventId(lastEventId_);
- }
- // insert the event id in the list of active events
- uint32_t eventId = task->GetEventId();
- eventIds_.insert(eventId);
- bool doSignal = workList_.empty();
- // Set to the work list
- workList_.push_back(task);
- workList_.sort(&periodSort);
- eventLock_.unlock();
- if (doSignal)
- {
- eventSignal_.notify_one();
- }
- return eventId;
- }
- bool Worker::StopWork(uint32_t eventId)
- {
- if (eventId == 0)
- {
- return false;
- }
- std::lock_guard<std::mutex> lockClass(eventLock_);
- for (auto it = workList_.begin(); it != workList_.end(); it++)
- {
- if ((*it)->GetEventId() == eventId)
- {
- workList_.erase(it);
- return true;
- }
- }
- return false;
- }
- void Worker::Shutdown()
- {
- setState(THREAD_STATE_TERMINATED);
- eventLock_.lock();
- for (auto it = workList_.begin(); it != workList_.end(); it++)
- {
- workList_.erase(it);
- delete* it; // vai dar merda
- }
- eventIds_.clear();
- eventLock_.unlock();
- eventSignal_.notify_one();
- }
- void Worker::ThreadMain()
- {
- std::unique_lock<std::mutex> eventLockUnique(eventLock_, std::defer_lock);
- while (getState() != THREAD_STATE_TERMINATED)
- {
- eventLockUnique.lock();
- if (workList_.empty())
- {
- eventSignal_.wait(eventLockUnique);
- }
- WorkTask* workToRun = nullptr;
- for (auto it = workList_.begin(); it != workList_.end(); it++)
- {
- int64_t now = W2SYS_TIME();
- if (!(*it)->IsReutilizable())
- {
- workToRun = *it;
- workList_.erase(it);
- break;
- }
- if((now - (*it)->GetLastExecution()) >= (*it)->GetPeriod())
- {
- workToRun = *it;
- (*it)->SetLastExecution(now);
- break;
- }
- eventSignal_.wait_until(eventLockUnique,
- std::chrono::system_clock::time_point(std::chrono::system_clock::now()
- + std::chrono::milliseconds((*it)->GetPeriod())));
- }
- if (workToRun != nullptr)
- {
- dispatcher_.AddTask(createTask(workToRun->GetFunc()));
- }
- eventLockUnique.unlock();
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement