Advertisement
Guest User

High water mark for PUB/SUB.

a guest
Jul 13th, 2016
141
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.30 KB | None | 0 0
  1. /**
  2. * Publisher.
  3. */
  4. #include <zmq.h>
  5. #include <chrono>
  6. #include <thread>
  7. #include <cstdio>
  8. #include <cstring>
  9. #include <cassert>
  10.  
  11. int main (void)
  12. {
  13. printf("Starting PUB...\n");
  14. void* context = zmq_ctx_new();
  15. assert(context && "Failure in context.");
  16. void* socket = zmq_socket(context, ZMQ_PUB);
  17. assert(socket && "Failure in socket.");
  18.  
  19. {
  20. auto const rc = zmq_bind(socket, "tcp://*:5555");
  21. assert(!rc && "Failure in bind.");
  22. }
  23.  
  24. {
  25. int const hwm = 2;
  26. int const rc = zmq_setsockopt(socket, ZMQ_SNDHWM, &hwm, sizeof(int));
  27. assert(!rc && "Failure in setsocket.");
  28. }
  29.  
  30. for (int i = 0; ; ++i) {
  31. char message[20] = "";
  32. sprintf(message, "pub_%d", i);
  33. auto const rc = zmq_send(socket, message, strlen(message), 0);
  34. assert(rc > 0 && "Failure in send.");
  35. printf("PUB sent: %s\n", message);
  36. int const delay = 200;
  37. std::this_thread::sleep_for(std::chrono::milliseconds(delay));
  38. }
  39. zmq_close(socket);
  40. zmq_ctx_destroy(context);
  41. return EXIT_SUCCESS;
  42. }
  43.  
  44. /**
  45. * Subscriber (client).
  46. */
  47. #include <zmq.h>
  48. #include <chrono>
  49. #include <thread>
  50. #include <cassert>
  51. #include <cstdlib>
  52.  
  53. int main(void)
  54. {
  55. printf("Starting SUB...\n");
  56. void* context = zmq_ctx_new();
  57. assert(context && "Failure in context.");
  58. void* socket = zmq_socket(context, ZMQ_SUB);
  59. assert(socket && "Failure in socket.");
  60.  
  61. {
  62. int const rc = zmq_connect(socket, "tcp://localhost:5555");
  63. assert(!rc && "Failure in connect.");
  64. }
  65.  
  66. {
  67. int const rc = zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0);
  68. assert(!rc && "Failure in setsocket.");
  69. }
  70.  
  71. {
  72. int const hwm = 2;
  73. int const rc = zmq_setsockopt(socket, ZMQ_RCVHWM, &hwm, sizeof(hwm));
  74. assert(!rc && "Failure in setsocket.");
  75. }
  76.  
  77. while (true) {
  78. char buffer[256] = "";
  79. errno = 0;
  80. int const rc = zmq_recv(socket, buffer, sizeof(buffer), 0);
  81. assert(rc > 0 && "Failure in recv.");
  82. printf("SUB received: %s\n", buffer);
  83. int const delay = 1;
  84. std::this_thread::sleep_for(std::chrono::seconds(delay));
  85. }
  86. zmq_close(socket);
  87. zmq_ctx_destroy(context);
  88. return EXIT_SUCCESS;
  89. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement