Advertisement
Guest User

Untitled

a guest
Jan 31st, 2012
114
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 2.33 KB | None | 0 0
  1. #include <zmq.hpp>
  2. #include <ctime>
  3. #include <iostream>
  4.  
  5. // In this version the socket is never actually destroyed and its socketpair fds are never released, so we run out of fds or reach the zmq max_sockets limit.
  6. static void *device (void * data) {
  7.     zmq::context_t& context = *(static_cast<zmq::context_t*>(data));
  8.  
  9.     zmq::socket_t frontend (context, ZMQ_XREP);
  10.     zmq::socket_t backend (context, ZMQ_XREQ);
  11.  
  12.     int zero = 0;
  13.     frontend.setsockopt (ZMQ_LINGER, &zero, sizeof (zero));
  14.     frontend.bind ("inproc://frontend");
  15.  
  16.     uint64_t queue_size = 1;
  17.     backend.setsockopt (ZMQ_HWM, &queue_size, sizeof (queue_size));
  18.     backend.setsockopt (ZMQ_LINGER, &zero, sizeof (zero));
  19.  
  20.     // we don't connect backend socket. we simulate backend service being unavailable.
  21.  
  22.     zmq::device (ZMQ_QUEUE, frontend, backend);
  23. }
  24.  
  25. static void *client (void * data) {
  26.     sleep(1);
  27.     zmq::context_t& context = *static_cast<zmq::context_t*> (data);
  28.     for (int i = 0; i < 2000; i++) {
  29.         std::cout << i << std::endl;
  30.         zmq::socket_t socket (context, ZMQ_REQ);
  31.         int zero = 0;
  32.         socket.setsockopt (ZMQ_LINGER, &zero, sizeof (zero));
  33.         socket.connect("inproc://frontend");
  34.  
  35.         zmq::message_t message (1);
  36.         socket.send (message);
  37.        
  38.         zmq_pollitem_t item;
  39.         item.socket = (void*)socket;
  40.         item.events = ZMQ_POLLIN | ZMQ_POLLERR;
  41.         // wait for 10ms before tearing down the socket
  42.         if (zmq::poll (&item, 1, 10000) && (item.revents & ZMQ_POLLIN)) {
  43.             // we never should get here because noone is listening
  44.             std::cout << "abort!" << std::endl;
  45.             abort();
  46.         }
  47.     }
  48. }
  49.  
  50. int main (void) {
  51.     pthread_t client_thid;
  52.     zmq::context_t context (1);
  53.  
  54.     pthread_create (&client_thid, NULL, client, &context);
  55.  
  56.     zmq::socket_t frontend (context, ZMQ_XREP);
  57.     zmq::socket_t backend (context, ZMQ_XREQ);
  58.  
  59.     int zero = 0;
  60.     frontend.setsockopt (ZMQ_LINGER, &zero, sizeof (zero));
  61.     frontend.bind ("inproc://frontend");
  62.  
  63.     uint64_t queue_size = 0;
  64.     backend.setsockopt (ZMQ_HWM, &queue_size, sizeof (queue_size));
  65.     backend.setsockopt (ZMQ_LINGER, &zero, sizeof (zero));
  66.  
  67.     // we don't connect backend socket. we simulate backend service being unavailable.
  68.  
  69.     zmq::device (ZMQ_QUEUE, frontend, backend);
  70.  
  71.     return 0;
  72. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement