Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <iostream>
- #include <chrono>
- #include <queue>
- #include <vector>
- #include <thread>
- #include <condition_variable>
- #include <mutex>
- using namespace std;
- using namespace chrono;
- using namespace this_thread;
- class Task {
- public:
- virtual void run() = 0;
- virtual ~Task() { }
- };
- class PendingTask {
- public:
- time_point<steady_clock> startTime;
- Task *task;
- PendingTask(time_point<steady_clock> startTime, Task *task): startTime(startTime), task(task) { }
- };
- struct Comparator {
- bool operator()(const PendingTask &a, const PendingTask &b) const {
- return a.startTime > b.startTime;
- }
- };
- class Scheduler {
- private:
- const int ThreadPoolSize = 40;
- vector<thread> threadPool;
- queue<Task *> tasksReadyToRunQueue;
- priority_queue<PendingTask, vector<PendingTask>, Comparator> allTasksQueue;
- thread timerThread;
- mutex timerMutex, workerMutex;
- condition_variable timerCV, workerCV;
- bool workerIsRunning = true;
- bool timerIsRunning = true;
- public:
- Scheduler() {
- for (int i = 0; i < ThreadPoolSize; ++i) {
- threadPool.push_back(thread(&Scheduler::workerThreadFunction, this));
- }
- timerThread = thread(&Scheduler::timerThreadFunction, this);
- }
- ~Scheduler() {
- {
- lock_guard<mutex> timerLock{ timerMutex };
- timerIsRunning = false;
- timerCV.notify_one();
- }
- timerThread.join();
- {
- lock_guard<mutex> workerLock{ workerMutex };
- workerIsRunning = false;
- workerCV.notify_all();
- }
- for (auto &thread : threadPool) {
- thread.join();
- }
- }
- void add(Task *task, double delayToRun) {
- auto now = steady_clock::now();
- milliseconds duration ((long long)delayToRun * 1000);
- time_point<steady_clock> startTime = now + duration;
- if (now >= startTime) {
- lock_guard<mutex> lock { workerMutex };
- tasksReadyToRunQueue.push(task);
- workerCV.notify_one();
- } else {
- lock_guard<mutex> lock { timerMutex };
- allTasksQueue.push(PendingTask(startTime, task));
- timerCV.notify_one();
- }
- }
- private:
- void workerThreadFunction() {
- while (true) {
- unique_lock<mutex> lock { workerMutex };
- workerCV.wait(lock, [this] { return !tasksReadyToRunQueue.empty() || !workerIsRunning; });
- if (!workerIsRunning) {
- break;
- }
- Task *task = tasksReadyToRunQueue.front();
- tasksReadyToRunQueue.pop();
- lock.unlock();
- task->run();
- delete task;
- }
- }
- void timerThreadFunction() {
- while(true) {
- unique_lock<mutex> timerLock { timerMutex };
- timerCV.wait_for(timerLock, nanoseconds(500000000));
- if (!timerIsRunning) {
- break;
- }
- if (allTasksQueue.size() != 0) {
- auto head = allTasksQueue.top();
- if (steady_clock::now() >= head.startTime) {
- unique_lock<mutex> workerLock{ workerMutex };
- auto *task = head.task;
- tasksReadyToRunQueue.push(task);
- workerCV.notify_one();
- allTasksQueue.pop();
- }
- }
- }
- }
- };
- class DemoTask : public Task {
- int n;
- public:
- DemoTask(int n=0) : n{n} { }
- void run() override {
- cout << "Start task " << n << endl;;
- sleep_for(seconds(2));
- cout << " Stop task " << n << endl;;
- }
- };
- int main()
- {
- Scheduler sched;
- Task *t0 = new DemoTask{0};
- Task *t1 = new DemoTask{1};
- Task *t2 = new DemoTask{2};
- Task *t3 = new DemoTask{3};
- Task *t4 = new DemoTask{4};
- Task *t5 = new DemoTask{5};
- sched.add(t0, 7.313);
- sched.add(t1, 2.213);
- sched.add(t2, 0.713);
- sched.add(t3, 1.243);
- sched.add(t4, 0.913);
- sched.add(t5, 3.313);
- sleep_for(seconds(10));
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement