Guest User

Untitled

a guest
Mar 24th, 2016
64
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.88 KB | None | 0 0
  1. #include <boost\lockfree\queue.hpp>
  2. #include <iostream>
  3. #include <thread>
  4. #include <atomic>
  5. #include <vector>
  6. #include <memory>
  7. #include <condition_variable>
  8. #include <functional>
  9.  
  10. template<typename ItemType, unsigned long ulQueueCapacity>
  11. class ThreadPool {
  12. public:
  13. ThreadPool(
  14. unsigned long const ulThreadCount,
  15. std::function<void(ItemType item)> pfItemCallbackRoutine)
  16. : m_state(State::Genesis)
  17. , m_ulThreadCount(ulThreadCount)
  18. , m_pfItemCallbackRoutine(pfItemCallbackRoutine)
  19. {
  20. }
  21.  
  22. ~ThreadPool()
  23. {
  24. {
  25. State const nState = m_state.load(std::memory_order_relaxed);
  26. bool bNeedToStop = (nState == State::Running);
  27. if (bNeedToStop) {
  28. stop();
  29. }
  30. }
  31. }
  32.  
  33. // return TRUE if success
  34. bool start()
  35. {
  36. {
  37. State const nState = m_state.load(std::memory_order_relaxed);
  38. bool bCanStart = (nState == State::Genesis);
  39. if (!bCanStart) {
  40. wprintf(L"ThreadPool start fail\n");
  41. return false;
  42. }
  43. }
  44. m_state.store(State::Starting, std::memory_order_relaxed);
  45. try {
  46. wprintf(L"!!! ulThreadCount=%lu\n", m_ulThreadCount);
  47. for (size_t i = 0; i < m_dwThreadCount; ++i) {
  48. std::thread thread = std::thread(std::bind(&ThreadPool::threadRoutine, this));
  49. m_vThreads.push_back(std::move(thread));
  50. }
  51. }
  52. catch(std::system_error const &e) {
  53. wprintf(L"ThreadPool start failed, code=[%lu], what=[%s]\n", e.code(), e.what());
  54. return false;
  55. }
  56. m_state.store(State::Running, std::memory_order_relaxed);
  57. return true;
  58. }
  59.  
  60. // return TRUE if success
  61. bool stop()
  62. {
  63. {
  64. State const nState = m_state.load(std::memory_order_relaxed);
  65. bool bCanStop = (nState == State::Running);
  66. if (!bCanStop) {
  67. wprintf(L"ThreadPool stop failed");
  68. return false;
  69. }
  70. }
  71. m_state.store(State::Stopping, std::memory_order_relaxed);
  72. m_condVarQueue.notify_all(); // wake all sleeping threads
  73. for (std::thread &thread : m_vThreads) {
  74. thread.join();
  75. }
  76. m_vThreads.clear();
  77. m_queue.consume_all([](ItemType){});
  78. m_state.store(State::Genesis, std::memory_order_relaxed);
  79. return true;
  80. }
  81.  
  82. // return true if success
  83. bool enqueue(ItemType const &item)
  84. {
  85. if (m_queue.push(item)) {
  86. m_condVarQueue.notify_one();
  87. return true;
  88. }
  89. return false;
  90. }
  91.  
  92. private: // disable copy and move
  93. ThreadPool(ThreadPool&);
  94. ThreadPool(ThreadPool&&);
  95. ThreadPool& operator=(ThreadPool&);
  96. ThreadPool& operator=(ThreadPool&&);
  97.  
  98. private: // methods
  99. void threadRoutine()
  100. {
  101. while (1) {
  102. std::unique_lock<std::mutex> lock(m_mutexQueueCondVar);
  103. m_condVarQueue.wait(lock);
  104.  
  105. State const nState = m_state.load(std::memory_order_relaxed);
  106. bool bContinueRunning = (nState == State::Running);
  107. if (!bContinueRunning) {
  108. return;
  109. }
  110.  
  111. ItemType item;
  112. if (m_queue.pop(/*out*/ item)) {
  113. if (m_pfItemCallbackRoutine) {
  114. m_pfItemCallbackRoutine(item);
  115. }
  116. }
  117. }
  118. }
  119.  
  120. private: // attributes
  121. enum class State { Genesis, Starting, Running, Stopping };
  122. std::atomic<State> m_state;
  123.  
  124. unsigned long m_ulThreadCount;
  125. std::vector<std::thread> m_vThreads;
  126. std::function<void(ItemType item)> m_pfItemCallbackRoutine;
  127.  
  128. std::condition_variable m_condVarQueue;
  129. std::mutex m_mutexQueueCondVar;
  130. boost::lockfree::queue<ItemType,
  131. boost::lockfree::fixed_sized<true>,
  132. boost::lockfree::capacity<ulQueueCapacity>> m_queue;
  133. };
Advertisement
Add Comment
Please, Sign In to add comment