SHARE
TWEET

High water mark for PUB/SUB.

a guest Jul 13th, 2016 104 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. /**
  2.  * Publisher.
  3.  */
  4. #include <zmq.h>
  5. #include <chrono>
  6. #include <thread>
  7. #include <cstdio>
  8. #include <cstring>
  9. #include <cassert>
  10.  
  11. int main (void)
  12. {
  13.     printf("Starting PUB...\n");
  14.     void* context = zmq_ctx_new();
  15.     assert(context && "Failure in context.");
  16.     void* socket = zmq_socket(context, ZMQ_PUB);
  17.     assert(socket && "Failure in socket.");
  18.  
  19.     {
  20.         auto const rc = zmq_bind(socket, "tcp://*:5555");
  21.         assert(!rc && "Failure in bind.");
  22.     }
  23.  
  24.     {
  25.         int const hwm = 2;
  26.         int const rc = zmq_setsockopt(socket, ZMQ_SNDHWM, &hwm, sizeof(int));
  27.         assert(!rc && "Failure in setsocket.");
  28.     }
  29.  
  30.     for (int i = 0; ; ++i) {
  31.         char message[20] = "";
  32.         sprintf(message, "pub_%d", i);
  33.         auto const rc = zmq_send(socket, message, strlen(message), 0);
  34.         assert(rc > 0 && "Failure in send.");
  35.         printf("PUB sent: %s\n", message);
  36.         int const delay = 200;
  37.         std::this_thread::sleep_for(std::chrono::milliseconds(delay));
  38.     }
  39.     zmq_close(socket);
  40.     zmq_ctx_destroy(context);
  41.     return EXIT_SUCCESS;
  42. }
  43.  
  44. /**
  45.  * Subscriber (client).
  46.  */
  47. #include <zmq.h>
  48. #include <chrono>
  49. #include <thread>
  50. #include <cassert>
  51. #include <cstdlib>
  52.  
  53. int main(void)
  54. {
  55.     printf("Starting SUB...\n");
  56.     void* context = zmq_ctx_new();
  57.     assert(context && "Failure in context.");
  58.     void* socket = zmq_socket(context, ZMQ_SUB);
  59.     assert(socket && "Failure in socket.");
  60.  
  61.     {
  62.         int const rc = zmq_connect(socket, "tcp://localhost:5555");
  63.         assert(!rc && "Failure in connect.");
  64.     }
  65.  
  66.     {
  67.         int const rc = zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0);
  68.         assert(!rc && "Failure in setsocket.");
  69.     }
  70.  
  71.     {
  72.         int const hwm = 2;
  73.         int const rc = zmq_setsockopt(socket, ZMQ_RCVHWM, &hwm, sizeof(hwm));
  74.         assert(!rc && "Failure in setsocket.");
  75.     }
  76.  
  77.     while (true) {
  78.         char buffer[256] = "";
  79.         errno = 0;
  80.         int const rc = zmq_recv(socket, buffer, sizeof(buffer), 0);
  81.         assert(rc > 0 && "Failure in recv.");
  82.         printf("SUB received: %s\n", buffer);
  83.         int const delay = 1;
  84.         std::this_thread::sleep_for(std::chrono::seconds(delay));
  85.     }
  86.     zmq_close(socket);
  87.     zmq_ctx_destroy(context);
  88.     return EXIT_SUCCESS;
  89. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
Not a member of Pastebin yet?
Sign Up, it unlocks many cool features!
 
Top