Advertisement
Guest User

zeromq_rcvbuf.cc

a guest
Sep 25th, 2012
73
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 2.09 KB | None | 0 0
  1. //
  2. // 25-aug-2012 -- Rob Iverson
  3. // Attempt to reproduce my "ZMQ_RCVBUF" issue for the purposes
  4. // of reporting it to zeromq mailing list or bug tracker.
  5. // To run this:
  6. // zeromq_rcvbuf -send
  7. // zeromq_rcvbuf -recv  (run this before the sender is done)
  8. //
  #include <unistd.h>

  #include <cstdlib>
  #include <cstring>
  #include <iostream>
  #include <string>

  #include <zmq.hpp>
  12.  
  // Size passed to zmq::message_t for every published message.
  // 350,000 shows the problem for me; 250,000 does not
  static const int MESSAGE_SIZE = 350000;

  // Argument passed to sleep() before each send.
  // a sleep is required for me to see the problem
  static const int SEND_SLEEP_TIME = 1;

  // The two modes selected from the command line: "-send" runs zmq_sender(),
  // "-recv" runs zmq_receiver(). Both return the process exit status.
  static int zmq_sender();
  static int zmq_receiver();
  21.  
  22. int main(int argc, char **argv)
  23. {
  24.   if ( argc < 2 ) {
  25.     std::cerr << "Usage: " << argv[0] << " [ -send | -recv ]" << std::endl;
  26.     exit(1);
  27.   }
  28.  
  29.   if ( !strcmp(argv[1],"-send") ) {
  30.     return zmq_sender();
  31.   }
  32.   else if ( !strcmp(argv[1],"-recv") ) {
  33.     return zmq_receiver();
  34.   }
  35.   exit(1);
  36. }
  37.  
  38. static int zmq_sender()
  39. {
  40.   zmq::context_t context(1);
  41.   zmq::socket_t socket(context, ZMQ_PUB);
  42.  
  43.   socket.connect("tcp://localhost:5555");
  44.  
  45.   for (int i = 0 ; i < 10 ; i++)
  46.   {
  47.     zmq::message_t message(MESSAGE_SIZE);
  48.  
  49.     *(static_cast<int*>(message.data())) = i;
  50.  
  51.     sleep(SEND_SLEEP_TIME);
  52.  
  53.     std::cout << "Sending message #" << i << std::endl;
  54.     socket.send(message);
  55.   }
  56.  
  57.   // Sleeping for 5s here makes the problem go away
  58.   //sleep(5);
  59.  
  60.   socket.close();
  61.    
  62.   return 0;
  63. }
  64.  
  65. static int zmq_receiver()
  66. {
  67.   zmq::context_t context(1);
  68.   zmq::socket_t socket(context, ZMQ_SUB);
  69.  
  70.   socket.bind("tcp://*:5555");
  71.  
  72.   // subscribe for all messages
  73.   socket.setsockopt(ZMQ_SUBSCRIBE,NULL,0);
  74.  
  75.   // Adjusting the RCVBUF size makes more messages go through
  76.   //   ::uint64_t new_rcvbuf = 200000;
  77.   //   socket.setsockopt(ZMQ_RCVBUF, &new_rcvbuf, sizeof(new_rcvbuf));
  78.  
  79.   for (int i = 0 ; i < 10 ; i++)
  80.   {
  81.     zmq::message_t request;
  82.  
  83.     std::cout << "+ Blocking waiting for message" << std::endl;
  84.     socket.recv(&request);
  85.  
  86.     int sent_index = *(static_cast<int*>(request.data()));
  87.  
  88.     std::cout << "Received Message #" << sent_index\
  89.               << " of size " << request.size() << std::endl;
  90.   }
  91.  
  92.   return 0;
  93. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement