Advertisement
Guest User

Untitled

a guest
Dec 18th, 2017
56
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 2.29 KB | None | 0 0
  1. #pragma once
  2.  
  3. #include<thread>
  4. #include<queue>
  5. #include<vector>
  6. #include<functional>
  7. #include<mutex>
  8. #include<Windows.h>
  9.  
  10. template<class P, class C, class R> class ProducerConsumer {
  11. private:
  12.     std::condition_variable new_product_cond;
  13.     std::condition_variable empty_space_cond;
  14.     std::mutex queue_modify_m;
  15.  
  16.     std::function<R(C)>* consume;
  17.     std::function<C(P)>* produce;
  18.     std::int32_t num_p;
  19.     std::int32_t num_c;
  20.     std::int32_t buf_size;
  21.     std::queue<C> produced;
  22.     std::vector<P>* p_params;
  23.     std::vector<R> results;
  24.  
  25.     std::thread* cThreads;
  26.     std::thread* pThreads;
  27.  
  28.     void add_product(C task) {
  29.         std::unique_lock<std::mutex> lock(queue_modify_m);
  30.         while (produced.size() >= buf_size)
  31.             empty_space_cond.wait(lock);
  32.         produced.push(task);
  33.         new_product_cond.notify_one();
  34.     }
  35.  
  36.     C take_product() {
  37.         std::unique_lock<std::mutex> lock(queue_modify_m);
  38.         while (produced.empty())
  39.             new_product_cond.wait(lock);
  40.         C task = produced.front();
  41.         produced.pop();
  42.         empty_space_cond.notify_one();
  43.         return task;
  44.     }
  45.  
  46.     void producer(std::int32_t start, std::int32_t end) {
  47.         for (int i = start; i < end; i++)
  48.             add_product((*produce)((*p_params)[i]));
  49.     }
  50.  
  51.     void consumer() {
  52.         while (true) {
  53.             results.push_back((*consume)(take_product()));
  54.         }
  55.     }
  56. public:
  57.     ProducerConsumer(std::function<C (P)>* p, std::function<R (C)>* c,
  58.         std::int32_t num_p, std::int32_t num_c, std::int32_t buf_size,
  59.         std::vector<P>* p_params):
  60.         num_p(num_p), num_c(num_c), buf_size(buf_size)
  61.     {
  62.         cThreads = new std::thread[num_c];
  63.         pThreads = new std::thread[num_p];
  64.         consume = c;
  65.         produce = p;
  66.         this->p_params = p_params;
  67.     }
  68.  
  69.     ~ProducerConsumer() {
  70.         delete[] cThreads;
  71.         delete[] pThreads;
  72.         delete consume;
  73.         delete produce;
  74.         delete p_params;
  75.     }
  76.  
  77.     void start() {
  78.         std::int32_t task_range = p_params->size() / num_p;
  79.         for (int i = 0; i < num_p - 1; i++)
  80.             pThreads[i] = std::thread(ProducerConsumer::producer, task_range*i, task_range*i + task_range);
  81.         pThreads[num_p - 1] = std::thread(ProducerConsumer::producer, (num_p - 1)*task_range, num_p);
  82.         for (int i = 0; i < num_c; i++)
  83.             cThreads[i] = std::thread(ProducerConsumer::consumer);
  84.         for (int i = 0; i < buf_size; i++)
  85.             empty_space_cond.notify_one();
  86.     }
  87.  
  88.     std::vector<R> get_results() {
  89.         Sleep(2000);
  90.         return results;
  91.     }
  92. };
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement