Advertisement
Guest User

Untitled

a guest
Jul 27th, 2015
457
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 24.52 KB | None | 0 0
  1. #pragma once
  2.  
  #include <cassert>
  #include <chrono>
  #include <condition_variable>
  #include <deque>
  #include <functional>
  #include <limits>
  #include <memory>
  #include <mutex>
  #include <utility>

  #include <boost/optional.hpp>

  #include "helpermacros.h"
  10.  
  11. namespace utils
  12. {
  13.  
  /*!
   * Selects which end of the queue an insert or remove operation acts on.
   */
  enum class CQSide
  {
      Front, //!< The head of the underlying sequence.
      Back   //!< The tail of the underlying sequence.
  };
  15.  
  16. /*!
  17.  * This class template provides thread safe blocking queue on the top of the
  18.  * standard STL queue.
  19.  */
  20. template <class Value,
  21.           template <class, class> class Sequence = std::deque,
  22.           template <class> class Allocator = std::allocator>
  23. class ConcurrentQueue
  24. {
  25. public: // types
  26.  
  27.     NON_COPYABLE_NOR_MOVABLE(ConcurrentQueue)
  28.  
  29.     typedef Allocator<Value>                            allocator_type;
  30.     typedef Sequence<Value, allocator_type>             container_type;
  31.     typedef typename container_type::value_type         value_type;
  32.     typedef typename container_type::reference          reference;
  33.     typedef typename container_type::const_reference    const_reference;
  34.     typedef typename container_type::size_type          size_type;
  35.  
  36. public: // constructors
  37.  
  38.     /*!
  39.      * \brief Contructs new empty ConcurrentQueue object.
  40.      * \param sizeLimit The maximum number of elements into the queue. If zero
  41.      * then the queue is unlimited.
  42.      */
  43.     explicit ConcurrentQueue(size_type sizeLimit = 0)
  44.         : _sizeLimit(getLimit(sizeLimit))
  45.         , _isPoppingStopped(false)
  46.         , _isPushingStopped(false) {}
  47.  
  48.     /*!
  49.      * \brief Constructs new ConcurrentQueue object copying the data from the
  50.      * container into it.
  51.      * \param container Container to copy.
  52.      * \param sizeLimit The maximum number of elements into the queue. If zero
  53.      * then the queue is unlimited.
  54.      */
  55.     explicit ConcurrentQueue(const container_type& container, size_type sizeLimit = 0)
  56.         : _queue(container)
  57.         , _sizeLimit(getLimit(sizeLimit))
  58.         , _isPoppingStopped(false)
  59.         , _isPushingStopped(false)
  60.     {
  61.         assert(_queue.size() <= _sizeLimit);
  62.     }
  63.  
  64.     /*!
  65.      * \brief Constructs new ConcurrentQueue object moving the data from the
  66.      * container into it.
  67.      * \param container Container to move.
  68.      * \param sizeLimit The maximum number of elements into the queue. If zero
  69.      * then the queue is unlimited.
  70.      */
  71.     explicit ConcurrentQueue(container_type&& container, size_type sizeLimit = 0)
  72.         : _queue(std::move(container))
  73.         , _sizeLimit(getLimit(sizeLimit))
  74.         , _isPoppingStopped(false)
  75.         , _isPushingStopped(false)
  76.     {
  77.         assert(_queue.size() < _sizeLimit);
  78.     }
  79.  
  80.     /*!
  81.      * \brief Constructs new ConcurrentQueue object with user supplied
  82.      * allocator.
  83.      * \param allocator User supplied allocator.
  84.      * \param sizeLimit The maximum number of elements into the queue. If zero
  85.      * then the queue is unlimited.
  86.      */
  87.     explicit ConcurrentQueue(const allocator_type& allocator, size_type sizeLimit = 0)
  88.         : _queue(allocator)
  89.         , _sizeLimit(getLimit(sizeLimit))
  90.         , _isPoppingStopped(false)
  91.         , _isPushingStopped(false)
  92.     {
  93.     }
  94.  
  95.     /*!
  96.      * \brief Constructs new unlimited ConcurrentQueue object copying the data
  97.      * from the container into it and providing a user supplied allocator.
  98.      * \param container Container to copy.
  99.      * \param allocator User supplied allocator.
  100.      * \param sizeLimit The maximum number of elements into the queue. If zero
  101.      * then the queue is unlimited.
  102.      */
  103.     ConcurrentQueue(const container_type& container,
  104.                     const allocator_type& allocator,
  105.                     size_type sizeLimit = 0)
  106.         : _queue(container, allocator)
  107.         , _sizeLimit(getLimit(sizeLimit))
  108.         , _isPoppingStopped(false)
  109.         , _isPushingStopped(false)
  110.     {
  111.         assert(_queue.size() < _sizeLimit);
  112.     }
  113.  
  114.     /*!
  115.      * \brief Constructs new ConcurrentQueue object moving the data from the
  116.      * container into it and providing a user supplied allocator.
  117.      * \param container Container to move.
  118.      * \param allocator User supplied allocator.
  119.      * \param sizeLimit The maximum number of elements into the queue. If zero
  120.      * then the queue is unlimited.
  121.      */
  122.     ConcurrentQueue(container_type&& container,
  123.                     const allocator_type& allocator,
  124.                     size_type sizeLimit = 0)
  125.         : _queue(std::move(container), allocator)
  126.         , _sizeLimit(getLimit(sizeLimit))
  127.         , _isPoppingStopped(false)
  128.         , _isPushingStopped(false)
  129.     {
  130.         assert(_queue.size() < _sizeLimit);
  131.     }
  132.  
  133. public: // insert operations
  134.  
  135.     /*!
  136.      * \brief tryPush Pushes value into the queue if the queue limit is not
  137.      * reached.
  138.      * \param value The value to be pushed.
  139.      * \param side On which side of the queue to push.
  140.      * \return Returns true on success or false when the queue limit is
  141.      * reached or the queue is shutdown.
  142.      */
  143.     bool tryPush(const Value& value, CQSide side = CQSide::Back)
  144.     {
  145.         return tryEmplace(side, value);
  146.     }
  147.  
  148.     /*!
  149.      * \brief tryPush Moves value into the queue if the queue limit is not
  150.      * reached.
  151.      * \param value The value to be moved.
  152.      * \param side On which side of the queue to move.
  153.      * \return Returns true on success or false when the queue limit is
  154.      * reached or the queue is shutdown.
  155.      */
  156.     bool tryPush(Value&& value, CQSide side = CQSide::Back)
  157.     {
  158.         return tryEmplace(side, std::move(value));
  159.     }
  160.  
  161.     /*!
  162.      * \brief tryEmplace Constructs new value direclty into the queue if the
  163.      * queue limit is not reached.
  164.      * \param side On which side of the queue to emplace.
  165.      * \param args Arguments for the new value constructor.
  166.      * \return Returns true on success or false when the queue limit is
  167.      * reached or the queue is shutdown.
  168.      */
  169.     template<class... Args>
  170.     bool tryEmplace(CQSide side, Args&&... args)
  171.     {
  172.         auto lambda = [this](UniqueLock&, Predicate) {
  173.             return _queue.size() < _sizeLimit;
  174.         };
  175.         return pushImpl(side, lambda, std::forward<Args>(args)...);
  176.     }
  177.  
  178.     /*!
  179.      * \brief waitAndPush Blocks the calling thread until there is space for
  180.      * new element and pushes value into the queue.
  181.      * \param value The value to be pushed.
  182.      * \param side On which side of the queue to push.
  183.      * \return Returns true on success or false when the queue is shutdown.
  184.      */
  185.     bool waitAndPush(const Value& value, CQSide side = CQSide::Back)
  186.     {
  187.         return waitAndEmplace(side, value);
  188.     }
  189.  
  190.     /*!
  191.      * \brief waitAndPush Blocks the calling thread until there is space for
  192.      * new element and moves value into the queue.
  193.      * \param value The value to be moved.
  194.      * \param side On which side of the queue to move.
  195.      * \return Returns true on success or false when the queue is shutdown.
  196.      */
  197.     bool waitAndPush(Value&& value, CQSide side = CQSide::Back)
  198.     {
  199.         return waitAndEmplace(side, std::move(value));
  200.     }
  201.  
  202.     /*!
  203.      * \brief waitAndEmplace Blocks the calling thread until there is space for
  204.      * new element and constructs new value into the queue.
  205.      * \param side On which side of the queue to emplace.
  206.      * \param args Arguments for the new value constructor.
  207.      * \return Returns true on success or false when the queue is shutdown.
  208.      */
  209.     template <class... Args>
  210.     bool waitAndEmplace(CQSide side, Args&&... args)
  211.     {
  212.         auto lambda = [this](UniqueLock& lock, Predicate pred) {
  213.             this->_condition.wait(lock, pred);
  214.             return true;
  215.         };
  216.         return pushImpl(side, lambda, std::forward<Args>(args)...);
  217.     }
  218.  
  219.     /*!
  220.      * \brief waitAndPush Blocks the calling thread until there is space for
  221.      * new element or until provided time interval expires and pushes new
  222.      * value into the queue.
  223.      * \param value The value to be pushed.
  224.      * \param duration Max time interval after which the calling thread will
  225.      * be unblocked.
  226.      * \param side On which side of the queue to push.
  227.      * \return True if the push is successful or false if the wait is
  228.      * interupted.
  229.      */
  230.     template <class Rep, class Period>
  231.     bool waitAndPush(const Value& value,
  232.                      const std::chrono::duration<Rep, Period>& duration,
  233.                      CQSide side = CQSide::Back)
  234.     {
  235.         return waitAndEmplace(side, duration, value);
  236.     }
  237.  
  238.     /*!
  239.      * \brief waitAndPush Blocks the calling thread until there is space for
  240.      * new element or until provided time interval expires and moves new
  241.      * value into the queue.
  242.      * \param value The value to be moved.
  243.      * \param duration Max time interval after which the calling thread will
  244.      * be unblocked.
  245.      * \param side On which side of the queue to move.
  246.      * \return True if the push is successful or false if the wait is
  247.      * interupted.
  248.      */
  249.     template <class Rep, class Period>
  250.     bool waitAndPush(Value&& value,
  251.                      const std::chrono::duration<Rep, Period>& duration,
  252.                      CQSide side = CQSide::Back)
  253.     {
  254.         return waitAndEmplace(side, duration, std::move(value));
  255.     }
  256.  
  257.     /*!
  258.      * \brief waitAndEmplace Blocks the calling thread until there is space
  259.      * for new element or until provided time interval expires and constructs
  260.      * new value inot the queue.
  261.      * \param side On which side of the queue to emplace.
  262.      * \param duration Max time interval after which the calling thread will
  263.      * be unblocked.
  264.      * \param args Arguments for the new value constructor.
  265.      * \return True if the push is successful or false if the wait is
  266.      * interupted.
  267.      */
  268.     template <class Rep, class Period, class... Args>
  269.     bool waitAndEmplace(CQSide side,
  270.                         const std::chrono::duration<Rep, Period>& duration,
  271.                         Args&&... args)
  272.     {
  273.         auto lambda = [this, &duration](UniqueLock& lock, Predicate pred) {
  274.             return _condition.wait_for(lock, duration, pred);
  275.         };
  276.         return pushImpl(side, lambda, std::forward<Args>(args)...);
  277.     }
  278.  
  279.     /*!
  280.      * \brief waitAndPush Blocks the calling thread until there is space for
  281.      * new element or until provided time point is passed and pushes new value
  282.      * into the queue.
  283.      * \param value The value to be pushed.
  284.      * \param timePoint The max point in time after which the calling thread
  285.      * will be unblocked.
  286.      * \param side On which side of the queue to push.
  287.      * \return True if the push is successful or false if the wait is
  288.      * interupted.
  289.      */
  290.     template <class Clock, class Duration>
  291.     bool waitAndPush(const Value& value,
  292.                      const std::chrono::time_point<Clock, Duration>& timePoint,
  293.                      CQSide side = CQSide::Back)
  294.     {
  295.         return waitAndEmplace(side, timePoint, value);
  296.     }
  297.  
  298.     /*!
  299.      * \brief waitAndPush Blocks the calling thread until there is space for
  300.      * new element or until provided time point is passed and moves new value
  301.      * into the queue.
  302.      * \param value The value to be moved.
  303.      * \param timePoint The max point in time after which the calling thread
  304.      * will be unblocked.
  305.      * \param side On which side of the queue to push.
  306.      * \return True if the push is successful or false if the wait is
  307.      * interupted.
  308.      */
  309.     template <class Clock, class Duration>
  310.     bool waitAndPush(Value&& value,
  311.                      const std::chrono::time_point<Clock, Duration>& timePoint,
  312.                      CQSide side = CQSide::Back)
  313.     {
  314.         return waitAndEmplace(side, timePoint, std::move(value));
  315.     }
  316.  
  317.     /*!
  318.      * \brief waitAndEmplace Blocks the calling thread until there is space
  319.      * for new element or until provided time point is passed and constructs
  320.      * new value into the queue.
  321.      * \param side On which side of the queue to emplace.
  322.      * \param timePoint The max point in time after which the calling thread
  323.      * will be unblocked.
  324.      * \param args Arguments for the new value constructor.
  325.      * \return True if the push is successful or false if the wait is
  326.      * interupted.
  327.      */
  328.     template <class Clock, class Duration, class... Args>
  329.     bool waitAndEmplace(CQSide side,
  330.                         const std::chrono::time_point<Clock, Duration>& timePoint,
  331.                         Args&&... args)
  332.     {
  333.         auto lambda = [this, &timePoint](UniqueLock& lock, Predicate pred) {
  334.             return _condition.wait_until(lock, timePoint, pred);
  335.         };
  336.         return pushImpl(side, lambda, std::forward<Args>(args)...);
  337.     }
  338.  
  339. public: // pop operations
  340.  
  341.     /*!
  342.      * \brief tryPop Tries to pop data from the queue.
  343.      * \param value Reference to the value where the popped from the queue
  344.      * data will be saved.
  345.      * \param side From which side of the queue to pop.
  346.      * \return Returns true on success or false when the queue is empty or
  347.      * shutdown.
  348.      */
  349.     bool tryPop(Value& value, CQSide side = CQSide::Front)
  350.     {
  351.         return popImpl(side, value, [this](UniqueLock&, Predicate) {
  352.             return !_queue.empty();
  353.         });
  354.     }
  355.  
  356.     /*!
  357.      * \brief waitAndPop Blocks the calling thread until data is available and
  358.      * pops the value from the queue when it is pushed by another thread.
  359.      * \param value Reference to the variable where the popped from the queue
  360.      * value will be saved.
  361.      * \param side From which side of the queue to pop.
  362.      * \return True if the pop is successful or false in the case when wait
  363.      * is interrupted.
  364.      */
  365.     bool waitAndPop(Value& value, CQSide side = CQSide::Front)
  366.     {
  367.         return popImpl(side, value, [this](UniqueLock& lock, Predicate pred) {
  368.             this->_condition.wait(lock, pred);
  369.             return true;
  370.         });
  371.     }
  372.  
  373.     /*!
  374.      * \brief waitAndPop Blocks the calling thread until data is available or
  375.      * until provided time interval expires and pops the value from the queue.
  376.      * \param value Reference to the variable where the popped from the queue
  377.      * value will be saved.
  378.      * \param duration Max time interval after which the calling thread will
  379.      * be unblocked.
  380.      * \param side From which side of the queue to pop.
  381.      * \return True if the pop is successful or false in the case when wait
  382.      * is interrupted.
  383.      */
  384.     template<class Rep, class Period>
  385.     bool waitAndPop(Value& value,
  386.                     const std::chrono::duration<Rep, Period>& duration,
  387.                     CQSide side = CQSide::Front)
  388.     {
  389.         return popImpl(side, value, [this, &duration](UniqueLock& lock, Predicate pred) {
  390.             return _condition.wait_for(lock, duration, pred);
  391.         });
  392.     }
  393.  
  394.     /*!
  395.      * \brief waitAndPop Blocks the calling thread until data is available or
  396.      * until the provided time point is passed and pops the value from the
  397.      * queue.
  398.      * \param value Reference to the variable where the popped from the queue
  399.      * value will be saved.
  400.      * \param timePoint The max point in time after which the calling thread
  401.      * will be unblocked.
  402.      * \param side From which side of the queue to pop.
  403.      * \return True if the pop is successful or false in the case when wait
  404.      * is interrupted.
  405.      */
  406.     template<class Clock, class Duration>
  407.     bool waitAndPop(Value& value,
  408.                     const std::chrono::time_point<Clock, Duration>& timePoint,
  409.                     CQSide side = CQSide::Front)
  410.     {
  411.         return popImpl(side, value, [this, &timePoint](UniqueLock& lock, Predicate pred) {
  412.             return _condition.wait_until(lock, timePoint, pred);
  413.         });
  414.     }
  415.  
  416.     /*!
  417.      * \brief tryPop Tries to pop data from the queue.
  418.      * \param side From which side of the queue to pop.
  419.      * \return boost::optional with the popped value or with boost::none if
  420.      * the queue is empty or shutdown.
  421.      */
  422.     boost::optional<Value> tryPop(CQSide side = CQSide::Front)
  423.     {
  424.         return popImpl(side, [this](UniqueLock&, Predicate) {
  425.             return !_queue.empty();
  426.         });
  427.     }
  428.  
  429.     /*!
  430.      * \brief waitAndPop Blocks the calling thread until data is available and
  431.      * pops the value from the queue.
  432.      * \param side From which side of the queue to pop.
  433.      * \return boost::optional with the popped value or with boost::none if
  434.      * the wait is interupted.
  435.      */
  436.     boost::optional<Value> waitAndPop(CQSide side = CQSide::Front)
  437.     {
  438.         return popImpl(side, [this](UniqueLock& lock, Predicate pred) {
  439.             this->_condition.wait(lock, pred);
  440.             return true;
  441.         });
  442.     }
  443.  
  444.     /*!
  445.      * \brief waitAndPop Blocks the calling thread until data is available or
  446.      * until provided time interval expires and pops the value from the queue.
  447.      * \param duration Max time interval after which the calling thread will
  448.      * be unblocked.
  449.      * \param side From which side of the queue to pop.
  450.      * \return boost::optional with the popped value or with boost::none if
  451.      * the wait is interupted.
  452.      */
  453.     template<class Rep, class Period>
  454.     boost::optional<Value> waitAndPop(const std::chrono::duration<Rep, Period>& duration,
  455.                                       CQSide side = CQSide::Front)
  456.     {
  457.         return popImpl(side, [this, &duration](UniqueLock& lock, Predicate pred) {
  458.             return _condition.wait_for(lock, duration, pred);
  459.         });
  460.     }
  461.  
  462.     /*!
  463.      * \brief waitAndPop Blocks the calling thread until data is available or
  464.      * until provided time point is passed and pops the value from the queue.
  465.      * \param timePoint The max point in time after which the calling thread
  466.      * will be unblocked.
  467.      * \param side From which side of the queue to pop.
  468.      * \return boost::optional with the popped value or with boost::none if
  469.      * the wait is interupted.
  470.      */
  471.     template<class Clock, class Duration>
  472.     boost::optional<Value> waitAndPop(const std::chrono::time_point<Clock, Duration>& timePoint,
  473.                                       CQSide side = CQSide::Front)
  474.     {
  475.         return popImpl(side, [this, &timePoint](UniqueLock& lock, Predicate pred) {
  476.             return _condition.wait_until(lock, timePoint, pred);
  477.         });
  478.     }
  479.  
  480. public: // queue management
  481.  
  482.     /*!
  483.      * \brief shutdown Notifies the waiting on the queue threads to unblock
  484.      * and forbids future pushing and popping operations.
  485.      */
  486.     void shutdown()
  487.     {
  488.         UniqueLock lock(_mutex);
  489.         _isPoppingStopped = true;
  490.         _isPushingStopped = true;
  491.         lock.unlock();
  492.         _condition.notify_all();
  493.     }
  494.  
  495.     /*!
  496.      * \brief stopPushing Notifies the waiting on the queue pushing threads to
  497.      * unblock and forbids future pushing operations.
  498.      */
  499.     void stopPushing()
  500.     {
  501.          UniqueLock lock(_mutex);
  502.          _isPushingStopped = true;
  503.          lock.unlock();
  504.          _condition.notify_all();
  505.     }
  506.  
  507.     /*!
  508.      * \brief stopPopping Notifies the waiting on the queue popping threads to
  509.      * unblcock and forbids future popping operations.
  510.      */
  511.     void stopPopping()
  512.     {
  513.         UniqueLock lock(_mutex);
  514.         _isPoppingStopped = true;
  515.         lock.unlock();
  516.         _condition.notify_all();
  517.     }
  518.  
  519.     /*!
  520.      * \brief restart Restarts the previous shutdown queue in the case we want
  521.      * to reuse it.
  522.      */
  523.     void restart()
  524.     {
  525.         LockGuard lock(_mutex);
  526.         _isPoppingStopped = false;
  527.         _isPushingStopped = false;
  528.     }
  529.  
  530.     /*!
  531.      * \brief restartPushing Allows again previously disallowed pushing
  532.      * operations.
  533.      */
  534.     void restartPushing()
  535.     {
  536.         LockGuard lock(_mutex);
  537.         _isPushingStopped = false;
  538.     }
  539.  
  540.     /*!
  541.      * \brief restartPopping Allows again previously disallowed popping
  542.      * operations.
  543.      */
  544.     void restartPopping()
  545.     {
  546.         LockGuard lock(_mutex);
  547.         _isPoppingStopped = false;
  548.     }
  549.  
  550.     /*!
  551.      * \brief isShutdown Checks whether the queue is shutdown.
  552.      * \return True if the queue is shutdown and false otherwise.
  553.      */
  554.     bool isShutdown() const
  555.     {
  556.         LockGuard lock(_mutex);
  557.         return _isPoppingStopped && _isPushingStopped;
  558.     }
  559.  
  560.     /*!
  561.      * \brief isPushingStopped Checks whether pushing to the queue is allowed.
  562.      * \return True if the pushing is allowed and false otherwise.
  563.      */
  564.     bool isPushingStopped() const
  565.     {
  566.         LockGuard lock(_mutex);
  567.         return _isPushingStopped;
  568.     }
  569.  
  570.     /*!
  571.      * \brief isPoppingStopped Checks whether popping from the queue is
  572.      * allowed.
  573.      * \return True if the popping is allowed and false otherwise.
  574.      */
  575.     bool isPoppingStopped() const
  576.     {
  577.         LockGuard lock(_mutex);
  578.         return _isPoppingStopped;
  579.     }
  580.  
  581.     /*!
  582.      * \brief size Gets the queue size.
  583.      * \return The number of the elements in the queue.
  584.      * \note This method is not completely thread safe because the size of the
  585.      * queue can change immediately after return. Use it only for approximate
  586.      * estimation.
  587.      */
  588.     size_type size() const
  589.     {
  590.         LockGuard lock(_mutex);
  591.         return _queue.size();
  592.     }
  593.  
  594.     /*!
  595.      * \brief empty Checks whether the queue is empty.
  596.      * \return true if the queue is empty and false otherwise.
  597.      * \note This method is not thread safe, because the size of the queue can
  598.      * chenge immendiately after return.
  599.      */
  600.     bool empty() const
  601.     {
  602.         LockGuard lock(_mutex);
  603.         return _queue.empty();
  604.     }
  605.  
  606. private: // types
  607.  
  608.     typedef std::unique_lock<std::mutex> UniqueLock;
  609.     typedef std::lock_guard<std::mutex>  LockGuard;
  610.     typedef std::function<bool ()>       Predicate;
  611.  
  612. private: // methods
  613.  
  614.     template <class Predicate, class... Args>
  615.     bool pushImpl(CQSide side, Predicate pred, Args&&... args)
  616.     {
  617.         UniqueLock lock(_mutex);
  618.         assert(_queue.size() <= _sizeLimit);
  619.         if(!pred(lock, [this]{ return _queue.size() < _sizeLimit || _isPushingStopped; }))
  620.             return false;
  621.         if(_isPushingStopped)
  622.             return false;
  623.         if(CQSide::Front == side)
  624.             _queue.emplace_front(std::forward<Args>(args)...);
  625.         else
  626.             _queue.emplace_back(std::forward<Args>(args)...);
  627.         lock.unlock();
  628.         _condition.notify_one();
  629.         return true;
  630.     }
  631.  
  632.     template <class Predicate>
  633.     bool popImpl(CQSide side, Value& value, Predicate pred)
  634.     {
  635.         try
  636.         {
  637.             UniqueLock lock(_mutex);
  638.             assert(_queue.size() <= _sizeLimit);
  639.             if(!pred(lock, [this]{ return !_queue.empty() || _isPoppingStopped; }))
  640.                 return false;
  641.             if(_isPoppingStopped)
  642.                 return false;
  643.             if(CQSide::Front == side)
  644.             {
  645.                 value = std::move(_queue.front());
  646.                 _queue.pop_front();
  647.             }
  648.             else
  649.             {
  650.                 value = std::move(_queue.back());
  651.                 _queue.pop_back();
  652.             }
  653.             return true;
  654.         }
  655.         catch(...)
  656.         {
  657.             // if the exception is thrown notify another thread to pop the value
  658.             _condition.notify_one();
  659.             throw;
  660.         }
  661.     }
  662.  
  663.     template <class Predicate>
  664.     boost::optional<Value> popImpl(CQSide side, Predicate pred)
  665.     {
  666.         try
  667.         {
  668.             UniqueLock lock(_mutex);
  669.             assert(_queue.size() <= _sizeLimit);
  670.             if(!pred(lock, [this]{ return !_queue.empty() || _isPoppingStopped; }))
  671.                 return boost::none;
  672.             if(_isPoppingStopped)
  673.                 return boost::none;
  674.             boost::optional<Value> value;
  675.             if(CQSide::Front == side)
  676.             {
  677.                 value = std::move(_queue.front());
  678.                 _queue.pop_front();
  679.             }
  680.             else
  681.             {
  682.                 value = std::move(_queue.back());
  683.                 _queue.pop_back();
  684.             }
  685.             return value;
  686.         }
  687.         catch(...)
  688.         {
  689.             // if the exception is thrown notify another thread to pop the value
  690.             _condition.notify_one();
  691.             throw;
  692.         }
  693.     }
  694.  
  695.     static size_type getLimit(size_type limit)
  696.     {
  697.         return 0 == limit ? std::numeric_limits<size_type>::max() : limit;
  698.     }
  699.  
  700. private: // data
  701.  
  702.     container_type _queue;
  703.     mutable std::mutex _mutex;
  704.     std::condition_variable _condition;
  705.     size_t _sizeLimit;
  706.     bool _isPoppingStopped;
  707.     bool _isPushingStopped;
  708.  
  709. }; // ConcurrentQueue class
  710.  
  711. } // utils namespace
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement