Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // \File threadpool.h
- // \Author Dmitry Kozlov
- #pragma once
- #include <condition_variable>
- #include <future>
- #include <mutex>
- #include <queue>
- #include <thread>
- #include <atomic>
- #include <iostream>
- // Concurrent queue storing elements of type T.
- // Queue supports the following type of operations:
- // * Push element (atomically)
- // * Try popping element (returns false if it is empty)
- // * Wait and pop element (blocks threads)
- // * Get queue size
- //
- // The queue can be in enabled or disabled state.
- // * Enabled mode turns on wait_and_pop operation,
- // all the threads are waiting on the signal.
- // * Disabling the queue releases all the threads
- // waiting for wait_and_pop.
- template <typename T>
- class concurrent_queue {
- public:
- concurrent_queue()
- : disabled_(false) {
- }
- ~concurrent_queue() {
- disable();
- }
- // Push element: one of the threads is going to be woken up to process
- // this if any.
- template <typename U> void push(U &&u) {
- std::lock_guard<std::mutex> lock(mutex_);
- queue_.push(std::forward<U>(u));
- // Notify one of the threads waiting on CV.
- cv_.notify_one();
- }
- // Wait until there are elements in the queue and pop one.
- // This function blocks the thread util either:
- // * There is element in the queue (in this case it is popped and ret true).
- // * The queue has been disabled (ret false).
- bool wait_and_pop(T& t) {
- std::unique_lock<std::mutex> lock(mutex_);
- // If queue is empty, wait on CV until either:
- // * Something is in the queue.
- // * Queue has been disabled.
- if (queue_.empty()) {
- cv_.wait(lock, [this]() { return !queue_.empty() || disabled_; });
- // Return immediately if we have been disabled.
- if (disabled_) return false;
- }
- // Here the queue is non-empty, so pop and return true.
- t = std::move(queue_.front());
- queue_.pop();
- return true;
- }
- // Try popping element. Returns true if element has been popped, false if
- // there are no elements in the queue.
- bool try_pop(T &t) {
- std::lock_guard<std::mutex> lock(mutex_);
- if (queue_.empty()) return false;
- t = std::move(queue_.front());
- queue_.pop();
- return true;
- }
- // Return queue size
- size_t size() const {
- std::unique_lock<std::mutex> lock(mutex_);
- return queue_.size();
- }
- // Disable the queue: all threads waiting in wait_and_pop are released.
- void disable() {
- std::lock_guard<std::mutex> lock(mutex_);
- disabled_ = true;
- cv_.notify_all();
- }
- // Enables the queue: wait_and_pop starts working again.
- void enable() {
- std::lock_guard<std::mutex> lock(mutex_);
- disabled_ = false;
- }
- private:
- // Queue guard mutex
- mutable std::mutex mutex_;
- // Condition variable for wait_and_pop
- std::condition_variable cv_;
- // Underlying container
- std::queue<T> queue_;
- // Flag to disable wait_and_pop
- bool disabled_;
- };
- // Thread pool, which can execute any callable object returning type R.
- // For example thread_pool<int> accepts int(), int(char, char), int(etc).
- // thread_pool.submit() does parameter perfect forwarding.
- template <typename R>
- class thread_pool {
- public:
- // Constructor optionally specifies the number of threads to use.
- // std::thread::hardware_concurrency() threads used by default.
- explicit
- thread_pool(std::size_t nt = 0)
- : done_(false) {
- auto num_threads = nt == 0 ? std::thread::hardware_concurrency() : nt;
- num_threads = num_threads == 0 ? 2 : num_threads;
- // Create threads.
- for (auto i = 0U; i < num_threads; ++i) {
- threads_.push_back(std::thread(&thread_pool::run_loop, this));
- }
- }
- ~thread_pool() {
- // done_ flag shuts threads down.
- done_ = true;
- // Disable the queue to make sure all threads waiting for task in
- // queue_.wait_and_pop are released.
- queue_.disable();
- // Join the threads
- std::for_each(threads_.begin(), threads_.end(),
- std::mem_fun_ref(&std::thread::join));
- }
- // Submit a callable task into the pool. Arguments are perfect-forwarded
- // into the task. Future holding R is returned to the caller.
- template <typename Func, typename ...Args>
- auto submit(Func &&f, Args&&... args) {
- // Package the task w/ parameters
- std::packaged_task<R()> task {
- std::bind(std::forward<Func>(f),
- std::forward<Args>(args)...)
- };
- auto future = task.get_future();
- // Push into the taks queue
- queue_.push(std::move(task));
- return future;
- }
- // Return the number of tasks in the queue.
- auto size() const {
- return queue_.size();
- }
- // Return the number of threads in the pool.
- auto num_threads() {
- return threads_.size();
- }
- private:
- // Each thread executes this in a loop
- void run_loop() {
- // done_ flag is used to shutdown the threads.
- while (!done_) {
- // Wait for the task in the queue and execute it.
- std::packaged_task<R()> t;
- // True returned from wait_and_pop means we have task to execute.
- // False means the queue has been disabled and we should not try
- // executing t.
- if (queue_.wait_and_pop(t)) {
- t();
- }
- }
- }
- // Work queue.
- concurrent_queue<std::packaged_task<R()>> queue_;
- // Shutdown flag.
- std::atomic_bool done_;
- std::vector<std::thread> threads_;
- };
Add Comment
Please, Sign In to add comment