Advertisement
Guest User

Failing ZMQ open/close pattern

a guest
Nov 25th, 2014
186
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 4.44 KB | None | 0 0
  1. /*
  2.  * Author:          Mark Wright (markscottwright@gmail.com)
  3.  * Creation Date:   2014-11-25
  4.  *
  5.  *
  6.  * Built with:
  7.  *
  8.     @echo off
  9.     setlocal
  10.     @call "c:\Program Files (x86)\Microsoft Visual Studio 10.0\VC\bin\vcvars32.bat"
  11.  
  12.     if not exist obj mkdir obj
  13.     if not exist bin mkdir bin
  14.  
  15.     cl zmqclose.cpp /I "C:\tools\ZeroMQ 4.0.4\include" /nologo /EHsc /Foobj\ /Febin\ ^
  16.         /nologo /EHsc "C:\tools\ZeroMQ 4.0.4\lib\libzmq-v110-mt-4_0_4.lib"
  17.  
  18.  
  19.  */
  #include <cctype>
  #include <cerrno>
  #include <cstdint>
  #include <cstdio>
  #include <cstdlib>
  #include <cstring>
  #include <iomanip>
  #include <iostream>
  #include <sstream>
  #include <string>
  #include <zmq.h>
  #include <zmq_utils.h>
  26.  
  27. using namespace std;
  28.  
  // Endpoint the ROUTER socket binds to and every REQ socket connects to.
  #define ROUTER_ENDPOINT "inproc://router"

  // Error-checking wrappers around calls that report failure through their
  // return value. On failure they call dump_error() with the current line and
  // the stringified expression.
  //
  // Each macro expands to a single statement (do { ... } while (0)) so that it
  // behaves like a function call: it requires a trailing ';' and is safe inside
  // an unbraced if/else.

  // Fails when the expression is non-zero (calls that return 0 on success).
  #define CHECK(x) \
      do { if ((x) != 0) dump_error(__LINE__, #x); } while (0)

  // Fails when the expression is NULL (calls that return a pointer).
  #define CHECKNOTNULL(x) \
      do { if ((x) == NULL) dump_error(__LINE__, #x); } while (0)

  // Fails when the expression is negative (calls that return a byte count or -1).
  #define CHECKPOS(x) \
      do { if ((x) < 0) dump_error(__LINE__, #x); } while (0)
  36.  
  // ZeroMQ context shared by all threads; assigned in main() from
  // zmq_ctx_new() and passed to every zmq_socket() call in this file.
  void* context;
  // Number of connect/send/recv/close iterations each client thread runs;
  // assigned in main() and read by client_thread().
  int num_operations;
  39.  
  40. void dump_error(int line_number, const char* cmd)
  41. {
  42.     cout << cmd << " failed at line " << line_number << endl;
  43.     cout << "error = " << strerror(errno) << endl;
  44.     cout << "error = " << errno << endl;
  45.     exit(0);
  46. }
  47.  
  // Formats a byte buffer as lowercase hexadecimal, two digits per byte.
  //
  // bytes: buffer to format; must hold at least len readable bytes.
  // len:   number of bytes to format; zero or negative yields an empty string.
  //
  // Returns a string of 2*len hex digits.
  std::string hexstring(const char* bytes, int len)
  {
      std::ostringstream hexstr;
      hexstr << std::hex << std::setfill('0');
      for (int i = 0; i < len; ++i) {
          // Go through unsigned char first: where plain char is signed, bytes
          // >= 0x80 would otherwise sign-extend and print as "ffffff80".
          const unsigned int byte_value = static_cast<unsigned char>(bytes[i]);
          hexstr << std::setw(2) << byte_value;  // setw applies to the next field only
      }
      return hexstr.str();
  }
  55.  
  56. void router_thread(void* router_socket)
  57. {
  58.     char client_id[100], null[100], message[100];
  59.     int client_id_len, null_len, message_len;
  60.     int more;
  61.     size_t more_len = sizeof(more);
  62.     int ops_processed = 0;
  63.     do {
  64.         CHECKPOS(client_id_len = zmq_recv(router_socket, client_id, 100, 0));
  65.         CHECK(zmq_getsockopt(router_socket, ZMQ_RCVMORE, &more, &more_len));
  66.         CHECKPOS(null_len = zmq_recv(router_socket, null, 100, 0));
  67.         CHECK(zmq_getsockopt(router_socket, ZMQ_RCVMORE, &more, &more_len));
  68.         CHECKPOS(message_len = zmq_recv(router_socket, message, 100, 0));
  69.         CHECKPOS(zmq_send(router_socket, client_id, client_id_len, ZMQ_SNDMORE));
  70.         CHECKPOS(zmq_send(router_socket, "", 0, ZMQ_SNDMORE));
  71.         ostringstream rsp_builder;
  72.         rsp_builder << "ok " << ops_processed;
  73.         string rsp = rsp_builder.str();
  74.         CHECKPOS(zmq_send(router_socket, rsp.c_str(), rsp.size(), 0));
  75.         ops_processed++;
  76.     } while (strnicmp(message, "quit", message_len) != 0);
  77. }
  78.  
  79. void client_thread(void* my_id)
  80. {
  81.     ostringstream fn_id;
  82.     for (int i=0; i < (int) num_operations; ++i) {
  83.         const int linger = 0;
  84.         void* router_command_socket;
  85.         CHECKNOTNULL(router_command_socket = zmq_socket(context, ZMQ_REQ));
  86.         CHECK(zmq_setsockopt(router_command_socket, ZMQ_LINGER, &linger, sizeof(int)));
  87.         CHECK(zmq_connect(router_command_socket, ROUTER_ENDPOINT));
  88.         CHECKPOS(zmq_send(router_command_socket, "hey", 3, 0));
  89.  
  90.         char response[100];
  91.         int response_len;
  92.         CHECKPOS(response_len = zmq_recv(router_command_socket, response, 100, 0));
  93.         CHECK(zmq_close(router_command_socket));
  94.     }
  95. }
  96.  
  97. int main(int argc, char* argv[])
  98. {
  99.     // if num_operations*num_clients > 1000, this will fail
  100.     num_operations = 100;
  101.     const int num_clients = 15;
  102.     const int linger = 0;
  103.     context = zmq_ctx_new();
  104.     //CHECK(zmq_ctx_set(context, ZMQ_MAX_SOCKETS, 1023));
  105.  
  106.     // create router socket
  107.     void* router_socket = zmq_socket(context, ZMQ_ROUTER);
  108.     CHECK(zmq_setsockopt(router_socket, ZMQ_LINGER, &linger, sizeof(int)));
  109.     CHECK(zmq_bind(router_socket, ROUTER_ENDPOINT));
  110.     zmq_threadstart(router_thread, router_socket);
  111.     cout << "starting router" << endl;
  112.  
  113.     // start up clients
  114.     for (int i=0; i < num_clients; ++i)
  115.         zmq_threadstart(client_thread, (void*) i);
  116.  
  117.     cout << "press enter to stop router" << endl;
  118.     getchar();
  119.  
  120.     // stop the router
  121.     cout << "stopping router" << endl;
  122.     void* router_command_socket = zmq_socket(context, ZMQ_REQ);
  123.     CHECK(zmq_setsockopt(router_command_socket, ZMQ_LINGER, &linger, sizeof(int)));
  124.     CHECK(zmq_connect(router_command_socket, ROUTER_ENDPOINT));
  125.     CHECKPOS(zmq_send(router_command_socket, "quit", 4, 0));
  126.  
  127.     char response[100];
  128.     int response_len;
  129.     CHECKPOS(response_len = zmq_recv(router_command_socket, response, 100, 0));
  130.     cout << string(response, response_len) << endl;
  131.     CHECK(zmq_close(router_command_socket));
  132.  
  133.     CHECK(zmq_close(router_socket));
  134.     CHECK(zmq_ctx_destroy(context));
  135.  
  136.     return 0;
  137. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement