Guest User

boxoffice.cpp

a guest
Feb 27th, 2015
128
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 3.05 KB | None | 0 0
  1. /*
  2.  * Boxoffice
  3.  */
  4.  
  5. #include "boxoffice.hpp"
  6.  
  7. #include <zmq.hpp>
  8. #include <sstream>
  9. #include <iostream>
  10.  
  11. Boxoffice* Boxoffice::initialize(zmq::context_t* z_ctx)
  12. {
  13.   Boxoffice* bo = getInstance();
  14.  
  15.   bo->connectToPubsAndSubs(z_ctx);
  16.  
  17.   return bo;
  18. }
  19.  
  20. int Boxoffice::connectToPubsAndSubs(zmq::context_t* z_ctx)
  21. {
  22.   // standard variables
  23.   zmq::message_t z_msg;
  24.   int msg_type, msg_signal;
  25.   std::stringstream sstream;
  26.  
  27.   // open PAIR sender to main thread
  28.   zmq::socket_t z_bo_main(*z_ctx, ZMQ_PAIR);
  29.   z_bo_main.connect("inproc://sb_bo_main_pair");
  30.  
  31.   // since the publisher is a singleton, we can simply use two ZMQ_PAIRs
  32.   zmq::socket_t z_bo_to_pub(*z_ctx, ZMQ_PAIR);
  33.   zmq::socket_t z_pub_to_bo(*z_ctx, ZMQ_PAIR);
  34.   z_bo_to_pub.connect("inproc://sb_bo_to_pub_pair");
  35.   z_pub_to_bo.bind("inproc://sb_pub_to_bo_pair");
  36.  
  37.   // wait for heartbeat from publisher
  38.   std::cout << "bo: waiting for publisher to send heartbeat" << std::endl;
  39.   z_pub_to_bo.recv(&z_msg);
  40.   sstream.clear();
  41.   sstream << static_cast<char*>(z_msg.data());
  42.   sstream >> msg_type >> msg_signal;
  43.  
  44.   std::cout << "bo: received message: " << msg_type << " " << msg_signal << std::endl;
  45.  
  46.   // connect to subscribers
  47.   zmq::socket_t z_subscribers(*z_ctx, ZMQ_ROUTER);
  48.  
  49.   z_subscribers.bind("inproc://sb_boxoffice_req");
  50.  
  51.   std::cout << "bo: starting to listen to subscriber..." << std::endl;
  52.  
  53.   zmq::pollitem_t z_items[] = {
  54.     { z_subscribers, 0, ZMQ_POLLIN, 0 }
  55.   };
  56.  
  57.   bool exit_signal = false;
  58.   while(true) {
  59.     int more;
  60.  
  61.     zmq::poll(&z_items[0], 1, -1);
  62.  
  63.     if (z_items[0].revents & ZMQ_POLLIN) {
  64.       while(true) {
  65.         int msg_type, msg_signal;
  66.         zmq::message_t z_msg;
  67.  
  68.         std::cout << "bo: waiting for subscriber to send exit signal" << std::endl;
  69.         z_subscribers.recv(&z_msg);
  70.         size_t more_size = sizeof(more);
  71.         z_subscribers.getsockopt(ZMQ_RCVMORE, &more, &more_size);
  72.  
  73.         std::istringstream iss_sub(static_cast<char*>(z_msg.data()));
  74.         iss_sub >> msg_type >> msg_signal;
  75.         std::cout << "bo: sub sent: " << msg_type << " " << msg_signal << std::endl;
  76.         if ( msg_type == 0 && msg_signal == 0 )
  77.         {
  78.           snprintf((char*) z_msg.data(), 4, "0 0");
  79.           z_subscribers.send(z_msg, more? ZMQ_SNDMORE: 0);
  80.           exit_signal = true;
  81.           break;
  82.         } else {
  83.           snprintf((char*) z_msg.data(), 4, "0 1");
  84.           z_subscribers.send(z_msg);
  85.         }
  86.       }
  87.     }
  88.     if (exit_signal) break;
  89.   }
  90.  
  91.   // wait for exit signal from publisher
  92.   std::cout << "bo: waiting for publisher to send exit signal" << std::endl;
  93.   z_pub_to_bo.recv(&z_msg);
  94.   sstream.clear();
  95.   sstream << static_cast<char*>(z_msg.data());
  96.   sstream >> msg_type >> msg_signal;
  97.  
  98.   std::cout << "bo: exit signal received, sending signal..." << std::endl;
  99.   zmq::message_t z_msg_close(3);
  100.   snprintf((char*) z_msg_close.data(), 4, "%d %d", 0, 0);
  101.   z_bo_main.send(z_msg_close);
  102.  
  103.   z_bo_main.close();
  104.  
  105.   std::cout << "bo: exit signal sent, exiting..." << std::endl;
  106.  
  107.   return 0;
  108. }
Advertisement
Add Comment
Please, Sign In to add comment