Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#include <atomic>
#include <condition_variable>
#include <cwchar>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <system_error>
#include <thread>
#include <utility>
#include <vector>

#include <boost/lockfree/queue.hpp>
- template<typename ItemType, unsigned long ulQueueCapacity>
- class ThreadPool {
- public:
- ThreadPool(
- unsigned long const ulThreadCount,
- std::function<void(ItemType item)> pfItemCallbackRoutine)
- : m_state(State::Genesis)
- , m_ulThreadCount(ulThreadCount)
- , m_pfItemCallbackRoutine(pfItemCallbackRoutine)
- {
- }
- ~ThreadPool()
- {
- {
- State const nState = m_state.load(std::memory_order_relaxed);
- bool bNeedToStop = (nState == State::Running);
- if (bNeedToStop) {
- stop();
- }
- }
- }
- // return TRUE if success
- bool start()
- {
- {
- State const nState = m_state.load(std::memory_order_relaxed);
- bool bCanStart = (nState == State::Genesis);
- if (!bCanStart) {
- wprintf(L"ThreadPool start fail\n");
- return false;
- }
- }
- m_state.store(State::Starting, std::memory_order_relaxed);
- try {
- wprintf(L"!!! ulThreadCount=%lu\n", m_ulThreadCount);
- for (size_t i = 0; i < m_dwThreadCount; ++i) {
- std::thread thread = std::thread(std::bind(&ThreadPool::threadRoutine, this));
- m_vThreads.push_back(std::move(thread));
- }
- }
- catch(std::system_error const &e) {
- wprintf(L"ThreadPool start failed, code=[%lu], what=[%s]\n", e.code(), e.what());
- return false;
- }
- m_state.store(State::Running, std::memory_order_relaxed);
- return true;
- }
- // return TRUE if success
- bool stop()
- {
- {
- State const nState = m_state.load(std::memory_order_relaxed);
- bool bCanStop = (nState == State::Running);
- if (!bCanStop) {
- wprintf(L"ThreadPool stop failed");
- return false;
- }
- }
- m_state.store(State::Stopping, std::memory_order_relaxed);
- m_condVarQueue.notify_all(); // wake all sleeping threads
- for (std::thread &thread : m_vThreads) {
- thread.join();
- }
- m_vThreads.clear();
- m_queue.consume_all([](ItemType){});
- m_state.store(State::Genesis, std::memory_order_relaxed);
- return true;
- }
- // return true if success
- bool enqueue(ItemType const &item)
- {
- if (m_queue.push(item)) {
- m_condVarQueue.notify_one();
- return true;
- }
- return false;
- }
- private: // disable copy and move
- ThreadPool(ThreadPool&);
- ThreadPool(ThreadPool&&);
- ThreadPool& operator=(ThreadPool&);
- ThreadPool& operator=(ThreadPool&&);
- private: // methods
- void threadRoutine()
- {
- while (1) {
- std::unique_lock<std::mutex> lock(m_mutexQueueCondVar);
- m_condVarQueue.wait(lock);
- State const nState = m_state.load(std::memory_order_relaxed);
- bool bContinueRunning = (nState == State::Running);
- if (!bContinueRunning) {
- return;
- }
- ItemType item;
- if (m_queue.pop(/*out*/ item)) {
- if (m_pfItemCallbackRoutine) {
- m_pfItemCallbackRoutine(item);
- }
- }
- }
- }
- private: // attributes
- enum class State { Genesis, Starting, Running, Stopping };
- std::atomic<State> m_state;
- unsigned long m_ulThreadCount;
- std::vector<std::thread> m_vThreads;
- std::function<void(ItemType item)> m_pfItemCallbackRoutine;
- std::condition_variable m_condVarQueue;
- std::mutex m_mutexQueueCondVar;
- boost::lockfree::queue<ItemType,
- boost::lockfree::fixed_sized<true>,
- boost::lockfree::capacity<ulQueueCapacity>> m_queue;
- };
Advertisement
Add Comment
Please, Sign In to add comment