homer512

SpScQueue mutex destructor

May 31st, 2025
649
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 1.60 KB | None | 0 0
  1. #include <mutex>
  2. #include <condition_variable>
  3. #include <deque>
  4.  
  5.  
  6. class SpScQueue
  7. {
  8.     std::mutex mutex;
  9.     std::deque<int> queue;
  10.     bool closed = false;
  11.     std::condition_variable cond;
  12. public:
  13.     void push(int x)
  14.     {
  15.         std::lock_guard<std::mutex> lock{mutex};
  16.         queue.push_back(x);
  17.         cond.notify_one();
  18.     }
  19.     void close()
  20.     {
  21.         std::lock_guard<std::mutex> lock{mutex};
  22.         closed = true;
  23.         cond.notify_all();
  24.         /*
  25.          * Important: The lock guard's destructor will unlock the mutex.
  26.          * As per C++ 32.6.4.2.2 Class mutex [thread.mutex.class] paragraph 2,
  27.          * is is valid for another thread (in this case the consumer) to destroy
  28.          * the mutex before the unlock() returns.
  29.          * Users have to make sure not to access any members after unlocking,
  30.          * for example by moving the notify_all() out of the lock
  31.          */
  32.     }
  33.     // return false if closed and empty
  34.     bool pop(int* out)
  35.     {
  36.         std::unique_lock<std::mutex> lock{mutex};
  37.         while(queue.empty() && ! closed)
  38.             cond.wait(lock);
  39.         if(queue.empty())
  40.             return false;
  41.         *out = queue.front();
  42.         queue.pop_front();
  43.         return true;
  44.     }
  45. };
  46.  
  47. // producer borrows queue
  48. void producer(SpScQueue* queue)
  49. {
  50.     for(int i = 0; i < 10; ++i)
  51.         queue->push(i);
  52.     queue->close();
  53. }
  54. // consumer holds ownership of queue
  55. void consumer(std::unique_ptr<SpScQueue>&& queue)
  56. {
  57.     int x;
  58.     while(queue->pop(&x)) {
  59.         // consume value x
  60.     }
  61.     queue.reset(); // delete queue
  62. }
Advertisement