Advertisement
YoFreakinLo

thread_pool

Jul 28th, 2020
2,278
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 3.83 KB | None | 0 0
  1. ------------------------thread_pool.h----------------------------
  2. #ifndef _THREAD_POOL_H_
  3. #define _THREAD_POOL_H_
  4.  
  #include <cstdint>
  #include <functional>
  #include <future>
  #include <memory>
  #include <mutex>
  #include <queue>
  #include <thread>
  #include <type_traits>
  #include <utility>
  #include <vector>
  10.  
  11. namespace standby_network
  12. {
  13.  
  /// Pool of worker threads that execute callables taken from a FIFO queue.
  ///
  /// The queue is guarded by `tasks_mut`. The constructors, the destructor and
  /// the move assignment operator are defined out of line in thread_pool.cc.
  /// The pool is movable but not copyable.
  class ThreadPool
  {
  public:
      /// @param thread_count number of worker threads to start; 0 (the default)
      ///        asks for std::thread::hardware_concurrency() workers.
      ThreadPool(uint8_t thread_count = 0);
      ThreadPool(const ThreadPool &) = delete;
      ThreadPool(ThreadPool &&move);
      ~ThreadPool();

  public:
      auto operator=(const ThreadPool &) -> const ThreadPool & = delete;
      auto operator=(ThreadPool &&move) -> const ThreadPool &;

  public:
      /// Queues `func(args...)` for execution on a worker; any result is discarded.
      ///
      /// `func` and `args` are copied/moved into the queued task (std::bind
      /// stores decayed copies), so the caller's objects may go out of scope
      /// as soon as this call returns. Wrap an argument in std::ref to pass it
      /// by reference instead.
      ///
      /// @param func callable to run.
      /// @param args arguments bound to `func`.
      template<typename Func, typename ...Args>
      auto enqueue(const Func &func, Args &&... args) -> void
      {
          auto bound = std::bind(func, std::forward<Args>(args)...);
          // The queued std::function must be copyable and must own everything it
          // touches: share ownership of the bound call instead of referencing
          // this function's locals, which are gone before a worker runs the task.
          auto job = std::make_shared<decltype(bound)>(std::move(bound));

          std::lock_guard<std::mutex> guard(tasks_mut);
          tasks.push([job]()
          {
              (*job)();
          });
      }

      /// Queues `func(args...)` and returns a future for its result.
      ///
      /// Works for `void` results as well. An exception thrown by `func` is
      /// stored in the returned future and rethrown by `get()`. If the task is
      /// discarded without ever running, the future reports
      /// std::future_errc::broken_promise.
      ///
      /// @param func callable to run.
      /// @param args arguments bound to `func` (stored by value, see enqueue()).
      /// @return future that becomes ready once a worker has run the task.
      template<typename Func, typename ...Args>
      auto enqueue_task(const Func &func, Args &&... args) -> std::future<std::invoke_result_t<Func, Args...>>
      {
          using RetType = std::invoke_result_t<Func, Args...>;

          // packaged_task owns both the bound call and the shared state of the
          // future; the shared_ptr makes the move-only task storable in
          // std::function and keeps it alive until a worker has executed it.
          auto task = std::make_shared<std::packaged_task<RetType()>>(
              std::bind(func, std::forward<Args>(args)...));
          std::future<RetType> result = task->get_future();

          {
              std::lock_guard<std::mutex> guard(tasks_mut);
              tasks.push([task]()
              {
                  (*task)();
              });
          }

          return result;
      }

  private:
      std::mutex tasks_mut;                     ///< Guards `tasks`.
      std::queue<std::function<void()>> tasks;  ///< Pending work, oldest first.
      std::vector<std::thread> workers;         ///< Threads started by the constructor.
      // Stop flag shared with the worker threads. It is a plain bool, so every
      // access from more than one thread has to be synchronized externally.
      // NOTE(review): consider std::atomic<bool> together with the out-of-line
      // definitions that assign to it.
      bool run;
  };
  66.  
  67. } // namespace standby_network
  68.  
  69. #endif // _THREAD_POOL_H_
  70.  
  71. ------------------------thread_pool.cc------------------------
  72. #include "thread_pool.h"
  73.  
  74. namespace standby_network
  75. {
  76.  
  77. ThreadPool::ThreadPool(uint8_t thread_count) :
  78.     run(true)
  79. {
  80.     auto worker_count = (thread_count) ? thread_count : std::thread::hardware_concurrency();
  81.     for(int i = 0; i < worker_count; i++)
  82.     {
  83.         workers.push_back(std::thread([this]() -> void
  84.         {
  85.             while(run)
  86.             {
  87.                 tasks_mut.lock();
  88.                 if(!tasks.empty())
  89.                 {
  90.                     auto t = tasks.front();
  91.                     tasks.pop();
  92.                     tasks_mut.unlock();
  93.                     t();
  94.                 }
  95.                 else
  96.                 {
  97.                     tasks_mut.unlock();
  98.                 }
  99.             }
  100.         }));
  101.     }
  102. }
  103.  
  104. ThreadPool::ThreadPool(ThreadPool &&move) :
  105.     ThreadPool()
  106. {
  107.     std::lock_guard this_guard(tasks_mut), that_guard(move.tasks_mut);
  108.     tasks = std::move(move.tasks);
  109. }
  110.  
  111. ThreadPool::~ThreadPool()
  112. {
  113.     run = false;
  114.     for(auto &w : workers)
  115.     {
  116.         if(w.joinable())
  117.         {
  118.             w.join();
  119.         }
  120.     }
  121. }
  122.  
  123. auto ThreadPool::operator=(ThreadPool &&move) -> const ThreadPool &
  124. {
  125.     run = move.run;
  126.     std::lock_guard this_guard(tasks_mut), that_guard(move.tasks_mut);
  127.     std::swap(tasks, move.tasks);
  128.  
  129.     return *this;
  130. }
  131.  
  132. } // namespace standby_network
  133. ---------------------------------test.cc----------------------------
  134. #include <iostream>
  135.  
  136. #include <thread_pool.h>
  137.  
  138. auto main() -> int
  139. {
  140.     using namespace standby_network;
  141.     ThreadPool tp;
  142.     auto task = [](int a, int b) -> int
  143.     {
  144.         return a + b;
  145.     };
  146.  
  147.     int task_count = 2;
  148.     std::vector<std::future<int>> futures;
  149.     while(task_count)
  150.     {
  151.         futures.push_back(tp.enqueue_task(task, task_count, task_count + 1));
  152.         task_count--;
  153.     }
  154.  
  155.     for(auto &f : futures)
  156.     {
  157.         if(f.valid())
  158.         {
  159.             f.wait();
  160.             std::cout << f.get() << std::endl;
  161.         }
  162.     }
  163.  
  164.     return 0;
  165. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement