Guest User

main.cpp

a guest
Feb 27th, 2015
419
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 3.47 KB | None | 0 0
  1. /*
  2.  * main.cpp
  3.  */
  4.  
  #include <unistd.h>

  #include <cstdio>
  #include <iostream>
  #include <sstream>
  #include <string>

  #include <boost/thread.hpp>
  #include <zmq.hpp>

  #include "boxoffice.hpp"
  #include "publisher.hpp"
  13.  
  14. void *publisher(void *arg)
  15. {
  16.   Publisher* pub;
  17.   pub = Publisher::initialize(static_cast<zmq::context_t*>(arg));
  18.  
  19.   std::cout << "pub: starting pubsub sockets and sending..." << std::endl;
  20.  
  21.   zmq::context_t *z_context = static_cast<zmq::context_t*>(arg);
  22.   zmq::socket_t z_publisher(*z_context, ZMQ_PUB);
  23.   z_publisher.bind("ipc://syncbox.ipc");
  24.  
  25.   for (int i = 0; i < 5; ++i)
  26.   {
  27.     std::string message = "this is message: " + std::to_string(i);
  28.     std::cout << "pub: " << message << std::endl;
  29.     zmq::message_t z_msg(message.length()+1);
  30.     snprintf((char*) z_msg.data(), message.length()+1, "%s", message.c_str());
  31.     z_publisher.send(z_msg);
  32.     sleep(1);
  33.   }
  34.  
  35.   pub->sendExitSignal();
  36.  
  37.   z_publisher.close();
  38.  
  39.   return (NULL);
  40. }
  41.  
  42. void *subscriber(void *arg)
  43. {
  44.   std::cout << "sub: receiving from publisher..." << std::endl;
  45.  
  46.   zmq::context_t *z_context = static_cast<zmq::context_t*>(arg);
  47.  
  48.   zmq::socket_t z_boxoffice(*z_context, ZMQ_REQ);
  49.   zmq::socket_t z_subscriber(*z_context, ZMQ_SUB);
  50.  
  51.   z_boxoffice.connect("inproc://sb_boxoffice_req");
  52.   z_subscriber.connect("ipc://syncbox.ipc");
  53.   const char *sub_filter = "";
  54.   z_subscriber.setsockopt(ZMQ_SUBSCRIBE, sub_filter, 0);
  55.  
  56.   for (int i = 0; i < 3; ++i)
  57.   {
  58.     zmq::message_t z_message;
  59.     z_subscriber.recv(&z_message);
  60.  
  61.     std::istringstream iss(static_cast<char*>(z_message.data()));
  62.     std::cout << "sub:" << iss.str() << std::endl;
  63.   }
  64.  
  65.   std::cout << "sub: all messages received, sending signal..." << std::endl;
  66.   zmq::message_t z_msg_close(3);
  67.   snprintf((char*) z_msg_close.data(), 4, "%d %d", 0, 0);
  68.   z_boxoffice.send(z_msg_close);
  69.   std::cout << "sub: signal sent, exiting..." << std::endl;
  70.  
  71.   return (NULL);
  72. }
  73.  
  74. void *boxoffice_thread(void *arg)
  75. {
  76.   Boxoffice* bo;
  77.   bo = Boxoffice::initialize(static_cast<zmq::context_t*>(arg));
  78.   return (NULL);
  79. }
  80.  
  81. int main(int argc, char* argv[])
  82. {
  83.     zmq::context_t z_context(1);
  84.  
  85.     // we eagerly initialize the Boxoffice- and Publisher-singleton here,
  86.     // for thread-safety
  87.     Boxoffice* bo;
  88.     bo = Boxoffice::getInstance();
  89.     Publisher* pub;
  90.     pub = Publisher::getInstance();
  91.  
  92.     // bind to boxoffice endpoint
  93.     zmq::socket_t z_boxoffice(z_context, ZMQ_PAIR);
  94.     z_boxoffice.bind("inproc://sb_bo_main_pair");
  95.  
  96.     std::cout << "main: opening boxoffice thread" << std::endl;
  97.     boost::thread bo_thread(boxoffice_thread, &z_context);
  98.  
  99.     // open publisher thread
  100.     std::cout << "main: opening publisher thread" << std::endl;
  101.     boost::thread pub_thread(publisher, &z_context);
  102.  
  103.     // open subscriber thread
  104.     std::cout << "main: opening subscriber thread" << std::endl;
  105.     boost::thread sub_thread(subscriber, &z_context);
  106.  
  107.     // wait for signal from boxoffice
  108.     zmq::message_t z_msg_close;
  109.     int msg_type, msg_signal;
  110.     std::cout << "main: waiting for boxoffice to send exit signal" << std::endl;
  111.     z_boxoffice.recv(&z_msg_close);
  112.     std::istringstream iss_pub(static_cast<char*>(z_msg_close.data()));
  113.     iss_pub >> msg_type >> msg_signal;
  114.     iss_pub.clear();
  115.  
  116.     std::cout << "main: received message: " << msg_type << " " << msg_signal << std::endl;
  117.  
  118.     z_boxoffice.close();
  119.     z_context.close();
  120.  
  121.     bo_thread.detach();
  122.     pub_thread.detach();
  123.     sub_thread.detach();
  124.  
  125. //    exit(0);
  126.     return 0;
  127. }
Advertisement
Add Comment
Please, Sign In to add comment