Advertisement
Guest User

Ctpl's thread pool deadlock 1

a guest
Dec 12th, 2017
83
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 11.17 KB | None | 0 0
  1. /**
  2. copy from
  3. https://softwarerecs.stackexchange.com/questions/11009/c11-thread-pool-implementation
  4. -> https://github.com/progschj/ThreadPool
  5. */
  6. #include <vector>
  7. #include <queue>
  8. #include <memory>
  9. #include <thread>
  10. #include <mutex>
  11. #include <condition_variable>
  12. #include <future>
  13. #include <functional>
  14. #include <stdexcept>
  15. #include <iostream>
  16.  
  17. namespace ctpl {
  18.  
  19.     namespace detail {
  20.         template <typename T>
  21.         class Queue {
  22.         public:
  23.             bool push(T const & value) {
  24.                 std::unique_lock<std::mutex> lock(this->mutex);
  25.                 this->q.push(value);
  26.                 return true;
  27.             }
  28.             // deletes the retrieved element, do not use for non integral types
  29.             bool pop(T & v) {
  30.                 std::unique_lock<std::mutex> lock(this->mutex);
  31.                 if (this->q.empty())
  32.                     return false;
  33.                 v = this->q.front();
  34.                 this->q.pop();
  35.                 return true;
  36.             }
  37.             bool empty() {
  38.                 std::unique_lock<std::mutex> lock(this->mutex);
  39.                 return this->q.empty();
  40.             }
  41.         private:
  42.             std::queue<T> q;
  43.             std::mutex mutex;
  44.         };
  45.     }
  46.  
  47.     class thread_pool {
  48.  
  49.     public:
  50.  
  51.         thread_pool() { this->init(); }
  52.         thread_pool(int nThreads) { this->init(); this->resize(nThreads); }
  53.  
  54.         // the destructor waits for all the functions in the queue to be finished
  55.         ~thread_pool() {
  56.             this->stop(true);
  57.         }
  58.  
  59.         // get the number of running threads in the pool
  60.         int size() { return static_cast<int>(this->threads.size()); }
  61.  
  62.         // number of idle threads
  63.         int n_idle() { return this->nWaiting; }
  64.         std::thread & get_thread(int i) { return *this->threads[i]; }
  65.  
  66.         // change the number of threads in the pool
  67.         // should be called from one thread, otherwise be careful to not interleave, also with this->stop()
  68.         // nThreads must be >= 0
  69.         void resize(int nThreads) {
  70.             if (!this->isStop && !this->isDone) {
  71.                 int oldNThreads = static_cast<int>(this->threads.size());
  72.                 if (oldNThreads <= nThreads) {  // if the number of threads is increased
  73.                     this->threads.resize(nThreads);
  74.                     this->flags.resize(nThreads);
  75.  
  76.                     for (int i = oldNThreads; i < nThreads; ++i) {
  77.                         this->flags[i] = std::make_shared<std::atomic<bool>>(false);
  78.                         this->set_thread(i);
  79.                     }
  80.                 }
  81.                 else {  // the number of threads is decreased
  82.                     for (int i = oldNThreads - 1; i >= nThreads; --i) {
  83.                         *this->flags[i] = true;  // this thread will finish
  84.                         this->threads[i]->detach();
  85.                     }
  86.                     {
  87.                         // stop the detached threads that were waiting
  88.                         std::unique_lock<std::mutex> lock(this->mutex);
  89.                         this->cv.notify_all();
  90.                     }
  91.                     this->threads.resize(nThreads);  // safe to delete because the threads are detached
  92.                     this->flags.resize(nThreads);  // safe to delete because the threads have copies of shared_ptr of the flags, not originals
  93.                 }
  94.             }
  95.         }
  96.  
  97.         // empty the queue
  98.         void clear_queue() {
  99.             std::function<void(int id)> * _f;
  100.             while (this->q.pop(_f))
  101.                 delete _f; // empty the queue
  102.         }
  103.  
  104.         // pops a functional wrapper to the original function
  105.         std::function<void(int)> pop() {
  106.             std::function<void(int id)> * _f = nullptr;
  107.             this->q.pop(_f);
  108.             std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
  109.             std::function<void(int)> f;
  110.             if (_f)
  111.                 f = *_f;
  112.             return f;
  113.         }
  114.  
  115.         // wait for all computing threads to finish and stop all threads
  116.         // may be called asynchronously to not pause the calling thread while waiting
  117.         // if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions
  118.         void stop(bool isWait = false) {
  119.             if (!isWait) {
  120.                 if (this->isStop)
  121.                     return;
  122.                 this->isStop = true;
  123.                 for (int i = 0, n = this->size(); i < n; ++i) {
  124.                     *this->flags[i] = true;  // command the threads to stop
  125.                 }
  126.                 this->clear_queue();  // empty the queue
  127.             }
  128.             else {
  129.                 if (this->isDone || this->isStop)
  130.                     return;
  131.                 this->isDone = true;  // give the waiting threads a command to finish
  132.             }
  133.             {
  134.                 std::unique_lock<std::mutex> lock(this->mutex);
  135.                 this->cv.notify_all();  // stop all waiting threads
  136.             }
  137.             for (int i = 0; i < static_cast<int>(this->threads.size()); ++i) {  // wait for the computing threads to finish
  138.                     if (this->threads[i]->joinable())
  139.                         this->threads[i]->join();
  140.             }
  141.             // if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads
  142.             // therefore delete them here
  143.             this->clear_queue();
  144.             this->threads.clear();
  145.             this->flags.clear();
  146.         }
  147.  
  148.         template<typename F, typename... Rest>
  149.         auto push(F && f, Rest&&... rest) ->std::future<decltype(f( 0,rest...))> {  //note: me edit
  150.             auto pck = std::make_shared<std::packaged_task<decltype(f( 0,rest...))(int)>>( //note: me edit
  151.                 std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
  152.                 );
  153.             auto _f = new std::function<void(int id)>([pck](int id) {
  154.                 (*pck)(id);
  155.             });
  156.             this->q.push(_f);
  157.             std::unique_lock<std::mutex> lock(this->mutex);
  158.             this->cv.notify_one();
  159.             return pck->get_future();
  160.         }
  161.          
  162.         template<typename F, typename... Rest>
  163.         auto add_immediate(F && f, Rest&&... rest) ->std::future<decltype(f( rest...))> {  //note: me edit
  164. //            return push([&](int zerooo,auto restLocal...){
  165. //              return f(restLocal...);
  166. //          },rest...);
  167. //          return push([&](int zerooo){
  168. //              return f(rest...);
  169. //          });
  170.             auto pck = std::make_shared<std::packaged_task<decltype(f( rest...))(int)>>( //note: me edit
  171.                 std::bind(std::forward<F>(f), std::forward<Rest>(rest)...)
  172.                 );
  173.             auto _f = new std::function<void(int id)>([pck](int id) {
  174.                 (*pck)(id);
  175.             });
  176.            
  177.             this->q.push(_f);
  178.             std::unique_lock<std::mutex> lock(this->mutex);
  179.             this->cv.notify_one();
  180.             return pck->get_future();
  181.         }
  182.        
  183.         template<typename F>
  184.         auto add_immediate(F && f) ->std::future<decltype(f())> {  //note: me edit
  185.             return push([&](int zerooo){
  186.                 return f();
  187.             });
  188.         }
  189.  
  190.  
  191.         // run the user's function that excepts argument int - id of the running thread. returned value is templatized
  192.         // operator returns std::future, where the user can get the result and rethrow the catched exceptins
  193.         template<typename F>
  194.         auto push(F && f) ->std::future<decltype(f(0))> { //note: me edit
  195.             auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f)); //note: me edit
  196.             auto _f = new std::function<void(int id)>([pck](int id) {
  197.                 (*pck)(id);
  198.             });
  199.             this->q.push(_f);
  200.             std::unique_lock<std::mutex> lock(this->mutex);
  201.             this->cv.notify_one();
  202.             return pck->get_future();
  203.         }
  204.  
  205.  
  206.     private:
  207.  
  208.         // deleted
  209.         thread_pool(const thread_pool &);// = delete;
  210.         thread_pool(thread_pool &&);// = delete;
  211.         thread_pool & operator=(const thread_pool &);// = delete;
  212.         thread_pool & operator=(thread_pool &&);// = delete;
  213.  
  214.         void set_thread(int i) {
  215.             std::shared_ptr<std::atomic<bool>> flag(this->flags[i]); // a copy of the shared ptr to the flag
  216.             auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() {
  217.                 std::atomic<bool> & _flag = *flag;
  218.                 std::function<void(int id)> * _f;
  219.                 bool isPop = this->q.pop(_f);
  220.                 while (true) {
  221.                     while (isPop) {  // if there is anything in the queue
  222.                         std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
  223.                         (*_f)(i);
  224.                         if (_flag)
  225.                             return;  // the thread is wanted to stop, return even if the queue is not empty yet
  226.                         else
  227.                             isPop = this->q.pop(_f);
  228.                     }
  229.                     // the queue is empty here, wait for the next command
  230.                     std::unique_lock<std::mutex> lock(this->mutex);
  231.                     ++this->nWaiting;
  232.                     this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; });
  233.                     --this->nWaiting;
  234.                     if (!isPop)
  235.                         return;  // if the queue is empty and this->isDone == true or *flag then return
  236.                 }
  237.             };
  238.             this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique()
  239.         }
  240.  
  241.         void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; }
  242.  
  243.         std::vector<std::unique_ptr<std::thread>> threads;
  244.         std::vector<std::shared_ptr<std::atomic<bool>>> flags;
  245.         detail::Queue<std::function<void(int id)> *> q;
  246.         std::atomic<bool> isDone;
  247.         std::atomic<bool> isStop;
  248.         std::atomic<int> nWaiting;  // how many threads are waiting
  249.  
  250.         std::mutex mutex;
  251.         std::condition_variable cv;
  252.     };
  253.  
  254. }
  255. //-------------user here
  256. ctpl::thread_pool pool(4);
  257. class C{
  258.     public: int d=35;
  259. };
  260. class B{
  261.     public: C* c;
  262.     public: void test(){
  263.         std::vector<std::future<void>> cac;
  264.         for(int n=0;n<5;n++){
  265.             cac.push_back(
  266.                 pool.push([&](int threadId){  
  267.                     test2();
  268.                 })
  269.             );
  270.         }
  271.         for(auto& ele : cac){
  272.             ele.get();
  273.         }
  274.     };
  275.     public: void test2(){
  276.         std::vector<std::future<void>> cac;
  277.         for(int n=0;n<5;n++){
  278.             cac.push_back(
  279.                 pool.push([&](int threadId){  
  280.                     int accu=0;
  281.                     for(int i=0;i<10000;i++){
  282.                         accu+=i;
  283.                     }
  284.                     std::cout<<accu<<" access c="<<c->d<<std::endl;
  285.                 })
  286.             );
  287.         }
  288.         for(auto& ele : cac){
  289.             ele.get();
  290.         }
  291.     }
  292. };
  293. int main(){
  294.     C c;
  295.     B b; b.c=&c;
  296.     b.test();
  297.     std::cout<<"end"<<std::endl;
  298. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement