Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /*
- * Author: Mark Wright (markscottwright@gmail.com)
- * Creation Date: 2014-11-25
- *
- *
- * Built with:
- *
- @echo off
- setlocal
- @call "c:\Program Files (x86)\Microsoft Visual Studio 10.0\VC\bin\vcvars32.bat"
- if not exist obj mkdir obj
- if not exist bin mkdir bin
- cl zmqclose.cpp /I "C:\tools\ZeroMQ 4.0.4\include" /nologo /EHsc /Foobj\ /Febin\ ^
- /nologo /EHsc "C:\tools\ZeroMQ 4.0.4\lib\libzmq-v110-mt-4_0_4.lib"
- */
- #include <iostream>
- #include <string>
- #include <sstream>
- #include <iomanip>
- #include <zmq.h>
- #include <zmq_utils.h>
- using namespace std;
- #define ROUTER_ENDPOINT "inproc://router"
- #define CHECK(x) \
- if ((x) != 0) dump_error(__LINE__, #x);
- #define CHECKNOTNULL(x) \
- if ((x) == NULL) dump_error(__LINE__, #x);
- #define CHECKPOS(x) \
- if ((x) < 0) dump_error(__LINE__, #x);
- void* context;
- int num_operations;
- void dump_error(int line_number, const char* cmd)
- {
- cout << cmd << " failed at line " << line_number << endl;
- cout << "error = " << strerror(errno) << endl;
- cout << "error = " << errno << endl;
- exit(0);
- }
- string hexstring(const char* bytes, int len)
- {
- ostringstream hexstr;
- for (int i=0; i < len; ++i)
- hexstr << hex << setfill('0') << setw(2) << (int) bytes[i];
- return hexstr.str();
- }
- void router_thread(void* router_socket)
- {
- char client_id[100], null[100], message[100];
- int client_id_len, null_len, message_len;
- int more;
- size_t more_len = sizeof(more);
- int ops_processed = 0;
- do {
- CHECKPOS(client_id_len = zmq_recv(router_socket, client_id, 100, 0));
- CHECK(zmq_getsockopt(router_socket, ZMQ_RCVMORE, &more, &more_len));
- CHECKPOS(null_len = zmq_recv(router_socket, null, 100, 0));
- CHECK(zmq_getsockopt(router_socket, ZMQ_RCVMORE, &more, &more_len));
- CHECKPOS(message_len = zmq_recv(router_socket, message, 100, 0));
- CHECKPOS(zmq_send(router_socket, client_id, client_id_len, ZMQ_SNDMORE));
- CHECKPOS(zmq_send(router_socket, "", 0, ZMQ_SNDMORE));
- ostringstream rsp_builder;
- rsp_builder << "ok " << ops_processed;
- string rsp = rsp_builder.str();
- CHECKPOS(zmq_send(router_socket, rsp.c_str(), rsp.size(), 0));
- ops_processed++;
- } while (strnicmp(message, "quit", message_len) != 0);
- }
- void client_thread(void* my_id)
- {
- ostringstream fn_id;
- for (int i=0; i < (int) num_operations; ++i) {
- const int linger = 0;
- void* router_command_socket;
- CHECKNOTNULL(router_command_socket = zmq_socket(context, ZMQ_REQ));
- CHECK(zmq_setsockopt(router_command_socket, ZMQ_LINGER, &linger, sizeof(int)));
- CHECK(zmq_connect(router_command_socket, ROUTER_ENDPOINT));
- CHECKPOS(zmq_send(router_command_socket, "hey", 3, 0));
- char response[100];
- int response_len;
- CHECKPOS(response_len = zmq_recv(router_command_socket, response, 100, 0));
- CHECK(zmq_close(router_command_socket));
- }
- }
- int main(int argc, char* argv[])
- {
- // if num_operations*num_clients > 1000, this will fail
- num_operations = 100;
- const int num_clients = 15;
- const int linger = 0;
- context = zmq_ctx_new();
- //CHECK(zmq_ctx_set(context, ZMQ_MAX_SOCKETS, 1023));
- // create router socket
- void* router_socket = zmq_socket(context, ZMQ_ROUTER);
- CHECK(zmq_setsockopt(router_socket, ZMQ_LINGER, &linger, sizeof(int)));
- CHECK(zmq_bind(router_socket, ROUTER_ENDPOINT));
- zmq_threadstart(router_thread, router_socket);
- cout << "starting router" << endl;
- // start up clients
- for (int i=0; i < num_clients; ++i)
- zmq_threadstart(client_thread, (void*) i);
- cout << "press enter to stop router" << endl;
- getchar();
- // stop the router
- cout << "stopping router" << endl;
- void* router_command_socket = zmq_socket(context, ZMQ_REQ);
- CHECK(zmq_setsockopt(router_command_socket, ZMQ_LINGER, &linger, sizeof(int)));
- CHECK(zmq_connect(router_command_socket, ROUTER_ENDPOINT));
- CHECKPOS(zmq_send(router_command_socket, "quit", 4, 0));
- char response[100];
- int response_len;
- CHECKPOS(response_len = zmq_recv(router_command_socket, response, 100, 0));
- cout << string(response, response_len) << endl;
- CHECK(zmq_close(router_command_socket));
- CHECK(zmq_close(router_socket));
- CHECK(zmq_ctx_destroy(context));
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement