Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <exception>
#include <functional>
#include <iostream>
#include <limits>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>
/**
 * Exception thrown when a blocking wait is interrupted by another thread.
 *
 * Derives from std::exception so that generic
 * `catch (const std::exception&)` handlers (for example at a thread entry
 * point) also observe it. Handlers written as
 * `catch (const BlockInterrupt&)` keep working unchanged.
 */
class BlockInterrupt : public std::exception
{
public:
    BlockInterrupt() = default;

    /**
     * @return a static, human-readable description of the interruption.
     */
    const char* what() const noexcept override
    {
        return "blocking wait was interrupted";
    }
};
/**
 * Scope guard: stores a callable and invokes it once, from the destructor.
 *
 * The guard is neither copyable nor movable. A copy would own the same
 * callable and run it a second time when it is destroyed, which is never
 * what a "run at scope exit" action wants.
 *
 * Any exception escaping the callable (including std::bad_function_call when
 * the stored std::function is empty) is swallowed, because the call is made
 * from a destructor and must not propagate.
 */
class Final
{
public:
    /**
     * @param final action to run when this guard is destroyed. Taken by value
     *              and moved into place so a temporary is not copied again.
     */
    explicit Final(std::function<void()> final)
        : m_final(std::move(final))
    {
    }

    Final(const Final&) = delete;
    Final(Final&&) = delete;
    Final& operator=(const Final&) = delete;
    Final& operator=(Final&&) = delete;

    ~Final()
    {
        try
        {
            m_final();
        }
        catch (...)
        {
            // Deliberately ignored: a destructor must not throw.
        }
    }

private:
    const std::function<void()> m_final;
};
- template<typename T>
- class BlockQueue
- {
- public:
- BlockQueue()
- {
- }
- /**
- * queue will be destroyed, interrupt all blocking thread.
- * the mutex will be locked until the deconstruction is finished.
- */
- ~BlockQueue()
- {
- interrupt();
- std::unique_lock<std::mutex> lock(m_mutex);
- m_interrupted = true;
- }
- BlockQueue(const BlockQueue&) = delete;
- BlockQueue(BlockQueue&&) = delete;
- BlockQueue& operator=(const BlockQueue&) = delete;
- BlockQueue& operator=(BlockQueue&&) = delete;
- BlockQueue& push(const T& value)
- {
- std::unique_lock<std::mutex> lock(m_mutex);
- m_queue.push(value);
- //if some threads is wait for taking, notify one.
- if(m_waiter_count > 0)
- {
- m_wait_finished.notify_one();
- }
- return *this;
- }
- BlockQueue& push(T&& value)
- {
- std::unique_lock<std::mutex> lock(m_mutex);
- m_queue.push(std::move(value));
- //if some threads is wait for taking, notify one.
- if(m_waiter_count > 0)
- {
- m_wait_finished.notify_one();
- }
- return *this;
- }
- template<typename... Args>
- BlockQueue& emplace(Args&&... args)
- {
- std::unique_lock<std::mutex> lock(m_mutex);
- m_queue.emplace(std::forward<Args>(args)...);
- //if some threads is wait for taking, notify one.
- if(m_waiter_count > 0)
- {
- m_wait_finished.notify_one();
- }
- return *this;
- }
- T take()
- {
- std::unique_lock<std::mutex> lock(m_mutex);
- if(m_interrupted)
- {
- throw BlockInterrupt();
- }
- if(!m_queue.empty())
- {
- T value = std::move(const_cast<T&>(m_queue.front()));
- m_queue.pop();
- return value;
- }
- //if the count of waiter is overflow.
- if(m_waiter_count == std::numeric_limits<decltype(m_waiter_count)>::max())
- {
- throw std::runtime_error("The counter for waiter of block queue is overflow.");
- }
- //the call must be blocked.
- ++m_waiter_count;
- Final final([this]()
- {
- //this code will be execute anyhow.
- --m_waiter_count;
- if(m_waiter_count == 0 && m_interrupted == true)
- {
- //if count of waiter is zero, and some threads call the interrupt(), notify all.
- m_interrupt_finished.notify_all();
- }
- });
- m_wait_finished.wait(lock, [this](){return !m_queue.empty() || m_interrupted;});
- if(m_interrupted)
- {
- throw BlockInterrupt();
- }
- T value = std::move(const_cast<T&>(m_queue.front()));
- m_queue.pop();
- return value;
- }
- bool try_pop(T& value)
- {
- std::unique_lock<std::mutex> lock(m_mutex);
- if(!m_queue.empty())
- {
- value = std::move(const_cast<T&>(m_queue.front()));
- m_queue.pop();
- return true;
- }
- return false;
- }
- /**
- * interrupt all block threads, return until all threads are interrupted.
- */
- BlockQueue& interrupt() noexcept
- {
- std::unique_lock<std::mutex> lock(m_mutex);
- if(m_waiter_count == 0)
- {
- //no waiter, return.
- m_interrupted = false;
- return *this;
- }
- m_interrupted = true;
- //wake up all block threads.
- m_wait_finished.notify_all();
- //wait for all threads are interrupted.
- m_interrupt_finished.wait(lock, [this](){return m_waiter_count == 0;});
- m_interrupted = false;
- return *this;
- }
- private:
- /**
- * mutex for accessing of "m_queue", "m_interrupted", "m_waiter_count".
- */
- std::mutex m_mutex;
- std::queue<T> m_queue;
- /**
- * express whether the queue is interrupted.
- */
- bool m_interrupted = false;
- /**
- * count of threads blocked.
- */
- std::uint32_t m_waiter_count = std::numeric_limits<decltype(m_waiter_count)>::min();
- /**
- * condition variable for notifying that the queue is readable or interrupted (so the threads blocked should be awakened).
- */
- std::condition_variable m_wait_finished;
- /**
- * condition variable for notifying that no thread is blocking (so interrupt has finished).
- */
- std::condition_variable m_interrupt_finished;
- };
- int main(int argc, char const *argv[])
- {
- BlockQueue<int> q;
- std::thread taker1([&]()
- {
- try
- {
- q.take();
- }
- catch(const BlockInterrupt&)
- {
- std::cout << "taker1 interrupted." << std::endl;
- }
- });
- std::thread taker2([&]()
- {
- try
- {
- q.take();
- }
- catch(const BlockInterrupt&)
- {
- std::cout << "taker2 interrupted." << std::endl;
- }
- });
- using namespace std::literals::chrono_literals;
- std::this_thread::sleep_for(0.5s);
- std::thread interrupter([&]()
- {
- q.interrupt();
- });
- taker1.join();
- taker2.join();
- interrupter.join();
- return 0;
- }
Add Comment
Please, Sign In to add comment