Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /*
- * main.cpp
- */
- #include <zmq.hpp>
- #include <iostream>
- #include <sstream>
- #include <boost/thread.hpp>
- #include "boxoffice.hpp"
- #include "publisher.hpp"
- void *publisher(void *arg)
- {
- Publisher* pub;
- pub = Publisher::initialize(static_cast<zmq::context_t*>(arg));
- std::cout << "pub: starting pubsub sockets and sending..." << std::endl;
- zmq::context_t *z_context = static_cast<zmq::context_t*>(arg);
- zmq::socket_t z_publisher(*z_context, ZMQ_PUB);
- z_publisher.bind("ipc://syncbox.ipc");
- for (int i = 0; i < 5; ++i)
- {
- std::string message = "this is message: " + std::to_string(i);
- std::cout << "pub: " << message << std::endl;
- zmq::message_t z_msg(message.length()+1);
- snprintf((char*) z_msg.data(), message.length()+1, "%s", message.c_str());
- z_publisher.send(z_msg);
- sleep(1);
- }
- pub->sendExitSignal();
- z_publisher.close();
- return (NULL);
- }
- void *subscriber(void *arg)
- {
- std::cout << "sub: receiving from publisher..." << std::endl;
- zmq::context_t *z_context = static_cast<zmq::context_t*>(arg);
- zmq::socket_t z_boxoffice(*z_context, ZMQ_REQ);
- zmq::socket_t z_subscriber(*z_context, ZMQ_SUB);
- z_boxoffice.connect("inproc://sb_boxoffice_req");
- z_subscriber.connect("ipc://syncbox.ipc");
- const char *sub_filter = "";
- z_subscriber.setsockopt(ZMQ_SUBSCRIBE, sub_filter, 0);
- for (int i = 0; i < 3; ++i)
- {
- zmq::message_t z_message;
- z_subscriber.recv(&z_message);
- std::istringstream iss(static_cast<char*>(z_message.data()));
- std::cout << "sub:" << iss.str() << std::endl;
- }
- std::cout << "sub: all messages received, sending signal..." << std::endl;
- zmq::message_t z_msg_close(3);
- snprintf((char*) z_msg_close.data(), 4, "%d %d", 0, 0);
- z_boxoffice.send(z_msg_close);
- std::cout << "sub: signal sent, exiting..." << std::endl;
- return (NULL);
- }
- void *boxoffice_thread(void *arg)
- {
- Boxoffice* bo;
- bo = Boxoffice::initialize(static_cast<zmq::context_t*>(arg));
- return (NULL);
- }
- int main(int argc, char* argv[])
- {
- zmq::context_t z_context(1);
- // we eagerly initialize the Boxoffice- and Publisher-singleton here,
- // for thread-safety
- Boxoffice* bo;
- bo = Boxoffice::getInstance();
- Publisher* pub;
- pub = Publisher::getInstance();
- // bind to boxoffice endpoint
- zmq::socket_t z_boxoffice(z_context, ZMQ_PAIR);
- z_boxoffice.bind("inproc://sb_bo_main_pair");
- std::cout << "main: opening boxoffice thread" << std::endl;
- boost::thread bo_thread(boxoffice_thread, &z_context);
- // open publisher thread
- std::cout << "main: opening publisher thread" << std::endl;
- boost::thread pub_thread(publisher, &z_context);
- // open subscriber thread
- std::cout << "main: opening subscriber thread" << std::endl;
- boost::thread sub_thread(subscriber, &z_context);
- // wait for signal from boxoffice
- zmq::message_t z_msg_close;
- int msg_type, msg_signal;
- std::cout << "main: waiting for boxoffice to send exit signal" << std::endl;
- z_boxoffice.recv(&z_msg_close);
- std::istringstream iss_pub(static_cast<char*>(z_msg_close.data()));
- iss_pub >> msg_type >> msg_signal;
- iss_pub.clear();
- std::cout << "main: received message: " << msg_type << " " << msg_signal << std::endl;
- z_boxoffice.close();
- z_context.close();
- bo_thread.detach();
- pub_thread.detach();
- sub_thread.detach();
- // exit(0);
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment