Advertisement
Guest User

Progschj's ThreadPool deadlock 1

a guest
Dec 12th, 2017
195
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 3.60 KB | None | 0 0
  1. /**
  2. copy from
  3. https://softwarerecs.stackexchange.com/questions/11009/c11-thread-pool-implementation
  4. -> https://github.com/progschj/ThreadPool
  5. */
  6. #include <vector>
  7. #include <queue>
  8. #include <memory>
  9. #include <thread>
  10. #include <mutex>
  11. #include <condition_variable>
  12. #include <future>
  13. #include <functional>
  14. #include <stdexcept>
  15. #include <iostream>
  16. class ThreadPool {
  17. public:
  18.     ThreadPool(size_t);
  19.     template<class F, class... Args>
  20.     auto enqueue(F&& f, Args&&... args)
  21.         -> std::future<typename std::result_of<F(Args...)>::type>;
  22.     ~ThreadPool();
  23. private:
  24.     // need to keep track of threads so we can join them
  25.     std::vector< std::thread > workers;
  26.     // the task queue
  27.     std::queue< std::function<void()> > tasks;
  28.    
  29.     // synchronization
  30.     std::mutex queue_mutex;
  31.     std::condition_variable condition;
  32.     bool stop;
  33. };
  34.  
  35. // the constructor just launches some amount of workers
  36. inline ThreadPool::ThreadPool(size_t threads)
  37.     :   stop(false)
  38. {
  39.     for(size_t i = 0;i<threads;++i)
  40.         workers.emplace_back(
  41.             [this]
  42.             {
  43.                 for(;;)
  44.                 {
  45.                     std::function<void()> task;
  46.  
  47.                     {
  48.                         std::unique_lock<std::mutex> lock(this->queue_mutex);
  49.                         this->condition.wait(lock,
  50.                             [this]{ return this->stop || !this->tasks.empty(); });
  51.                         if(this->stop && this->tasks.empty())
  52.                             return;
  53.                         task = std::move(this->tasks.front());
  54.                         this->tasks.pop();
  55.                     }
  56.  
  57.                     task();
  58.                 }
  59.             }
  60.         );
  61. }
  62.  
  63. // add new work item to the pool
  64. template<class F, class... Args>
  65. auto ThreadPool::enqueue(F&& f, Args&&... args)
  66.     -> std::future<typename std::result_of<F(Args...)>::type>
  67. {
  68.     using return_type = typename std::result_of<F(Args...)>::type;
  69.  
  70.     auto task = std::make_shared< std::packaged_task<return_type()> >(
  71.             std::bind(std::forward<F>(f), std::forward<Args>(args)...)
  72.         );
  73.        
  74.     std::future<return_type> res = task->get_future();
  75.     {
  76.         std::unique_lock<std::mutex> lock(queue_mutex);
  77.  
  78.         // don't allow enqueueing after stopping the pool
  79.         if(stop)
  80.             throw std::runtime_error("enqueue on stopped ThreadPool");
  81.  
  82.         tasks.emplace([task](){ (*task)(); });
  83.     }
  84.     condition.notify_one();
  85.     return res;
  86. }
  87.  
  88. // the destructor joins all threads
  89. inline ThreadPool::~ThreadPool()
  90. {
  91.     {
  92.         std::unique_lock<std::mutex> lock(queue_mutex);
  93.         stop = true;
  94.     }
  95.     condition.notify_all();
  96.     for(std::thread &worker: workers)
  97.         worker.join();
  98. }
  99. //-------------user here
  100. ThreadPool pool(4);
  101. class C{
  102.     public: int d=35;
  103. };
  104. class B{
  105.     public: C* c;
  106.     public: void test(){
  107.         std::vector<std::future<void>> cac;
  108.         for(int n=0;n<5;n++){
  109.             cac.push_back(
  110.                 pool.enqueue([&](){  
  111.                     test2();
  112.                 })
  113.             );
  114.         }
  115.         for(auto& ele : cac){
  116.             ele.get();
  117.         }
  118.     };
  119.     public: void test2(){
  120.         std::vector<std::future<void>> cac;
  121.         for(int n=0;n<5;n++){
  122.             cac.push_back(
  123.                 pool.enqueue([&](){  
  124.                     int accu=0;
  125.                     for(int i=0;i<10000;i++){
  126.                         accu+=i;
  127.                     }
  128.                     std::cout<<accu<<" access c="<<c->d<<std::endl;
  129.                 })
  130.             );
  131.         }
  132.         for(auto& ele : cac){
  133.             ele.get();
  134.         }
  135.     }
  136. };
  137. int main(){
  138.     C c;
  139.     B b; b.c=&c;
  140.     b.test();
  141.     std::cout<<"end"<<std::endl;
  142. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement