Advertisement
Guest User

Untitled

a guest
Oct 28th, 2016
68
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.80 KB | None | 0 0
  1. template<typename... event_args>
  2. class thread_pool{
  3. public:
  4.  
  5. using handler_type = std::function<void(event_args...)>;
  6.  
  7. thread_pool(handler_type&& handler, std::size_t N = 4, bool finish_before_exit = true) : _handler(std::forward<handler_type&&>(handler)),_workers(N),_running(true),_finish_work_before_exit(finish_before_exit)
  8. {
  9. for(auto&& worker: _workers)
  10. {
  11. //worker function
  12. worker = std::thread([this]()
  13. {
  14. while (_running)
  15. {
  16. //wait for work
  17. std::unique_lock<std::mutex> _lk{_wait_mutex};
  18. _cv.wait(_lk, [this]{
  19. return !_events.empty() || !_running;
  20. });
  21. //_lk unlocked
  22.  
  23. //check to see why we woke up
  24. if (!_events.empty()) {//was it new work
  25. std::unique_lock<std::mutex> _readlk(_queue_mutex);
  26. auto data = _events.front();
  27. _events.pop();
  28. _readlk.unlock();
  29.  
  30. invoke(std::move(_handler), std::move(data));
  31. _cv.notify_all();
  32. }else if(!_running){//was it a signal to exit
  33. break;
  34. }
  35. //or was it spurious and we should just ignore it
  36. }
  37. });
  38. //end worker function
  39. }
  40. }
  41.  
  42. ~thread_pool()
  43. {
  44. if(_finish_work_before_exit)
  45. {//block destruction until all work is done
  46. std::condition_variable _work_remains;
  47. std::mutex _wr;
  48.  
  49. std::unique_lock<std::mutex> lk{_wr};
  50. _work_remains.wait(lk,[this](){
  51. return _events.empty();
  52. });
  53. }
  54.  
  55. _running=false;
  56.  
  57. //let all workers know to exit
  58. _cv.notify_all();
  59.  
  60.  
  61. //attempt to join all workers
  62. for(auto&& _worker: _workers)
  63. {
  64. if(_worker.joinable())
  65. {
  66. _worker.join();
  67. }
  68. }
  69. }
  70.  
  71. handler_type& handler()
  72. {
  73. return _handler;
  74. }
  75.  
  76. void propagate(event_args&&... args)
  77. {
  78. //lock before push
  79. std::unique_lock<std::mutex> _lk(_queue_mutex);
  80. {
  81. _events.emplace(std::make_tuple(args...));
  82. }
  83. _lk.unlock();//explicit unlock
  84. _cv.notify_one();//let worker know that data is available
  85. }
  86.  
  87. private:
  88. bool _finish_work_before_exit;
  89.  
  90. handler_type _handler;
  91.  
  92. std::queue<std::tuple<event_args...>> _events;
  93.  
  94. std::vector<std::thread> _workers;
  95.  
  96. std::atomic_bool _running;
  97.  
  98. std::condition_variable _cv;
  99.  
  100. std::mutex _wait_mutex;
  101.  
  102. std::mutex _queue_mutex;
  103.  
  104.  
  105. //helpers used to unpack tuple into function call
  106. template<typename Func, typename Tuple, std::size_t... I>
  107. auto invoke_(Func&& func, Tuple&& t, std::index_sequence<I...>)
  108. {
  109. return func(std::get<I>(std::forward<Tuple&&>(t))...);
  110. }
  111.  
  112. template<typename Func, typename Tuple, typename Indicies = std::make_index_sequence<std::tuple_size<Tuple>::value>>
  113. auto invoke(Func&& func, Tuple&& t)
  114. {
  115. return invoke_(std::forward<Func&&>(func), std::forward<Tuple&&>(t), Indicies());
  116. }
  117. };
  118.  
  // Excerpt repeating the guard at the top of ~thread_pool(): when
  // _finish_work_before_exit is set, block until the _events queue is empty.
  // NOTE(review): _work_remains and _wr are locals of this scope and nothing
  // visible ever notifies _work_remains, so if _events is non-empty on entry
  // the wait can only return after a spurious wakeup -- in practice it may
  // block forever. The predicate also reads _events while holding only _wr,
  // not the mutex that propagate() locks when pushing.
  119. if(_finish_work_before_exit)
  120. {//block destruction until all work is done
  121. std::condition_variable _work_remains;
  122. std::mutex _wr;
  123.  
  124. std::unique_lock<std::mutex> lk{_wr};
  125. _work_remains.wait(lk,[this](){
  126. return _events.empty();
  127. });
  128. }
  129.  
  130. std::mutex writemtx;
  131.  
  132.  
  133. thread_pool<int> pool{
  134. [&](int i){
  135. std::unique_lock<std::mutex> lk{writemtx};
  136. std::cout<<i<<" : "<<std::this_thread::get_id()<<std::endl;
  137. },
  138. 8//threads
  139. };
  140.  
  141. for (int i=0; i<8192; ++i) {
  142. pool.propagate(std::move(i));
  143. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement