Guest User

Untitled

a guest
Oct 24th, 2017
81
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.31 KB | None | 0 0
  1. #ifndef CONCURRENT_THREADPOOL_H
  2. #define CONCURRENT_THREADPOOL_H
  3.  
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
  9.  
  10. /**
  11. * Simple ThreadPool that creates `threadCount` threads upon its creation,
  12. * and pulls from a queue to get new jobs.
  13. *
  14. * This class requires a number of c++11 features be present in your compiler.
  15. */
  16. class ThreadPool
  17. {
  18. public:
  19. explicit ThreadPool(int threadCount) :
  20. _jobsLeft(0),
  21. _bailout(false)
  22. {
  23. _threads = std::vector<std::thread>(threadCount);
  24.  
  25. for (int index = 0; index < threadCount; ++index)
  26. {
  27. _threads[index] = std::move(std::thread([this]
  28. {
  29. this->Task();
  30. }));
  31. }
  32. }
  33.  
  34. /**
  35. * JoinAll on deconstruction
  36. */
  37. ~ThreadPool()
  38. {
  39. JoinAll();
  40. }
  41.  
  42. /**
  43. * Add a new job to the pool. If there are no jobs in the queue,
  44. * a thread is woken up to take the job. If all threads are busy,
  45. * the job is added to the end of the queue.
  46. */
  47. void AddJob(std::function<void(void)> job)
  48. {
  49. // scoped lock
  50. {
  51. std::lock_guard<std::mutex> lock(_queueMutex);
  52. _queue.emplace(job);
  53. }
  54. // scoped lock
  55. {
  56. std::lock_guard<std::mutex> lock(_jobsLeftMutex);
  57. ++_jobsLeft;
  58. }
  59. _jobAvailableVar.notify_one();
  60. }
  61.  
  62. /**
  63. * Join with all threads. Block until all threads have completed.
  64. * The queue may be filled after this call, but the threads will
  65. * be done. After invoking `ThreadPool::JoinAll`, the pool can no
  66. * longer be used.
  67. */
  68. void JoinAll()
  69. {
  70. // scoped lock
  71. {
  72. std::lock_guard<std::mutex> lock(_queueMutex);
  73. if (_bailout)
  74. {
  75. return;
  76. }
  77. _bailout = true;
  78. }
  79.  
  80. // note that we're done, and wake up any thread that's
  81. // waiting for a new job
  82. _jobAvailableVar.notify_all();
  83.  
  84. for (auto& x : _threads)
  85. {
  86. if (x.joinable())
  87. {
  88. x.join();
  89. }
  90. }
  91. }
  92.  
  93. /**
  94. * Wait for the pool to empty before continuing.
  95. * This does not call `std::thread::join`, it only waits until
  96. * all jobs have finished executing.
  97. */
  98. void WaitAll()
  99. {
  100. std::unique_lock<std::mutex> lock(_jobsLeftMutex);
  101. if (_jobsLeft > 0)
  102. {
  103. _waitVar.wait(lock, [this]
  104. {
  105. return _jobsLeft == 0;
  106. });
  107. }
  108. }
  109.  
  110. private:
  111. /**
  112. * Take the next job in the queue and run it.
  113. * Notify the main thread that a job has completed.
  114. */
  115. void Task()
  116. {
  117. while (true)
  118. {
  119. std::function<void(void)> job;
  120.  
  121. // scoped lock
  122. {
  123. std::unique_lock<std::mutex> lock(_queueMutex);
  124.  
  125. if (_bailout)
  126. {
  127. return;
  128. }
  129.  
  130. // Wait for a job if we don't have any.
  131. _jobAvailableVar.wait(lock, [this]
  132. {
  133. return _queue.size() > 0 || _bailout;
  134. });
  135.  
  136. if (_bailout)
  137. {
  138. return;
  139. }
  140.  
  141. // Get job from the queue
  142. auto startTime = std::chrono::high_resolution_clock::now();
  143. job = _queue.front();
  144. _queue.pop();
  145. auto endTime = std::chrono::high_resolution_clock::now();
  146. std::cout << "thread pop (sec): " << std::chrono::duration<double>(endTime - startTime).count() << std::endl;
  147. }
  148.  
  149. job();
  150.  
  151. // scoped lock
  152. {
  153. std::lock_guard<std::mutex> lock(_jobsLeftMutex);
  154. --_jobsLeft;
  155. }
  156.  
  157. _waitVar.notify_one();
  158. }
  159. }
  160.  
  161. std::vector<std::thread> _threads;
  162. std::queue<std::function<void(void)>> _queue;
  163.  
  164. int _jobsLeft;
  165. bool _bailout;
  166. std::condition_variable _jobAvailableVar;
  167. std::condition_variable _waitVar;
  168. std::mutex _jobsLeftMutex;
  169. std::mutex _queueMutex;
  170. };
  171.  
  172. #endif //CONCURRENT_THREADPOOL_H
Add Comment
Please, Sign In to add comment