Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#pragma once

#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <limits>
#include <memory>
#include <mutex>
#include <utility>

#include <boost/optional.hpp>

#include "helpermacros.h"
- namespace utils
- {
- enum class CQSide { Front, Back };
- /*!
- * This class template provides thread safe blocking queue on the top of the
- * standard STL queue.
- */
- template <class Value,
- template <class, class> class Sequence = std::deque,
- template <class> class Allocator = std::allocator>
- class ConcurrentQueue
- {
- public: // types
- NON_COPYABLE_NOR_MOVABLE(ConcurrentQueue)
- typedef Allocator<Value> allocator_type;
- typedef Sequence<Value, allocator_type> container_type;
- typedef typename container_type::value_type value_type;
- typedef typename container_type::reference reference;
- typedef typename container_type::const_reference const_reference;
- typedef typename container_type::size_type size_type;
- public: // constructors
- /*!
- * \brief Contructs new empty ConcurrentQueue object.
- * \param sizeLimit The maximum number of elements into the queue. If zero
- * then the queue is unlimited.
- */
- explicit ConcurrentQueue(size_type sizeLimit = 0)
- : _sizeLimit(getLimit(sizeLimit))
- , _isPoppingStopped(false)
- , _isPushingStopped(false) {}
- /*!
- * \brief Constructs new ConcurrentQueue object copying the data from the
- * container into it.
- * \param container Container to copy.
- * \param sizeLimit The maximum number of elements into the queue. If zero
- * then the queue is unlimited.
- */
- explicit ConcurrentQueue(const container_type& container, size_type sizeLimit = 0)
- : _queue(container)
- , _sizeLimit(getLimit(sizeLimit))
- , _isPoppingStopped(false)
- , _isPushingStopped(false)
- {
- assert(_queue.size() <= _sizeLimit);
- }
- /*!
- * \brief Constructs new ConcurrentQueue object moving the data from the
- * container into it.
- * \param container Container to move.
- * \param sizeLimit The maximum number of elements into the queue. If zero
- * then the queue is unlimited.
- */
- explicit ConcurrentQueue(container_type&& container, size_type sizeLimit = 0)
- : _queue(std::move(container))
- , _sizeLimit(getLimit(sizeLimit))
- , _isPoppingStopped(false)
- , _isPushingStopped(false)
- {
- assert(_queue.size() < _sizeLimit);
- }
- /*!
- * \brief Constructs new ConcurrentQueue object with user supplied
- * allocator.
- * \param allocator User supplied allocator.
- * \param sizeLimit The maximum number of elements into the queue. If zero
- * then the queue is unlimited.
- */
- explicit ConcurrentQueue(const allocator_type& allocator, size_type sizeLimit = 0)
- : _queue(allocator)
- , _sizeLimit(getLimit(sizeLimit))
- , _isPoppingStopped(false)
- , _isPushingStopped(false)
- {
- }
- /*!
- * \brief Constructs new unlimited ConcurrentQueue object copying the data
- * from the container into it and providing a user supplied allocator.
- * \param container Container to copy.
- * \param allocator User supplied allocator.
- * \param sizeLimit The maximum number of elements into the queue. If zero
- * then the queue is unlimited.
- */
- ConcurrentQueue(const container_type& container,
- const allocator_type& allocator,
- size_type sizeLimit = 0)
- : _queue(container, allocator)
- , _sizeLimit(getLimit(sizeLimit))
- , _isPoppingStopped(false)
- , _isPushingStopped(false)
- {
- assert(_queue.size() < _sizeLimit);
- }
- /*!
- * \brief Constructs new ConcurrentQueue object moving the data from the
- * container into it and providing a user supplied allocator.
- * \param container Container to move.
- * \param allocator User supplied allocator.
- * \param sizeLimit The maximum number of elements into the queue. If zero
- * then the queue is unlimited.
- */
- ConcurrentQueue(container_type&& container,
- const allocator_type& allocator,
- size_type sizeLimit = 0)
- : _queue(std::move(container), allocator)
- , _sizeLimit(getLimit(sizeLimit))
- , _isPoppingStopped(false)
- , _isPushingStopped(false)
- {
- assert(_queue.size() < _sizeLimit);
- }
- public: // insert operations
- /*!
- * \brief tryPush Pushes value into the queue if the queue limit is not
- * reached.
- * \param value The value to be pushed.
- * \param side On which side of the queue to push.
- * \return Returns true on success or false when the queue limit is
- * reached or the queue is shutdown.
- */
- bool tryPush(const Value& value, CQSide side = CQSide::Back)
- {
- return tryEmplace(side, value);
- }
- /*!
- * \brief tryPush Moves value into the queue if the queue limit is not
- * reached.
- * \param value The value to be moved.
- * \param side On which side of the queue to move.
- * \return Returns true on success or false when the queue limit is
- * reached or the queue is shutdown.
- */
- bool tryPush(Value&& value, CQSide side = CQSide::Back)
- {
- return tryEmplace(side, std::move(value));
- }
- /*!
- * \brief tryEmplace Constructs new value direclty into the queue if the
- * queue limit is not reached.
- * \param side On which side of the queue to emplace.
- * \param args Arguments for the new value constructor.
- * \return Returns true on success or false when the queue limit is
- * reached or the queue is shutdown.
- */
- template<class... Args>
- bool tryEmplace(CQSide side, Args&&... args)
- {
- auto lambda = [this](UniqueLock&, Predicate) {
- return _queue.size() < _sizeLimit;
- };
- return pushImpl(side, lambda, std::forward<Args>(args)...);
- }
- /*!
- * \brief waitAndPush Blocks the calling thread until there is space for
- * new element and pushes value into the queue.
- * \param value The value to be pushed.
- * \param side On which side of the queue to push.
- * \return Returns true on success or false when the queue is shutdown.
- */
- bool waitAndPush(const Value& value, CQSide side = CQSide::Back)
- {
- return waitAndEmplace(side, value);
- }
- /*!
- * \brief waitAndPush Blocks the calling thread until there is space for
- * new element and moves value into the queue.
- * \param value The value to be moved.
- * \param side On which side of the queue to move.
- * \return Returns true on success or false when the queue is shutdown.
- */
- bool waitAndPush(Value&& value, CQSide side = CQSide::Back)
- {
- return waitAndEmplace(side, std::move(value));
- }
- /*!
- * \brief waitAndEmplace Blocks the calling thread until there is space for
- * new element and constructs new value into the queue.
- * \param side On which side of the queue to emplace.
- * \param args Arguments for the new value constructor.
- * \return Returns true on success or false when the queue is shutdown.
- */
- template <class... Args>
- bool waitAndEmplace(CQSide side, Args&&... args)
- {
- auto lambda = [this](UniqueLock& lock, Predicate pred) {
- this->_condition.wait(lock, pred);
- return true;
- };
- return pushImpl(side, lambda, std::forward<Args>(args)...);
- }
- /*!
- * \brief waitAndPush Blocks the calling thread until there is space for
- * new element or until provided time interval expires and pushes new
- * value into the queue.
- * \param value The value to be pushed.
- * \param duration Max time interval after which the calling thread will
- * be unblocked.
- * \param side On which side of the queue to push.
- * \return True if the push is successful or false if the wait is
- * interupted.
- */
- template <class Rep, class Period>
- bool waitAndPush(const Value& value,
- const std::chrono::duration<Rep, Period>& duration,
- CQSide side = CQSide::Back)
- {
- return waitAndEmplace(side, duration, value);
- }
- /*!
- * \brief waitAndPush Blocks the calling thread until there is space for
- * new element or until provided time interval expires and moves new
- * value into the queue.
- * \param value The value to be moved.
- * \param duration Max time interval after which the calling thread will
- * be unblocked.
- * \param side On which side of the queue to move.
- * \return True if the push is successful or false if the wait is
- * interupted.
- */
- template <class Rep, class Period>
- bool waitAndPush(Value&& value,
- const std::chrono::duration<Rep, Period>& duration,
- CQSide side = CQSide::Back)
- {
- return waitAndEmplace(side, duration, std::move(value));
- }
- /*!
- * \brief waitAndEmplace Blocks the calling thread until there is space
- * for new element or until provided time interval expires and constructs
- * new value inot the queue.
- * \param side On which side of the queue to emplace.
- * \param duration Max time interval after which the calling thread will
- * be unblocked.
- * \param args Arguments for the new value constructor.
- * \return True if the push is successful or false if the wait is
- * interupted.
- */
- template <class Rep, class Period, class... Args>
- bool waitAndEmplace(CQSide side,
- const std::chrono::duration<Rep, Period>& duration,
- Args&&... args)
- {
- auto lambda = [this, &duration](UniqueLock& lock, Predicate pred) {
- return _condition.wait_for(lock, duration, pred);
- };
- return pushImpl(side, lambda, std::forward<Args>(args)...);
- }
- /*!
- * \brief waitAndPush Blocks the calling thread until there is space for
- * new element or until provided time point is passed and pushes new value
- * into the queue.
- * \param value The value to be pushed.
- * \param timePoint The max point in time after which the calling thread
- * will be unblocked.
- * \param side On which side of the queue to push.
- * \return True if the push is successful or false if the wait is
- * interupted.
- */
- template <class Clock, class Duration>
- bool waitAndPush(const Value& value,
- const std::chrono::time_point<Clock, Duration>& timePoint,
- CQSide side = CQSide::Back)
- {
- return waitAndEmplace(side, timePoint, value);
- }
- /*!
- * \brief waitAndPush Blocks the calling thread until there is space for
- * new element or until provided time point is passed and moves new value
- * into the queue.
- * \param value The value to be moved.
- * \param timePoint The max point in time after which the calling thread
- * will be unblocked.
- * \param side On which side of the queue to push.
- * \return True if the push is successful or false if the wait is
- * interupted.
- */
- template <class Clock, class Duration>
- bool waitAndPush(Value&& value,
- const std::chrono::time_point<Clock, Duration>& timePoint,
- CQSide side = CQSide::Back)
- {
- return waitAndEmplace(side, timePoint, std::move(value));
- }
- /*!
- * \brief waitAndEmplace Blocks the calling thread until there is space
- * for new element or until provided time point is passed and constructs
- * new value into the queue.
- * \param side On which side of the queue to emplace.
- * \param timePoint The max point in time after which the calling thread
- * will be unblocked.
- * \param args Arguments for the new value constructor.
- * \return True if the push is successful or false if the wait is
- * interupted.
- */
- template <class Clock, class Duration, class... Args>
- bool waitAndEmplace(CQSide side,
- const std::chrono::time_point<Clock, Duration>& timePoint,
- Args&&... args)
- {
- auto lambda = [this, &timePoint](UniqueLock& lock, Predicate pred) {
- return _condition.wait_until(lock, timePoint, pred);
- };
- return pushImpl(side, lambda, std::forward<Args>(args)...);
- }
- public: // pop operations
- /*!
- * \brief tryPop Tries to pop data from the queue.
- * \param value Reference to the value where the popped from the queue
- * data will be saved.
- * \param side From which side of the queue to pop.
- * \return Returns true on success or false when the queue is empty or
- * shutdown.
- */
- bool tryPop(Value& value, CQSide side = CQSide::Front)
- {
- return popImpl(side, value, [this](UniqueLock&, Predicate) {
- return !_queue.empty();
- });
- }
- /*!
- * \brief waitAndPop Blocks the calling thread until data is available and
- * pops the value from the queue when it is pushed by another thread.
- * \param value Reference to the variable where the popped from the queue
- * value will be saved.
- * \param side From which side of the queue to pop.
- * \return True if the pop is successful or false in the case when wait
- * is interrupted.
- */
- bool waitAndPop(Value& value, CQSide side = CQSide::Front)
- {
- return popImpl(side, value, [this](UniqueLock& lock, Predicate pred) {
- this->_condition.wait(lock, pred);
- return true;
- });
- }
- /*!
- * \brief waitAndPop Blocks the calling thread until data is available or
- * until provided time interval expires and pops the value from the queue.
- * \param value Reference to the variable where the popped from the queue
- * value will be saved.
- * \param duration Max time interval after which the calling thread will
- * be unblocked.
- * \param side From which side of the queue to pop.
- * \return True if the pop is successful or false in the case when wait
- * is interrupted.
- */
- template<class Rep, class Period>
- bool waitAndPop(Value& value,
- const std::chrono::duration<Rep, Period>& duration,
- CQSide side = CQSide::Front)
- {
- return popImpl(side, value, [this, &duration](UniqueLock& lock, Predicate pred) {
- return _condition.wait_for(lock, duration, pred);
- });
- }
- /*!
- * \brief waitAndPop Blocks the calling thread until data is available or
- * until the provided time point is passed and pops the value from the
- * queue.
- * \param value Reference to the variable where the popped from the queue
- * value will be saved.
- * \param timePoint The max point in time after which the calling thread
- * will be unblocked.
- * \param side From which side of the queue to pop.
- * \return True if the pop is successful or false in the case when wait
- * is interrupted.
- */
- template<class Clock, class Duration>
- bool waitAndPop(Value& value,
- const std::chrono::time_point<Clock, Duration>& timePoint,
- CQSide side = CQSide::Front)
- {
- return popImpl(side, value, [this, &timePoint](UniqueLock& lock, Predicate pred) {
- return _condition.wait_until(lock, timePoint, pred);
- });
- }
- /*!
- * \brief tryPop Tries to pop data from the queue.
- * \param side From which side of the queue to pop.
- * \return boost::optional with the popped value or with boost::none if
- * the queue is empty or shutdown.
- */
- boost::optional<Value> tryPop(CQSide side = CQSide::Front)
- {
- return popImpl(side, [this](UniqueLock&, Predicate) {
- return !_queue.empty();
- });
- }
- /*!
- * \brief waitAndPop Blocks the calling thread until data is available and
- * pops the value from the queue.
- * \param side From which side of the queue to pop.
- * \return boost::optional with the popped value or with boost::none if
- * the wait is interupted.
- */
- boost::optional<Value> waitAndPop(CQSide side = CQSide::Front)
- {
- return popImpl(side, [this](UniqueLock& lock, Predicate pred) {
- this->_condition.wait(lock, pred);
- return true;
- });
- }
- /*!
- * \brief waitAndPop Blocks the calling thread until data is available or
- * until provided time interval expires and pops the value from the queue.
- * \param duration Max time interval after which the calling thread will
- * be unblocked.
- * \param side From which side of the queue to pop.
- * \return boost::optional with the popped value or with boost::none if
- * the wait is interupted.
- */
- template<class Rep, class Period>
- boost::optional<Value> waitAndPop(const std::chrono::duration<Rep, Period>& duration,
- CQSide side = CQSide::Front)
- {
- return popImpl(side, [this, &duration](UniqueLock& lock, Predicate pred) {
- return _condition.wait_for(lock, duration, pred);
- });
- }
- /*!
- * \brief waitAndPop Blocks the calling thread until data is available or
- * until provided time point is passed and pops the value from the queue.
- * \param timePoint The max point in time after which the calling thread
- * will be unblocked.
- * \param side From which side of the queue to pop.
- * \return boost::optional with the popped value or with boost::none if
- * the wait is interupted.
- */
- template<class Clock, class Duration>
- boost::optional<Value> waitAndPop(const std::chrono::time_point<Clock, Duration>& timePoint,
- CQSide side = CQSide::Front)
- {
- return popImpl(side, [this, &timePoint](UniqueLock& lock, Predicate pred) {
- return _condition.wait_until(lock, timePoint, pred);
- });
- }
- public: // queue management
- /*!
- * \brief shutdown Notifies the waiting on the queue threads to unblock
- * and forbids future pushing and popping operations.
- */
- void shutdown()
- {
- UniqueLock lock(_mutex);
- _isPoppingStopped = true;
- _isPushingStopped = true;
- lock.unlock();
- _condition.notify_all();
- }
- /*!
- * \brief stopPushing Notifies the waiting on the queue pushing threads to
- * unblock and forbids future pushing operations.
- */
- void stopPushing()
- {
- UniqueLock lock(_mutex);
- _isPushingStopped = true;
- lock.unlock();
- _condition.notify_all();
- }
- /*!
- * \brief stopPopping Notifies the waiting on the queue popping threads to
- * unblcock and forbids future popping operations.
- */
- void stopPopping()
- {
- UniqueLock lock(_mutex);
- _isPoppingStopped = true;
- lock.unlock();
- _condition.notify_all();
- }
- /*!
- * \brief restart Restarts the previous shutdown queue in the case we want
- * to reuse it.
- */
- void restart()
- {
- LockGuard lock(_mutex);
- _isPoppingStopped = false;
- _isPushingStopped = false;
- }
- /*!
- * \brief restartPushing Allows again previously disallowed pushing
- * operations.
- */
- void restartPushing()
- {
- LockGuard lock(_mutex);
- _isPushingStopped = false;
- }
- /*!
- * \brief restartPopping Allows again previously disallowed popping
- * operations.
- */
- void restartPopping()
- {
- LockGuard lock(_mutex);
- _isPoppingStopped = false;
- }
- /*!
- * \brief isShutdown Checks whether the queue is shutdown.
- * \return True if the queue is shutdown and false otherwise.
- */
- bool isShutdown() const
- {
- LockGuard lock(_mutex);
- return _isPoppingStopped && _isPushingStopped;
- }
- /*!
- * \brief isPushingStopped Checks whether pushing to the queue is allowed.
- * \return True if the pushing is allowed and false otherwise.
- */
- bool isPushingStopped() const
- {
- LockGuard lock(_mutex);
- return _isPushingStopped;
- }
- /*!
- * \brief isPoppingStopped Checks whether popping from the queue is
- * allowed.
- * \return True if the popping is allowed and false otherwise.
- */
- bool isPoppingStopped() const
- {
- LockGuard lock(_mutex);
- return _isPoppingStopped;
- }
- /*!
- * \brief size Gets the queue size.
- * \return The number of the elements in the queue.
- * \note This method is not completely thread safe because the size of the
- * queue can change immediately after return. Use it only for approximate
- * estimation.
- */
- size_type size() const
- {
- LockGuard lock(_mutex);
- return _queue.size();
- }
- /*!
- * \brief empty Checks whether the queue is empty.
- * \return true if the queue is empty and false otherwise.
- * \note This method is not thread safe, because the size of the queue can
- * chenge immendiately after return.
- */
- bool empty() const
- {
- LockGuard lock(_mutex);
- return _queue.empty();
- }
- private: // types
- typedef std::unique_lock<std::mutex> UniqueLock;
- typedef std::lock_guard<std::mutex> LockGuard;
- typedef std::function<bool ()> Predicate;
- private: // methods
- template <class Predicate, class... Args>
- bool pushImpl(CQSide side, Predicate pred, Args&&... args)
- {
- UniqueLock lock(_mutex);
- assert(_queue.size() <= _sizeLimit);
- if(!pred(lock, [this]{ return _queue.size() < _sizeLimit || _isPushingStopped; }))
- return false;
- if(_isPushingStopped)
- return false;
- if(CQSide::Front == side)
- _queue.emplace_front(std::forward<Args>(args)...);
- else
- _queue.emplace_back(std::forward<Args>(args)...);
- lock.unlock();
- _condition.notify_one();
- return true;
- }
- template <class Predicate>
- bool popImpl(CQSide side, Value& value, Predicate pred)
- {
- try
- {
- UniqueLock lock(_mutex);
- assert(_queue.size() <= _sizeLimit);
- if(!pred(lock, [this]{ return !_queue.empty() || _isPoppingStopped; }))
- return false;
- if(_isPoppingStopped)
- return false;
- if(CQSide::Front == side)
- {
- value = std::move(_queue.front());
- _queue.pop_front();
- }
- else
- {
- value = std::move(_queue.back());
- _queue.pop_back();
- }
- return true;
- }
- catch(...)
- {
- // if the exception is thrown notify another thread to pop the value
- _condition.notify_one();
- throw;
- }
- }
- template <class Predicate>
- boost::optional<Value> popImpl(CQSide side, Predicate pred)
- {
- try
- {
- UniqueLock lock(_mutex);
- assert(_queue.size() <= _sizeLimit);
- if(!pred(lock, [this]{ return !_queue.empty() || _isPoppingStopped; }))
- return boost::none;
- if(_isPoppingStopped)
- return boost::none;
- boost::optional<Value> value;
- if(CQSide::Front == side)
- {
- value = std::move(_queue.front());
- _queue.pop_front();
- }
- else
- {
- value = std::move(_queue.back());
- _queue.pop_back();
- }
- return value;
- }
- catch(...)
- {
- // if the exception is thrown notify another thread to pop the value
- _condition.notify_one();
- throw;
- }
- }
- static size_type getLimit(size_type limit)
- {
- return 0 == limit ? std::numeric_limits<size_type>::max() : limit;
- }
- private: // data
- container_type _queue;
- mutable std::mutex _mutex;
- std::condition_variable _condition;
- size_t _sizeLimit;
- bool _isPoppingStopped;
- bool _isPushingStopped;
- }; // ConcurrentQueue class
- } // utils namespace
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement