Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <unistd.h>
- #include <signal.h>
- #include <zmq.h>
- #define ENDPOINT "ipc://@test"
- static char * s_recv(void *socket)
- {
- char buffer[256];
- int size = zmq_recv(socket, buffer, 255, 0);
- if (size == -1)
- return NULL;
- if (size > 255)
- size = 255;
- buffer[size] = '\0';
- return strdup(buffer);
- }
- static sig_atomic_t stopped = 0;
- void signint_handler(int sig)
- {
- stopped = 1;
- }
- int main(int argc, char *argv[])
- {
- signal(SIGINT, signint_handler);
- if (argc != 3) {
- printf("Invalid arguments\n");
- return 1;
- }
- int num_subs = atoi(argv[1]);
- if (num_subs <= 0) {
- printf("Invalid number of subs\n");
- return 1;
- }
- const char *sub_str = argv[2];
- void *ctx = zmq_ctx_new();
- if (!ctx) {
- perror("Error creating context");
- return 1;
- }
- void **socks = malloc(num_subs * sizeof(void *));
- if (!socks) {
- printf("Out of memory\n");
- return 1;
- }
- for (int i = 0; i < num_subs; i++) {
- void *sock = zmq_socket(ctx, ZMQ_SUB);
- if (!sock) {
- perror("Error creating socket");
- return 1;
- }
- if (zmq_connect(sock, ENDPOINT) < 0) {
- perror("Connect error");
- return 1;
- }
- if (zmq_setsockopt(sock, ZMQ_SUBSCRIBE, sub_str, 1) < 0) {
- perror("Subscribe error");
- return 1;
- }
- socks[i] = sock;
- }
- while (1) {
- for (int i = 0; i < num_subs; i++) {
- char *string = s_recv(socks[i]);
- if (!string) {
- if (EINTR == errno)
- goto finish;
- perror("Receive error");
- return 1;
- }
- printf("recv sock[%d]: %s\n", i, string);
- free(string);
- if (stopped)
- goto finish;
- }
- }
- finish:
- printf("exiting...\n");
- for (int i = 0; i < num_subs; i++) {
- void *sock = socks[i];
- zmq_setsockopt(sock, ZMQ_UNSUBSCRIBE, sub_str, 1);
- zmq_close(sock);
- }
- free(socks);
- zmq_ctx_destroy(ctx);
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment