Guest User

Untitled

a guest
May 21st, 2018
75
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.16 KB | None | 0 0
  1. // \File threadpool.h
  2. // \Author Dmitry Kozlov
  3.  
  4. #pragma once
  5.  
  #include <atomic>
  #include <condition_variable>
  #include <cstddef>
  #include <functional>
  #include <future>
  #include <iostream>
  #include <mutex>
  #include <queue>
  #include <thread>
  #include <utility>
  #include <vector>
  13.  
  // Concurrent FIFO queue storing elements of type T.
  //
  // Every public operation takes the internal mutex, so a single queue object
  // may be used from several threads at once. Supported operations:
  // * push          - append an element and wake one waiting consumer.
  // * try_pop       - non-blocking pop; returns false when the queue is empty.
  // * wait_and_pop  - blocking pop; see below for the disabled state.
  // * size          - number of queued elements at the moment of the call.
  //
  // The queue is either enabled (the initial state) or disabled:
  // * While enabled, wait_and_pop blocks until an element is available.
  // * disable() wakes every thread blocked in wait_and_pop; while the queue is
  //   disabled, wait_and_pop returns false instead of blocking on an empty
  //   queue. Elements that are already queued can still be popped.
  // * enable() restores the blocking behaviour.
  //
  // T must be move- (or copy-) assignable, because popped elements are
  // move-assigned into a caller-provided object.
  template <typename T>
  class concurrent_queue {
  public:
    concurrent_queue() = default;

    // The mutex and condition variable members cannot be copied, so neither
    // can the queue; this is stated explicitly for clearer diagnostics.
    concurrent_queue(const concurrent_queue&) = delete;
    concurrent_queue& operator=(const concurrent_queue&) = delete;

    // Disables the queue so that wait_and_pop stops blocking.
    ~concurrent_queue() {
      disable();
    }

    // Appends an element (perfect-forwarded into the underlying container)
    // and wakes up one thread blocked in wait_and_pop, if there is any.
    template <typename U>
    void push(U&& u) {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push(std::forward<U>(u));
      cv_.notify_one();
    }

    // Blocks the calling thread until either:
    // * an element is available - it is moved into t and true is returned;
    // * the queue is disabled while empty - t is untouched, false is returned.
    //
    // An available element always takes priority over the disabled flag, so
    // the result does not depend on whether the caller actually had to wait.
    bool wait_and_pop(T& t) {
      std::unique_lock<std::mutex> lock(mutex_);

      // The predicate form re-checks the condition after every wake-up, which
      // also covers spurious wake-ups.
      cv_.wait(lock, [this] { return !queue_.empty() || disabled_; });

      // Still empty here means the wait ended because of disable().
      if (queue_.empty()) {
        return false;
      }

      t = std::move(queue_.front());
      queue_.pop();
      return true;
    }

    // Non-blocking pop. Moves the front element into t and returns true, or
    // returns false (leaving t untouched) when the queue is empty.
    bool try_pop(T& t) {
      std::lock_guard<std::mutex> lock(mutex_);

      if (queue_.empty()) {
        return false;
      }

      t = std::move(queue_.front());
      queue_.pop();
      return true;
    }

    // Returns the number of queued elements. With concurrent producers or
    // consumers the value may already be stale when the caller looks at it.
    std::size_t size() const {
      std::lock_guard<std::mutex> lock(mutex_);
      return queue_.size();
    }

    // Disables the queue: every thread blocked in wait_and_pop is released.
    void disable() {
      std::lock_guard<std::mutex> lock(mutex_);
      disabled_ = true;
      cv_.notify_all();
    }

    // Enables the queue: wait_and_pop blocks on an empty queue again.
    void enable() {
      std::lock_guard<std::mutex> lock(mutex_);
      disabled_ = false;
    }

  private:
    // Guards queue_ and disabled_; mutable so that size() can stay const.
    mutable std::mutex mutex_;
    // Signalled by push (one waiter) and by disable (all waiters).
    std::condition_variable cv_;
    // Underlying container.
    std::queue<T> queue_;
    // When true, wait_and_pop does not block on an empty queue.
    bool disabled_ = false;
  };
  112.  
  113. // Thread pool, which can execute any callable object returning type R.
  114. // For example thread_pool<int> accepts int(), int(char, char), int(etc).
  115. // thread_pool.submit() does parameter perfect forwarding.
  116. template <typename R>
  117. class thread_pool {
  118. public:
  119. // Constructor optionally specifies the number of threads to use.
  120. // std::thread::hardware_concurrency() threads used by default.
  121. explicit
  122. thread_pool(std::size_t nt = 0)
  123. : done_(false) {
  124. auto num_threads = nt == 0 ? std::thread::hardware_concurrency() : nt;
  125. num_threads = num_threads == 0 ? 2 : num_threads;
  126. // Create threads.
  127. for (auto i = 0U; i < num_threads; ++i) {
  128. threads_.push_back(std::thread(&thread_pool::run_loop, this));
  129. }
  130. }
  131.  
  132. ~thread_pool() {
  133. // done_ flag shuts threads down.
  134. done_ = true;
  135. // Disable the queue to make sure all threads waiting for task in
  136. // queue_.wait_and_pop are released.
  137. queue_.disable();
  138. // Join the threads
  139. std::for_each(threads_.begin(), threads_.end(),
  140. std::mem_fun_ref(&std::thread::join));
  141. }
  142.  
  143. // Submit a callable task into the pool. Arguments are perfect-forwarded
  144. // into the task. Future holding R is returned to the caller.
  145. template <typename Func, typename ...Args>
  146. auto submit(Func &&f, Args&&... args) {
  147. // Package the task w/ parameters
  148. std::packaged_task<R()> task {
  149. std::bind(std::forward<Func>(f),
  150. std::forward<Args>(args)...)
  151. };
  152. auto future = task.get_future();
  153. // Push into the taks queue
  154. queue_.push(std::move(task));
  155. return future;
  156. }
  157.  
  158. // Return the number of tasks in the queue.
  159. auto size() const {
  160. return queue_.size();
  161. }
  162.  
  163. // Return the number of threads in the pool.
  164. auto num_threads() {
  165. return threads_.size();
  166. }
  167.  
  168. private:
  169. // Each thread executes this in a loop
  170. void run_loop() {
  171. // done_ flag is used to shutdown the threads.
  172. while (!done_) {
  173. // Wait for the task in the queue and execute it.
  174. std::packaged_task<R()> t;
  175. // True returned from wait_and_pop means we have task to execute.
  176. // False means the queue has been disabled and we should not try
  177. // executing t.
  178. if (queue_.wait_and_pop(t)) {
  179. t();
  180. }
  181. }
  182. }
  183.  
  184. // Work queue.
  185. concurrent_queue<std::packaged_task<R()>> queue_;
  186. // Shutdown flag.
  187. std::atomic_bool done_;
  188. std::vector<std::thread> threads_;
  189. };
Add Comment
Please, Sign In to add comment