Advertisement
saviofigueiredo

timeScheduler

Nov 19th, 2019
118
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 4.17 KB | None | 0 0
  1. #include <iostream>
  2. #include <chrono>
  3. #include <queue>
  4. #include <vector>
  5. #include <thread>
  6. #include <condition_variable>
  7. #include <mutex>
  8.  
  9. using namespace std;
  10. using namespace chrono;
  11. using namespace this_thread;
  12.  
  /**
   * Abstract unit of work.
   *
   * Concrete tasks derive from this class and put their work in run().
   */
  class Task {
  public:
      /// Virtual so that a derived task can be destroyed through a Task pointer.
      virtual ~Task() = default;

      /// Performs the task's work; every concrete task must implement this.
      virtual void run() = 0;
  };
  18.  
  19. class PendingTask {
  20. public:
  21.     time_point<steady_clock> startTime;
  22.     Task *task;
  23.  
  24.     PendingTask(time_point<steady_clock> startTime, Task *task): startTime(startTime), task(task) { }
  25. };
  26.  
  27. struct Comparator {
  28.     bool operator()(const PendingTask &a, const PendingTask &b) const {
  29.         return a.startTime > b.startTime;
  30.     }
  31. };
  32.  
  33. class Scheduler {
  34. private:
  35.     const int ThreadPoolSize = 40;
  36.  
  37.     vector<thread> threadPool;
  38.     queue<Task *> tasksReadyToRunQueue;
  39.     priority_queue<PendingTask, vector<PendingTask>, Comparator> allTasksQueue;
  40.  
  41.     thread timerThread;
  42.     mutex timerMutex, workerMutex;
  43.     condition_variable timerCV, workerCV;
  44.  
  45.     bool workerIsRunning = true;
  46.     bool timerIsRunning = true;
  47.  
  48. public:
  49.     Scheduler() {
  50.         for (int i = 0; i < ThreadPoolSize; ++i) {
  51.             threadPool.push_back(thread(&Scheduler::workerThreadFunction, this));
  52.         }
  53.  
  54.         timerThread = thread(&Scheduler::timerThreadFunction, this);
  55.     }
  56.  
  57.     ~Scheduler() {
  58.         {
  59.             lock_guard<mutex> timerLock{ timerMutex };
  60.             timerIsRunning = false;
  61.             timerCV.notify_one();
  62.         }
  63.    
  64.         timerThread.join();
  65.  
  66.         {
  67.             lock_guard<mutex> workerLock{ workerMutex };
  68.             workerIsRunning = false;
  69.             workerCV.notify_all();
  70.         }
  71.    
  72.         for (auto &thread : threadPool) {
  73.             thread.join();
  74.         }
  75.     }
  76.  
  77.     void add(Task *task, double delayToRun) {
  78.         auto now = steady_clock::now();
  79.         milliseconds duration ((long long)delayToRun * 1000);
  80.  
  81.         time_point<steady_clock> startTime = now + duration;
  82.  
  83.         if (now >= startTime) {
  84.             lock_guard<mutex> lock { workerMutex };
  85.             tasksReadyToRunQueue.push(task);
  86.             workerCV.notify_one();
  87.         } else {
  88.             lock_guard<mutex> lock { timerMutex };
  89.             allTasksQueue.push(PendingTask(startTime, task));
  90.             timerCV.notify_one();
  91.         }
  92.     }
  93.  
  94. private:
  95.     void workerThreadFunction() {
  96.         while (true) {
  97.             unique_lock<mutex> lock { workerMutex };
  98.             workerCV.wait(lock, [this] { return !tasksReadyToRunQueue.empty() || !workerIsRunning; });
  99.  
  100.             if (!workerIsRunning) {
  101.                 break;
  102.             }
  103.  
  104.             Task *task = tasksReadyToRunQueue.front();
  105.             tasksReadyToRunQueue.pop();
  106.  
  107.             lock.unlock();
  108.  
  109.             task->run();
  110.             delete task;
  111.         }
  112.     }
  113.  
  114.     void timerThreadFunction() {
  115.         while(true) {
  116.             unique_lock<mutex> timerLock { timerMutex };
  117.             timerCV.wait_for(timerLock, nanoseconds(500000000));
  118.  
  119.             if (!timerIsRunning) {
  120.                 break;
  121.             }
  122.  
  123.             if (allTasksQueue.size() != 0) {
  124.                 auto head = allTasksQueue.top();
  125.  
  126.                 if (steady_clock::now() >= head.startTime) {
  127.                     unique_lock<mutex> workerLock{ workerMutex };
  128.  
  129.                     auto *task = head.task;
  130.                     tasksReadyToRunQueue.push(task);
  131.                     workerCV.notify_one();
  132.                     allTasksQueue.pop();
  133.                 }
  134.             }
  135.         }
  136.     }
  137. };
  138.  
  139. class DemoTask : public Task {
  140.     int n;
  141. public:
  142.     DemoTask(int n=0) : n{n} { }
  143.     void run() override {
  144.         cout << "Start task " << n << endl;;
  145.         sleep_for(seconds(2));
  146.         cout << " Stop task " << n << endl;;
  147.     }
  148. };
  149.  
  150. int main()
  151. {
  152.     Scheduler sched;
  153.  
  154.     Task *t0 = new DemoTask{0};
  155.     Task *t1 = new DemoTask{1};
  156.     Task *t2 = new DemoTask{2};
  157.     Task *t3 = new DemoTask{3};
  158.     Task *t4 = new DemoTask{4};
  159.     Task *t5 = new DemoTask{5};
  160.  
  161.     sched.add(t0, 7.313);
  162.     sched.add(t1, 2.213);
  163.     sched.add(t2, 0.713);
  164.     sched.add(t3, 1.243);
  165.     sched.add(t4, 0.913);
  166.     sched.add(t5, 3.313);
  167.  
  168.     sleep_for(seconds(10));
  169. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement