Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <cassert>
- #include <condition_variable>
- #include <deque>
- #include <iostream>
- #include <functional>
- #include <memory>
- #include <mutex>
- #include <string>
- #include <thread>
- #include <utility>
- namespace
- {
- template<typename TMessage>
- struct mailbox_processor
- {
- using message_t = TMessage ;
- using enqueuer_t = std::function<void (message_t &&)> ;
- using processor_t = std::function<bool (enqueuer_t const &, message_t &&)>;
- mailbox_processor () = delete ;
- mailbox_processor (mailbox_processor const &) = delete ;
- mailbox_processor &operator= (mailbox_processor const &) = delete ;
- mailbox_processor (mailbox_processor && ) = default ;
- mailbox_processor& operator= (mailbox_processor && ) = default ;
- // poison_pill is used by the destructor to kill the processor
- // the processor must return false when it receives the poison_pill
- mailbox_processor (message_t poison_pill, processor_t processor)
- : poison_pill (std::move (poison_pill))
- , processor (std::move(processor) )
- , worker ([this] { process (); } )
- {
- }
- ~mailbox_processor () noexcept
- {
- log ()
- << "Going down..."
- << std::endl
- ;
- enqueue (poison_pill);
- worker.join ();
- log ()
- << "Done"
- << std::endl
- ;
- }
- void enqueue (message_t const & msg)
- {
- message_t copy = msg;
- enqueue (std::move (copy));
- }
- void enqueue (message_t && msg)
- {
- log ()
- << "Enqueuing message"
- << std::endl
- ;
- {
- std::lock_guard<std::mutex> lock (mtx);
- messages.push_back (std::move (msg));
- }
- cv.notify_all ();
- }
- private:
- message_t pop ()
- {
- std::unique_lock<std::mutex> lock(mtx);
- cv.wait (lock, [this] { return !messages.empty (); });
- // Predicate above ensures not empty
- assert(!messages.empty ());
- auto message = std::move (messages.front ());
- messages.pop_front ();
- return message;
- }
- std::ostream & log () const
- {
- return
- std::cout
- << "Message processor: "
- << std::hex
- << worker.get_id ()
- << " - "
- ;
- }
- void process ()
- {
- try
- {
- enqueuer_t enqueuer = [this] (message_t && msg) { enqueue (std::move (msg)); };
- log ()
- << "Process loop started"
- << std::endl
- ;
- bool cont = true;
- while (cont)
- {
- auto message = pop ();
- log ()
- << "Message received"
- << std::endl
- ;
- cont &= this->processor (enqueuer, std::move (message));
- }
- log ()
- << "Process loop terminated"
- << std::endl
- ;
- }
- catch (std::exception const & e)
- {
- log ()
- << "Something bad happened, message processor died: "
- << e.what ()
- << std::endl
- ;
- }
- catch (...)
- {
- log ()
- << "Something bad happened, message processor died"
- << std::endl
- ;
- }
- }
- message_t poison_pill ;
- processor_t processor ;
- std::mutex mtx ;
- std::condition_variable cv ;
- std::deque<message_t> messages ;
- std::thread worker ;
- };
- struct my_message
- {
- int msg_no;
- };
- my_message my_poison_pill { -1 };
- bool my_processor (std::function<void (my_message &&)> enqueuer, my_message && msg)
- {
- std::cout << "Received message: " << msg.msg_no << std::endl;
- if (msg.msg_no == my_poison_pill.msg_no)
- {
- std::cout << "Poison pill received, return false" << std::endl;
- return false;
- }
- else if (msg.msg_no == 1)
- {
- std::cout << "Hello!" << std::endl;
- return true;
- }
- else if (msg.msg_no == 2)
- {
- std::cout << "There!" << std::endl;
- return true;
- }
- else
- {
- std::cout << "Oops!" << std::endl;
- return true;
- }
- }
- }
- int main()
- {
- mailbox_processor<my_message> mbp (my_poison_pill, &my_processor);
- mbp.enqueue (my_message {1});
- mbp.enqueue (my_message {2});
- mbp.enqueue (my_message {3});
- std::cout << "Press new-line to exit" << std::endl;
- std::string line;
- std::getline(std::cin, line);
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement