jgeorger

czmq reactor

Mar 8th, 2018
179
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 2.67 KB | None | 0 0
  1. #include "czmq.h"
  2. #include <iostream>
  3. #include <string.h>
  4.  
  5. int check_interrupt(zsock_t *pipe) {
  6.     int interrupt = 0;
  7.     char *msg = zstr_recv(pipe);
  8.  
  9.     if (zsys_interrupted || (msg && !strcmp(msg, "$TERM")))
  10.         interrupt = -1;
  11.  
  12.     free(msg);
  13.     return interrupt;    
  14. }
  15.  
  16. static int
  17. timer_handler(zloop_t *loop, int timer_id, void *pipe)
  18. {
  19.     std::cout << "timer_handler: time to do something.\n";
  20.     zstr_send(pipe, "Timer says go!");
  21.     return 0;
  22. }
  23.  
  24. static void
  25. timer_thread(zsock_t *pipe, void *args) {
  26.     zsock_signal(pipe, 0);
  27.     std::cout << "timer_thread: starting up\n";
  28.     while(1) {
  29.         char *str = zstr_recv(pipe);
  30.         std::cout << "timer_thread received: " << str << std::endl;
  31.         free(str);
  32.         if (check_interrupt(pipe))
  33.             break;
  34.         // request list from socket_thread
  35.         zstr_send(args, "list request");
  36.         // recv list and do stuff
  37.     }  
  38. }
  39.  
  40. static int
  41. socket_handler(zloop_t *loop, zsock_t *handle, void *arg) {
  42.     int interrupt = 0;
  43.     zmsg_t *msg = zmsg_new();
  44.     assert(msg);
  45.     msg = zmsg_recv(handle);
  46.     std::cout << "socket_handler: recvd socket msg\n";
  47.     if (zsys_interrupted || (msg && !strcmp(zmsg_popstr(msg), "$TERM")))
  48.         interrupt = -1;
  49.     zstr_send(arg, "socket-related msg");
  50.     return interrupt;
  51. }
  52.  
  53. static void
  54. socket_thread(zsock_t *pipe, void *args) {
  55.     zsock_signal(pipe, 0);
  56.     std::cout << "socket_thread: starting up\n";
  57.     while(1) {
  58.         char *str = zstr_recv(pipe);
  59.         std::cout << "socket_thread: received: " << str << std::endl;
  60.         free(str);
  61.         if (check_interrupt(pipe))
  62.             break;
  63.         // Parse the msg, either add msg to list or return list
  64.     }
  65. }
  66.  
  67.  
  68.  
  69. int main(int argc, char *argv[]) {
  70.     zactor_t *socket_zactor = zactor_new(socket_thread, NULL);
  71.     zactor_t *timer_zactor = zactor_new(timer_thread, socket_zactor);
  72.  
  73.     // Set up our socket
  74.     char *endpoint;
  75.     asprintf(&endpoint, "tcp://%s:%d", "localhost", 5555);
  76.     zsock_t *a_zsock = zsock_new_sub(endpoint, "");
  77.     assert(a_zsock);
  78.     free(endpoint);
  79.  
  80.     zloop_t *loop = zloop_new();
  81.     assert(loop);
  82.     zloop_set_verbose(loop, true);
  83.  
  84.     int timer_id = zloop_timer(loop, 10000, 0, timer_handler, timer_zactor);
  85.     assert(timer_id != -1);
  86.  
  87.     int rc = zloop_reader(loop, a_zsock, socket_handler, socket_zactor);
  88.     assert(rc == 0);
  89.  
  90.     zloop_reader_set_tolerant(loop, a_zsock);
  91.     while (!zsys_interrupted) {
  92.         if (zloop_start(loop) != 0)
  93.             break;
  94.     }
  95.  
  96.     zactor_destroy(&socket_zactor);
  97.     zactor_destroy(&timer_zactor);
  98.     zsock_destroy(&a_zsock);
  99.     zloop_destroy(&loop);
  100.     assert (loop == NULL);
  101. }
Advertisement
Add Comment
Please, Sign In to add comment