Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#pragma once
#include<condition_variable>
#include<cstddef>
#include<cstdint>
#include<functional>
#include<memory>
#include<mutex>
#include<queue>
#include<stdexcept>
#include<thread>
#include<utility>
#include<vector>
#include<Windows.h>
/**
 * @brief Bounded-buffer producer/consumer pipeline.
 *
 * A fixed list of inputs of type P is split into contiguous ranges, one per
 * producer thread. Each producer turns an input into an intermediate value of
 * type C and pushes it into a queue holding at most `buf_size` items. Consumer
 * threads pop intermediate values, convert them to R and append them to the
 * result list.
 *
 * @tparam P input parameter type handed to the produce callback
 * @tparam C intermediate ("product") type passed through the bounded queue
 * @tparam R result type returned by the consume callback
 *
 * Ownership: the constructor takes ownership of the three pointers it is
 * given; they must have been allocated with `new` and are deleted when the
 * object is destroyed.
 *
 * Threading: both callbacks are invoked concurrently from several worker
 * threads and therefore must be safe to call in parallel. They must not
 * throw: an exception escaping a worker thread calls std::terminate.
 * start() and get_results() themselves are meant to be called from a single
 * controlling thread.
 */
template<class P, class C, class R> class ProducerConsumer {
private:
    std::condition_variable new_product_cond;   // an item was queued, or production ended
    std::condition_variable empty_space_cond;   // an item was removed from the queue
    std::mutex queue_modify_m;                  // guards `produced` and `producers_active`
    std::mutex results_m;                       // guards `results`
    std::unique_ptr<std::function<R(C)>> consume;
    std::unique_ptr<std::function<C(P)>> produce;
    std::int32_t num_p;
    std::int32_t num_c;
    std::int32_t buf_size;
    std::int32_t producers_active = 0;          // producers that have not finished yet
    bool started = false;                       // start() already launched the workers
    std::queue<C> produced;
    std::unique_ptr<std::vector<P>> p_params;
    std::vector<R> results;
    std::vector<std::thread> cThreads;
    std::vector<std::thread> pThreads;

    /// Blocks while the queue is full, then enqueues `task` and wakes one consumer.
    void add_product(C task) {
        std::unique_lock<std::mutex> lock(queue_modify_m);
        while (produced.size() >= static_cast<std::size_t>(buf_size))
            empty_space_cond.wait(lock);
        produced.push(std::move(task));
        lock.unlock();
        new_product_cond.notify_one();
    }

    /// Producer thread body: handles inputs with indices in [first, last).
    void producer(std::size_t first, std::size_t last) {
        for (std::size_t i = first; i < last; i++)
            add_product((*produce)((*p_params)[i]));

        // The last producer to finish wakes every consumer so that those
        // blocked on an empty queue can observe the end of production.
        bool all_done = false;
        {
            std::lock_guard<std::mutex> lock(queue_modify_m);
            all_done = (--producers_active == 0);
        }
        if (all_done)
            new_product_cond.notify_all();
    }

    /// Consumer thread body: drains the queue until it is empty and no
    /// producer is still running.
    void consumer() {
        for (;;) {
            std::unique_lock<std::mutex> lock(queue_modify_m);
            while (produced.empty() && producers_active > 0)
                new_product_cond.wait(lock);
            if (produced.empty())
                return;  // nothing queued and nobody left to produce
            C task = std::move(produced.front());
            produced.pop();
            lock.unlock();
            empty_space_cond.notify_one();

            // Run the callback outside of any lock; only the append is serialized.
            R result = (*consume)(std::move(task));
            std::lock_guard<std::mutex> results_lock(results_m);
            results.push_back(std::move(result));
        }
    }

    /// Joins every worker thread that is still joinable.
    void join_all() {
        for (std::thread& t : pThreads)
            if (t.joinable())
                t.join();
        for (std::thread& t : cThreads)
            if (t.joinable())
                t.join();
    }

public:
    /**
     * @brief Stores the callbacks, the thread counts and the input list.
     *
     * @param p         heap-allocated produce callback (ownership is taken)
     * @param c         heap-allocated consume callback (ownership is taken)
     * @param num_p     number of producer threads, at least 1
     * @param num_c     number of consumer threads, at least 1
     * @param buf_size  maximum number of queued products, at least 1
     * @param p_params  heap-allocated list of producer inputs (ownership is taken)
     *
     * @throws std::invalid_argument if a pointer is null, a callback is empty,
     *         or one of the counts is smaller than 1. Ownership of the
     *         pointers is taken even in that case, so they are released.
     */
    ProducerConsumer(std::function<C (P)>* p, std::function<R (C)>* c,
        std::int32_t num_p, std::int32_t num_c, std::int32_t buf_size,
        std::vector<P>* p_params):
        consume(c), produce(p),
        num_p(num_p), num_c(num_c), buf_size(buf_size),
        p_params(p_params)
    {
        if (!produce || !consume || !this->p_params)
            throw std::invalid_argument("ProducerConsumer: null argument");
        if (!*produce || !*consume)
            throw std::invalid_argument("ProducerConsumer: empty callback");
        if (num_p < 1 || num_c < 1 || buf_size < 1)
            throw std::invalid_argument("ProducerConsumer: counts must be >= 1");
    }

    // The object owns threads and heap resources; copying would double-free.
    ProducerConsumer(const ProducerConsumer&) = delete;
    ProducerConsumer& operator=(const ProducerConsumer&) = delete;

    /// Waits for all worker threads before the owned resources are released.
    ~ProducerConsumer() {
        join_all();
    }

    /**
     * @brief Launches the producer and consumer threads.
     *
     * Inputs are split into `num_p` contiguous ranges of size()/num_p items;
     * the last producer additionally takes the remainder. Calling start()
     * more than once has no effect.
     */
    void start() {
        if (started)
            return;
        started = true;

        const std::size_t total = p_params->size();
        const std::size_t producers = static_cast<std::size_t>(num_p);
        const std::size_t task_range = total / producers;

        {
            // Must be set before any consumer runs, otherwise a consumer
            // could see "no producers" and exit immediately.
            std::lock_guard<std::mutex> lock(queue_modify_m);
            producers_active = num_p;
        }

        pThreads.reserve(producers);
        cThreads.reserve(static_cast<std::size_t>(num_c));
        for (std::size_t i = 0; i < producers; i++) {
            const std::size_t first = task_range * i;
            const std::size_t last = (i + 1 == producers) ? total : first + task_range;
            pThreads.emplace_back(&ProducerConsumer::producer, this, first, last);
        }
        for (std::int32_t i = 0; i < num_c; i++)
            cThreads.emplace_back(&ProducerConsumer::consumer, this);
    }

    /**
     * @brief Waits until all launched workers have finished and returns a
     *        copy of the collected results.
     *
     * With more than one consumer the order of the results is unspecified.
     * If start() was never called the returned vector is empty.
     */
    std::vector<R> get_results() {
        join_all();
        std::lock_guard<std::mutex> lock(results_m);
        return results;
    }
};
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement