Advertisement
Guest User

ctrl thread pool dead lock

a guest
Dec 12th, 2017
201
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 11.02 KB | None | 0 0
  1.  
  2. #include <vector>
  3. #include <queue>
  4. #include <memory>
  5. #include <thread>
  6. #include <mutex>
  7. #include <condition_variable>
  8. #include <future>
  9. #include <functional>
  10. #include <stdexcept>
  11. #include <iostream>
  12.  
  13. namespace ctpl {
  14.  
  15.     namespace detail {
  16.         template <typename T>
  17.         class Queue {
  18.         public:
  19.             bool push(T const & value) {
  20.                 std::unique_lock<std::mutex> lock(this->mutex);
  21.                 this->q.push(value);
  22.                 return true;
  23.             }
  24.             // deletes the retrieved element, do not use for non integral types
  25.             bool pop(T & v) {
  26.                 std::unique_lock<std::mutex> lock(this->mutex);
  27.                 if (this->q.empty())
  28.                     return false;
  29.                 v = this->q.front();
  30.                 this->q.pop();
  31.                 return true;
  32.             }
  33.             bool empty() {
  34.                 std::unique_lock<std::mutex> lock(this->mutex);
  35.                 return this->q.empty();
  36.             }
  37.         private:
  38.             std::queue<T> q;
  39.             std::mutex mutex;
  40.         };
  41.     }
  42.  
  43.     class thread_pool {
  44.  
  45.     public:
  46.  
  47.         thread_pool() { this->init(); }
  48.         thread_pool(int nThreads) { this->init(); this->resize(nThreads); }
  49.  
  50.         // the destructor waits for all the functions in the queue to be finished
  51.         ~thread_pool() {
  52.             this->stop(true);
  53.         }
  54.  
  55.         // get the number of running threads in the pool
  56.         int size() { return static_cast<int>(this->threads.size()); }
  57.  
  58.         // number of idle threads
  59.         int n_idle() { return this->nWaiting; }
  60.         std::thread & get_thread(int i) { return *this->threads[i]; }
  61.  
  62.         // change the number of threads in the pool
  63.         // should be called from one thread, otherwise be careful to not interleave, also with this->stop()
  64.         // nThreads must be >= 0
  65.         void resize(int nThreads) {
  66.             if (!this->isStop && !this->isDone) {
  67.                 int oldNThreads = static_cast<int>(this->threads.size());
  68.                 if (oldNThreads <= nThreads) {  // if the number of threads is increased
  69.                     this->threads.resize(nThreads);
  70.                     this->flags.resize(nThreads);
  71.  
  72.                     for (int i = oldNThreads; i < nThreads; ++i) {
  73.                         this->flags[i] = std::make_shared<std::atomic<bool>>(false);
  74.                         this->set_thread(i);
  75.                     }
  76.                 }
  77.                 else {  // the number of threads is decreased
  78.                     for (int i = oldNThreads - 1; i >= nThreads; --i) {
  79.                         *this->flags[i] = true;  // this thread will finish
  80.                         this->threads[i]->detach();
  81.                     }
  82.                     {
  83.                         // stop the detached threads that were waiting
  84.                         std::unique_lock<std::mutex> lock(this->mutex);
  85.                         this->cv.notify_all();
  86.                     }
  87.                     this->threads.resize(nThreads);  // safe to delete because the threads are detached
  88.                     this->flags.resize(nThreads);  // safe to delete because the threads have copies of shared_ptr of the flags, not originals
  89.                 }
  90.             }
  91.         }
  92.  
  93.         // empty the queue
  94.         void clear_queue() {
  95.             std::function<void(int id)> * _f;
  96.             while (this->q.pop(_f))
  97.                 delete _f; // empty the queue
  98.         }
  99.  
  100.         // pops a functional wrapper to the original function
  101.         std::function<void(int)> pop() {
  102.             std::function<void(int id)> * _f = nullptr;
  103.             this->q.pop(_f);
  104.             std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
  105.             std::function<void(int)> f;
  106.             if (_f)
  107.                 f = *_f;
  108.             return f;
  109.         }
  110.  
  111.         // wait for all computing threads to finish and stop all threads
  112.         // may be called asynchronously to not pause the calling thread while waiting
  113.         // if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions
  114.         void stop(bool isWait = false) {
  115.             if (!isWait) {
  116.                 if (this->isStop)
  117.                     return;
  118.                 this->isStop = true;
  119.                 for (int i = 0, n = this->size(); i < n; ++i) {
  120.                     *this->flags[i] = true;  // command the threads to stop
  121.                 }
  122.                 this->clear_queue();  // empty the queue
  123.             }
  124.             else {
  125.                 if (this->isDone || this->isStop)
  126.                     return;
  127.                 this->isDone = true;  // give the waiting threads a command to finish
  128.             }
  129.             {
  130.                 std::unique_lock<std::mutex> lock(this->mutex);
  131.                 this->cv.notify_all();  // stop all waiting threads
  132.             }
  133.             for (int i = 0; i < static_cast<int>(this->threads.size()); ++i) {  // wait for the computing threads to finish
  134.                     if (this->threads[i]->joinable())
  135.                         this->threads[i]->join();
  136.             }
  137.             // if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads
  138.             // therefore delete them here
  139.             this->clear_queue();
  140.             this->threads.clear();
  141.             this->flags.clear();
  142.         }
  143.  
  144.         template<typename F, typename... Rest>
  145.         auto push(F && f, Rest&&... rest) ->std::future<decltype(f( 0,rest...))> {  //note: me edit
  146.             auto pck = std::make_shared<std::packaged_task<decltype(f( 0,rest...))(int)>>( //note: me edit
  147.                 std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
  148.                 );
  149.             auto _f = new std::function<void(int id)>([pck](int id) {
  150.                 (*pck)(id);
  151.             });
  152.             this->q.push(_f);
  153.             std::unique_lock<std::mutex> lock(this->mutex);
  154.             this->cv.notify_one();
  155.             return pck->get_future();
  156.         }
  157.          
  158.         template<typename F, typename... Rest>
  159.         auto add_immediate(F && f, Rest&&... rest) ->std::future<decltype(f( rest...))> {  //note: me edit
  160. //            return push([&](int zerooo,auto restLocal...){
  161. //              return f(restLocal...);
  162. //          },rest...);
  163. //          return push([&](int zerooo){
  164. //              return f(rest...);
  165. //          });
  166.             auto pck = std::make_shared<std::packaged_task<decltype(f( rest...))(int)>>( //note: me edit
  167.                 std::bind(std::forward<F>(f), std::forward<Rest>(rest)...)
  168.                 );
  169.             auto _f = new std::function<void(int id)>([pck](int id) {
  170.                 (*pck)(id);
  171.             });
  172.            
  173.             this->q.push(_f);
  174.             std::unique_lock<std::mutex> lock(this->mutex);
  175.             this->cv.notify_one();
  176.             return pck->get_future();
  177.         }
  178.        
  179.         template<typename F>
  180.         auto add_immediate(F && f) ->std::future<decltype(f())> {  //note: me edit
  181.             return push([&](int zerooo){
  182.                 return f();
  183.             });
  184.         }
  185.  
  186.  
  187.         // run the user's function that excepts argument int - id of the running thread. returned value is templatized
  188.         // operator returns std::future, where the user can get the result and rethrow the catched exceptins
  189.         template<typename F>
  190.         auto push(F && f) ->std::future<decltype(f(0))> { //note: me edit
  191.             auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f)); //note: me edit
  192.             auto _f = new std::function<void(int id)>([pck](int id) {
  193.                 (*pck)(id);
  194.             });
  195.             this->q.push(_f);
  196.             std::unique_lock<std::mutex> lock(this->mutex);
  197.             this->cv.notify_one();
  198.             return pck->get_future();
  199.         }
  200.  
  201.  
  202.     private:
  203.  
  204.         // deleted
  205.         thread_pool(const thread_pool &);// = delete;
  206.         thread_pool(thread_pool &&);// = delete;
  207.         thread_pool & operator=(const thread_pool &);// = delete;
  208.         thread_pool & operator=(thread_pool &&);// = delete;
  209.  
  210.         void set_thread(int i) {
  211.             std::shared_ptr<std::atomic<bool>> flag(this->flags[i]); // a copy of the shared ptr to the flag
  212.             auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() {
  213.                 std::atomic<bool> & _flag = *flag;
  214.                 std::function<void(int id)> * _f;
  215.                 bool isPop = this->q.pop(_f);
  216.                 while (true) {
  217.                     while (isPop) {  // if there is anything in the queue
  218.                         std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
  219.                         (*_f)(i);
  220.                         if (_flag)
  221.                             return;  // the thread is wanted to stop, return even if the queue is not empty yet
  222.                         else
  223.                             isPop = this->q.pop(_f);
  224.                     }
  225.                     // the queue is empty here, wait for the next command
  226.                     std::unique_lock<std::mutex> lock(this->mutex);
  227.                     ++this->nWaiting;
  228.                     this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; });
  229.                     --this->nWaiting;
  230.                     if (!isPop)
  231.                         return;  // if the queue is empty and this->isDone == true or *flag then return
  232.                 }
  233.             };
  234.             this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique()
  235.         }
  236.  
  237.         void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; }
  238.  
  239.         std::vector<std::unique_ptr<std::thread>> threads;
  240.         std::vector<std::shared_ptr<std::atomic<bool>>> flags;
  241.         detail::Queue<std::function<void(int id)> *> q;
  242.         std::atomic<bool> isDone;
  243.         std::atomic<bool> isStop;
  244.         std::atomic<int> nWaiting;  // how many threads are waiting
  245.  
  246.         std::mutex mutex;
  247.         std::condition_variable cv;
  248.     };
  249.  
  250. }
  251. //-------------user here
  252. ctpl::thread_pool pool(4);
  253. class C{
  254.     public: int d=35;
  255. };
  256. class B{
  257.     public: C* c;
  258.     public: void test(){
  259.         std::vector<std::future<void>> cac;
  260.         for(int n=0;n<5;n++){
  261.             cac.push_back(
  262.                 pool.push([&](int threadId){  
  263.                     test2();
  264.                 })
  265.             );
  266.         }
  267.         for(auto& ele : cac){
  268.             ele.get();
  269.         }
  270.     };
  271.     public: void test2(){
  272.         std::vector<std::future<void>> cac;
  273.         for(int n=0;n<5;n++){
  274.             cac.push_back(
  275.                 pool.push([&](int threadId){  
  276.                     int accu=0;
  277.                     for(int i=0;i<10000;i++){
  278.                         accu+=i;
  279.                     }
  280.                     std::cout<<accu<<" access c="<<c->d<<std::endl;
  281.                 })
  282.             );
  283.         }
  284.         for(auto& ele : cac){
  285.             ele.get();
  286.         }
  287.     }
  288. };
  289. int main(){
  290.     C c;
  291.     B b; b.c=&c;
  292.     b.test();
  293.     std::cout<<"end"<<std::endl;
  294. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement