Advertisement
Guest User

Untitled

a guest
Apr 24th, 2019
78
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.47 KB | None | 0 0
  #include <cassert>
  #include <condition_variable>
  #include <deque>
  #include <exception>
  #include <functional>
  #include <iostream>
  #include <memory>
  #include <mutex>
  #include <sstream>
  #include <string>
  #include <thread>
  #include <utility>
  11.  
  12. namespace
  13. {
  14.  
  // Single-consumer mailbox: messages pushed with enqueue() are handed, one at
  // a time and in FIFO order, to a user-supplied processor running on a
  // dedicated worker thread.
  //
  // The processor receives an enqueuer (so it can post follow-up messages to
  // the same mailbox) and the message itself; it returns true to keep the loop
  // running and false to stop it.
  template<typename TMessage>
  struct mailbox_processor
  {
    using message_t   = TMessage ;
    using enqueuer_t  = std::function<void (message_t &&)> ;
    using processor_t = std::function<bool (enqueuer_t const &, message_t &&)> ;

    mailbox_processor () = delete ;

    mailbox_processor (mailbox_processor const &) = delete ;
    mailbox_processor & operator= (mailbox_processor const &) = delete ;

    // The worker thread captures `this`, and std::mutex / std::condition_variable
    // are not movable, so a defaulted move would be implicitly deleted anyway.
    // Spell it out instead of advertising a move that cannot exist.
    mailbox_processor (mailbox_processor &&) = delete ;
    mailbox_processor & operator= (mailbox_processor &&) = delete ;

    // poison_pill is used by the destructor to kill the processor;
    // the processor must return false when it receives the poison_pill,
    // otherwise the destructor blocks forever in join().
    mailbox_processor (message_t poison_pill, processor_t processor)
      : poison_pill (std::move (poison_pill))
      , processor   (std::move (processor))
      , worker      ([this] { process (); })
    {
    }

    // Posts the poison pill and waits for the worker thread to finish.
    // Messages queued before the pill are still processed first.
    ~mailbox_processor () noexcept
    {
      log () << "Going down..." << std::endl ;

      // The object is being torn down, so the pill can be moved into the
      // queue instead of copied (this also permits move-only message types).
      enqueue (std::move (poison_pill)) ;

      if (worker.joinable ())
      {
        worker.join () ;
      }

      log () << "Done" << std::endl ;
    }

    // Enqueues a copy of msg.
    void enqueue (message_t const & msg)
    {
      message_t copy = msg ;
      enqueue (std::move (copy)) ;
    }

    // Enqueues msg by moving it into the queue and wakes the worker.
    void enqueue (message_t && msg)
    {
      log () << "Enqueuing message" << std::endl ;

      {
        std::lock_guard<std::mutex> lock (mtx) ;
        messages.push_back (std::move (msg)) ;
      }

      // Exactly one thread (the worker) ever waits on cv.
      cv.notify_one () ;
    }

  private:
    // Blocks until a message is available, then removes and returns the
    // oldest one.
    message_t pop ()
    {
      std::unique_lock<std::mutex> lock (mtx) ;
      cv.wait (lock, [this] { return !messages.empty (); }) ;

      // Predicate above ensures not empty
      assert (!messages.empty ()) ;

      auto message = std::move (messages.front ()) ;
      messages.pop_front () ;

      return message ;
    }

    // Writes the log-line prefix (including the worker's thread id in hex) to
    // std::cout and returns std::cout so the caller can append the message.
    //
    // The prefix is formatted in a local stream: std::hex is sticky, and
    // applying it to std::cout directly would make every later integer
    // printed by anyone (including the processor) come out in hexadecimal.
    std::ostream & log () const
    {
      std::ostringstream prefix ;
      prefix
        << "Message processor: "
        << std::hex
        << worker.get_id ()
        << " - "
        ;
      return std::cout << prefix.str () ;
    }

    // Worker thread body: pops messages and feeds them to the processor until
    // it returns false. Exceptions never escape the thread; they are logged
    // and end the loop.
    void process ()
    {
      try
      {
        enqueuer_t enqueuer = [this] (message_t && msg) { enqueue (std::move (msg)); } ;

        log () << "Process loop started" << std::endl ;

        for (;;)
        {
          auto message = pop () ;

          log () << "Message received" << std::endl ;

          if (!this->processor (enqueuer, std::move (message)))
          {
            break ;
          }
        }

        log () << "Process loop terminated" << std::endl ;
      }
      catch (std::exception const & e)
      {
        log ()
          << "Something bad happened, message processor died: "
          << e.what ()
          << std::endl
          ;
      }
      catch (...)
      {
        log ()
          << "Something bad happened, message processor died"
          << std::endl
          ;
      }
    }

    message_t               poison_pill ;
    processor_t             processor   ;
    std::mutex              mtx         ;
    std::condition_variable cv          ;
    std::deque<message_t>   messages    ;
    // Must stay the LAST member: the thread starts running process() as soon
    // as it is constructed, and process() uses every member declared above.
    std::thread             worker      ;
  };
  153.  
  // Demo message carried through the mailbox: a plain aggregate holding only
  // a message number.
  struct my_message
  {
    // Number identifying the message; my_processor selects its reply from it.
    int msg_no ;
  };
  158.  
  159. my_message my_poison_pill { -1 };
  160.  
  161. bool my_processor (std::function<void (my_message &&)> enqueuer, my_message && msg)
  162. {
  163. std::cout << "Received message: " << msg.msg_no << std::endl;
  164.  
  165. if (msg.msg_no == my_poison_pill.msg_no)
  166. {
  167. std::cout << "Poison pill received, return false" << std::endl;
  168. return false;
  169. }
  170. else if (msg.msg_no == 1)
  171. {
  172. std::cout << "Hello!" << std::endl;
  173. return true;
  174. }
  175. else if (msg.msg_no == 2)
  176. {
  177. std::cout << "There!" << std::endl;
  178. return true;
  179. }
  180. else
  181. {
  182. std::cout << "Oops!" << std::endl;
  183. return true;
  184. }
  185. }
  186. }
  187.  
  188.  
  189.  
  190. int main()
  191. {
  192. mailbox_processor<my_message> mbp (my_poison_pill, &my_processor);
  193.  
  194. mbp.enqueue (my_message {1});
  195. mbp.enqueue (my_message {2});
  196. mbp.enqueue (my_message {3});
  197.  
  198. std::cout << "Press new-line to exit" << std::endl;
  199. std::string line;
  200. std::getline(std::cin, line);
  201.  
  202. return 0;
  203. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement