Guest User

Untitled

a guest
Jan 17th, 2019
79
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.69 KB | None | 0 0
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <exception>
#include <functional>
#include <iostream>
#include <limits>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>
  8.  
  9. /**
  10. * throw the exception when waiting is interrupted by other threads.
  11. */
  12. class BlockInterrupt
  13. {
  14. public:
  15. BlockInterrupt()
  16. {
  17. }
  18.  
  19. };
  20.  
  21. /**
  22. * RAII, execute the function in destructor.
  23. */
  24. class Final
  25. {
  26. public:
  27. explicit Final(const std::function<void()>& final)
  28. :m_final(final)
  29. {
  30. }
  31.  
  32. ~Final()
  33. {
  34. try
  35. {
  36. m_final();
  37. }
  38. catch(...)
  39. {
  40. }
  41. }
  42.  
  43. private:
  44. const std::function<void()> m_final;
  45.  
  46. };
  47.  
  48. template<typename T>
  49. class BlockQueue
  50. {
  51. public:
  52. BlockQueue()
  53. {
  54. }
  55.  
  56. /**
  57. * queue will be destroyed, interrupt all blocking thread.
  58. * the mutex will be locked until the deconstruction is finished.
  59. */
  60. ~BlockQueue()
  61. {
  62. interrupt();
  63. std::unique_lock<std::mutex> lock(m_mutex);
  64. m_interrupted = true;
  65. }
  66.  
  67. BlockQueue(const BlockQueue&) = delete;
  68. BlockQueue(BlockQueue&&) = delete;
  69. BlockQueue& operator=(const BlockQueue&) = delete;
  70. BlockQueue& operator=(BlockQueue&&) = delete;
  71.  
  72. BlockQueue& push(const T& value)
  73. {
  74. std::unique_lock<std::mutex> lock(m_mutex);
  75. m_queue.push(value);
  76. //if some threads is wait for taking, notify one.
  77. if(m_waiter_count > 0)
  78. {
  79. m_wait_finished.notify_one();
  80. }
  81. return *this;
  82. }
  83.  
  84. BlockQueue& push(T&& value)
  85. {
  86. std::unique_lock<std::mutex> lock(m_mutex);
  87. m_queue.push(std::move(value));
  88. //if some threads is wait for taking, notify one.
  89. if(m_waiter_count > 0)
  90. {
  91. m_wait_finished.notify_one();
  92. }
  93. return *this;
  94. }
  95.  
  96. template<typename... Args>
  97. BlockQueue& emplace(Args&&... args)
  98. {
  99. std::unique_lock<std::mutex> lock(m_mutex);
  100. m_queue.emplace(std::forward<Args>(args)...);
  101. //if some threads is wait for taking, notify one.
  102. if(m_waiter_count > 0)
  103. {
  104. m_wait_finished.notify_one();
  105. }
  106. return *this;
  107. }
  108.  
  109. T take()
  110. {
  111. std::unique_lock<std::mutex> lock(m_mutex);
  112. if(m_interrupted)
  113. {
  114. throw BlockInterrupt();
  115. }
  116. if(!m_queue.empty())
  117. {
  118. T value = std::move(const_cast<T&>(m_queue.front()));
  119. m_queue.pop();
  120. return value;
  121. }
  122.  
  123. //if the count of waiter is overflow.
  124. if(m_waiter_count == std::numeric_limits<decltype(m_waiter_count)>::max())
  125. {
  126. throw std::runtime_error("The counter for waiter of block queue is overflow.");
  127. }
  128. //the call must be blocked.
  129. ++m_waiter_count;
  130. Final final([this]()
  131. {
  132. //this code will be execute anyhow.
  133. --m_waiter_count;
  134. if(m_waiter_count == 0 && m_interrupted == true)
  135. {
  136. //if count of waiter is zero, and some threads call the interrupt(), notify all.
  137. m_interrupt_finished.notify_all();
  138. }
  139. });
  140. m_wait_finished.wait(lock, [this](){return !m_queue.empty() || m_interrupted;});
  141. if(m_interrupted)
  142. {
  143. throw BlockInterrupt();
  144. }
  145. T value = std::move(const_cast<T&>(m_queue.front()));
  146. m_queue.pop();
  147. return value;
  148. }
  149.  
  150. bool try_pop(T& value)
  151. {
  152. std::unique_lock<std::mutex> lock(m_mutex);
  153. if(!m_queue.empty())
  154. {
  155. value = std::move(const_cast<T&>(m_queue.front()));
  156. m_queue.pop();
  157. return true;
  158. }
  159. return false;
  160. }
  161.  
  162. /**
  163. * interrupt all block threads, return until all threads are interrupted.
  164. */
  165. BlockQueue& interrupt() noexcept
  166. {
  167. std::unique_lock<std::mutex> lock(m_mutex);
  168. if(m_waiter_count == 0)
  169. {
  170. //no waiter, return.
  171. m_interrupted = false;
  172. return *this;
  173. }
  174.  
  175. m_interrupted = true;
  176. //wake up all block threads.
  177. m_wait_finished.notify_all();
  178. //wait for all threads are interrupted.
  179. m_interrupt_finished.wait(lock, [this](){return m_waiter_count == 0;});
  180. m_interrupted = false;
  181. return *this;
  182. }
  183.  
  184. private:
  185. /**
  186. * mutex for accessing of "m_queue", "m_interrupted", "m_waiter_count".
  187. */
  188. std::mutex m_mutex;
  189. std::queue<T> m_queue;
  190. /**
  191. * express whether the queue is interrupted.
  192. */
  193. bool m_interrupted = false;
  194. /**
  195. * count of threads blocked.
  196. */
  197. std::uint32_t m_waiter_count = std::numeric_limits<decltype(m_waiter_count)>::min();
  198. /**
  199. * condition variable for notifying that the queue is readable or interrupted (so the threads blocked should be awakened).
  200. */
  201. std::condition_variable m_wait_finished;
  202. /**
  203. * condition variable for notifying that no thread is blocking (so interrupt has finished).
  204. */
  205. std::condition_variable m_interrupt_finished;
  206.  
  207. };
  208.  
  209. int main(int argc, char const *argv[])
  210. {
  211. BlockQueue<int> q;
  212.  
  213. std::thread taker1([&]()
  214. {
  215. try
  216. {
  217. q.take();
  218. }
  219. catch(const BlockInterrupt&)
  220. {
  221. std::cout << "taker1 interrupted." << std::endl;
  222. }
  223. });
  224.  
  225. std::thread taker2([&]()
  226. {
  227. try
  228. {
  229. q.take();
  230. }
  231. catch(const BlockInterrupt&)
  232. {
  233. std::cout << "taker2 interrupted." << std::endl;
  234. }
  235. });
  236.  
  237. using namespace std::literals::chrono_literals;
  238. std::this_thread::sleep_for(0.5s);
  239.  
  240. std::thread interrupter([&]()
  241. {
  242. q.interrupt();
  243. });
  244.  
  245. taker1.join();
  246. taker2.join();
  247. interrupter.join();
  248.  
  249. return 0;
  250. }
Add Comment
Please, Sign In to add comment