Guest User

min-sub.c

a guest
Nov 3rd, 2020
172
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C 2.26 KB | None | 0 0
  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <string.h>
  4. #include <unistd.h>
  5. #include <signal.h>
  6. #include <zmq.h>
  7.  
  8. #define ENDPOINT "ipc://@test"
  9.  
  10. static char * s_recv(void *socket)
  11. {
  12.     char buffer[256];
  13.     int size = zmq_recv(socket, buffer, 255, 0);
  14.     if (size == -1)
  15.         return NULL;
  16.     if (size > 255)
  17.         size = 255;
  18.     buffer[size] = '\0';
  19.     return strdup(buffer);
  20. }
  21.  
  22. static sig_atomic_t stopped = 0;
  23. void signint_handler(int sig)
  24. {
  25.     stopped = 1;
  26. }
  27.  
  28. int main(int argc, char *argv[])
  29. {
  30.     signal(SIGINT, signint_handler);
  31.  
  32.     if (argc != 3) {
  33.         printf("Invalid arguments\n");
  34.         return 1;
  35.     }
  36.  
  37.     int num_subs = atoi(argv[1]);
  38.     if (num_subs <= 0) {
  39.         printf("Invalid number of subs\n");
  40.         return 1;
  41.     }
  42.  
  43.     const char *sub_str = argv[2];
  44.  
  45.     void *ctx = zmq_ctx_new();
  46.     if (!ctx) {
  47.         perror("Error creating context");
  48.         return 1;
  49.     }
  50.  
  51.     void **socks = malloc(num_subs * sizeof(void *));
  52.     if (!socks) {
  53.         printf("Out of memory\n");
  54.         return 1;
  55.     }
  56.  
  57.     for (int i = 0; i < num_subs; i++) {
  58.         void *sock = zmq_socket(ctx, ZMQ_SUB);
  59.         if (!sock) {
  60.             perror("Error creating socket");
  61.             return 1;
  62.         }
  63.  
  64.         if (zmq_connect(sock, ENDPOINT) < 0) {
  65.             perror("Connect error");
  66.             return 1;
  67.         }
  68.  
  69.         if (zmq_setsockopt(sock, ZMQ_SUBSCRIBE, sub_str, 1) < 0) {
  70.             perror("Subscribe error");
  71.             return 1;
  72.         }
  73.  
  74.         socks[i] = sock;
  75.     }
  76.  
  77.     while (1) {
  78.         for (int i = 0; i < num_subs; i++) {
  79.             char *string = s_recv(socks[i]);
  80.             if (!string) {
  81.                 if (EINTR == errno)
  82.                     goto finish;
  83.                 perror("Receive error");
  84.                 return 1;
  85.             }
  86.  
  87.             printf("recv sock[%d]: %s\n", i, string);
  88.             free(string);
  89.  
  90.             if (stopped)
  91.                 goto finish;
  92.         }
  93.     }
  94.  
  95. finish:
  96.     printf("exiting...\n");
  97.     for (int i = 0; i < num_subs; i++) {
  98.         void *sock = socks[i];
  99.         zmq_setsockopt(sock, ZMQ_UNSUBSCRIBE, sub_str, 1);
  100.         zmq_close(sock);
  101.     }
  102.     free(socks);
  103.     zmq_ctx_destroy(ctx);
  104.     return 0;
  105. }
  106.  
Advertisement
Add Comment
Please, Sign In to add comment