Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- template<typename... event_args>
- class thread_pool{
- public:
- using handler_type = std::function<void(event_args...)>;
- thread_pool(handler_type&& handler, std::size_t N = 4, bool finish_before_exit = true) : _handler(std::forward<handler_type&&>(handler)),_workers(N),_running(true),_finish_work_before_exit(finish_before_exit)
- {
- for(auto&& worker: _workers)
- {
- //worker function
- worker = std::thread([this]()
- {
- while (_running)
- {
- //wait for work
- std::unique_lock<std::mutex> _lk{_wait_mutex};
- _cv.wait(_lk, [this]{
- return !_events.empty() || !_running;
- });
- //_lk unlocked
- //check to see why we woke up
- if (!_events.empty()) {//was it new work
- std::unique_lock<std::mutex> _readlk(_queue_mutex);
- auto data = _events.front();
- _events.pop();
- _readlk.unlock();
- invoke(std::move(_handler), std::move(data));
- _cv.notify_all();
- }else if(!_running){//was it a signal to exit
- break;
- }
- //or was it spurious and we should just ignore it
- }
- });
- //end worker function
- }
- }
- ~thread_pool()
- {
- if(_finish_work_before_exit)
- {//block destruction until all work is done
- std::condition_variable _work_remains;
- std::mutex _wr;
- std::unique_lock<std::mutex> lk{_wr};
- _work_remains.wait(lk,[this](){
- return _events.empty();
- });
- }
- _running=false;
- //let all workers know to exit
- _cv.notify_all();
- //attempt to join all workers
- for(auto&& _worker: _workers)
- {
- if(_worker.joinable())
- {
- _worker.join();
- }
- }
- }
- handler_type& handler()
- {
- return _handler;
- }
- void propagate(event_args&&... args)
- {
- //lock before push
- std::unique_lock<std::mutex> _lk(_queue_mutex);
- {
- _events.emplace(std::make_tuple(args...));
- }
- _lk.unlock();//explicit unlock
- _cv.notify_one();//let worker know that data is available
- }
- private:
- bool _finish_work_before_exit;
- handler_type _handler;
- std::queue<std::tuple<event_args...>> _events;
- std::vector<std::thread> _workers;
- std::atomic_bool _running;
- std::condition_variable _cv;
- std::mutex _wait_mutex;
- std::mutex _queue_mutex;
- //helpers used to unpack tuple into function call
- template<typename Func, typename Tuple, std::size_t... I>
- auto invoke_(Func&& func, Tuple&& t, std::index_sequence<I...>)
- {
- return func(std::get<I>(std::forward<Tuple&&>(t))...);
- }
- template<typename Func, typename Tuple, typename Indicies = std::make_index_sequence<std::tuple_size<Tuple>::value>>
- auto invoke(Func&& func, Tuple&& t)
- {
- return invoke_(std::forward<Func&&>(func), std::forward<Tuple&&>(t), Indicies());
- }
- };
- if(_finish_work_before_exit)
- {//block destruction until all work is done
- std::condition_variable _work_remains;
- std::mutex _wr;
- std::unique_lock<std::mutex> lk{_wr};
- _work_remains.wait(lk,[this](){
- return _events.empty();
- });
- }
- std::mutex writemtx;
- thread_pool<int> pool{
- [&](int i){
- std::unique_lock<std::mutex> lk{writemtx};
- std::cout<<i<<" : "<<std::this_thread::get_id()<<std::endl;
- },
- 8//threads
- };
- for (int i=0; i<8192; ++i) {
- pool.propagate(std::move(i));
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement