Advertisement
nitro2005

zeromq_srv_multi.cpp

Oct 31st, 2014
11,663
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 11.74 KB | None | 0 0
  1. #include "zmq.h"
  2. #include <time.h>
  3. #include <signal.h>
  4. #include <stdio.h>
  5. #include <stdlib.h>
  6. #include <unistd.h>
  7. #include <math.h>
  8. #include <pthread.h>
  9. #include <sys/wait.h>
  10.  
  11. #define SOCKET_STRING                 "tcp://127.0.0.1:1102"
  12. #define INTERNAL_WORKER_ADDRESS       "inproc://workers"
  13. #define PACKET_SIZE                   1024
  14. #define OUTPUT_BUFFER_SIZE            PACKET_SIZE*5
  15. #define MAX_CLIENTS                   10
  16. #define TEST_DURATION                 10
  17. #define WORKER_THREAD_COUNT           2
  18. #define WORKER_WORK_TIME              2000
  19.  
  20. struct ThreadStats
  21. {
  22.   unsigned long long packets_received;
  23.   unsigned long long bytes_received;
  24. };
  25.  
  26. bool flag_kill;
  27. void *z_ctx;
  28.  
  29. //---------------------------------------------------------------------------
  30. static void *client_thread(void *)
  31. {
  32.   void *sock = zmq_socket(z_ctx, ZMQ_DEALER);
  33.   if (!sock)
  34.     return NULL;
  35.  
  36.   if (zmq_connect(sock, SOCKET_STRING) < 0)
  37.   {
  38.     zmq_close(sock);
  39.     return NULL;
  40.   }
  41.  
  42.   char output_buffer[OUTPUT_BUFFER_SIZE];
  43.   int output_buffer_len = 0;
  44.  
  45.   // отправляем на сервер несколько пакетов
  46.   while (output_buffer_len+PACKET_SIZE <= sizeof(output_buffer))
  47.   {
  48.     int bytes_sent = zmq_send(sock, output_buffer+output_buffer_len, PACKET_SIZE, ZMQ_DONTWAIT);
  49.     output_buffer_len += bytes_sent;
  50.   }
  51.  
  52.   ThreadStats *stats = new ThreadStats;
  53.   stats->packets_received = 0;
  54.   stats->bytes_received = 0;
  55.  
  56.   // начинаем мониторить события сокета
  57.   zmq_pollitem_t poll_fd;
  58.   poll_fd.socket = sock;
  59.   poll_fd.events = ZMQ_POLLIN;
  60.   poll_fd.revents = 0;
  61.  
  62.   while(!flag_kill)
  63.   {
  64.     int res = zmq_poll(&poll_fd, 1, 500);
  65.     if (res < 0)
  66.       break;
  67.     if (res == 0)
  68.       continue;
  69.  
  70.     // есть новый пакет
  71.     if (poll_fd.revents & ZMQ_POLLIN)
  72.     {
  73.       poll_fd.revents &= ~ZMQ_POLLIN;
  74.  
  75.       // читаем пакет с данными
  76.       zmq_msg_t rcv_msg;
  77.       if (zmq_msg_init(&rcv_msg) < 0)
  78.       {
  79.         flag_kill = true;
  80.         break;
  81.       }
  82.  
  83.       int rc = zmq_msg_recv(&rcv_msg, sock, ZMQ_DONTWAIT);
  84.       if (rc < 0)
  85.       {
  86.         flag_kill = true;
  87.         break;
  88.       }
  89.  
  90.       int msg_size = zmq_msg_size(&rcv_msg);
  91.       char *msg_buffer = (char *)zmq_msg_data(&rcv_msg);
  92.       stats->bytes_received += msg_size;
  93.       stats->packets_received++;
  94.  
  95.       // отправляем пакет с данными
  96.       if (zmq_send(sock, msg_buffer, msg_size, ZMQ_DONTWAIT) < 0)
  97.       {
  98.         printf("zmq_send() error: %s\r\n", zmq_strerror(errno));
  99.         break;
  100.       }
  101.  
  102.       zmq_msg_close(&rcv_msg);
  103.     }
  104.   }
  105.  
  106.   zmq_close(sock);
  107.   return stats;
  108. }
  109.  
  110. //---------------------------------------------------------------------------
  111. static void *worker_thread(void *)
  112. {
  113.   void *sock = zmq_socket(z_ctx, ZMQ_REP);
  114.   if (!sock)
  115.     return NULL;
  116.  
  117.   if (zmq_connect(sock, INTERNAL_WORKER_ADDRESS) < 0)
  118.   {
  119.     zmq_close(sock);
  120.     return NULL;
  121.   }
  122.  
  123.   // начинаем мониторить события сокета
  124.   zmq_pollitem_t poll_fd;
  125.   poll_fd.socket = sock;
  126.   poll_fd.events = ZMQ_POLLIN;
  127.   poll_fd.revents = 0;
  128.  
  129.   while(!flag_kill)
  130.   {
  131.     int res = zmq_poll(&poll_fd, 1, 500);
  132.     if (res < 0)
  133.       break;
  134.     if (res == 0)
  135.       continue;
  136.  
  137.     // есть новый пакет
  138.     if (poll_fd.revents & ZMQ_POLLIN)
  139.     {
  140.       poll_fd.revents &= ~ZMQ_POLLIN;
  141.  
  142.       // читаем фрейм с идентификатором клиента
  143.       char client_identity[255];
  144.       int client_identity_len;
  145.       client_identity_len = zmq_recv(sock, client_identity, sizeof(client_identity), ZMQ_DONTWAIT);
  146.       if (client_identity_len <= 0)
  147.         break;
  148.  
  149.       // читаем пакет с данными
  150.       zmq_msg_t rcv_msg;
  151.       if (zmq_msg_init(&rcv_msg) < 0)
  152.       {
  153.         flag_kill = true;
  154.         break;
  155.       }
  156.  
  157.       int rc = zmq_msg_recv(&rcv_msg, sock, ZMQ_DONTWAIT);
  158.       if (rc < 0)
  159.       {
  160.         flag_kill = true;
  161.         break;
  162.       }
  163.  
  164.       int msg_size = zmq_msg_size(&rcv_msg);
  165.       char *msg_buffer = (char *)zmq_msg_data(&rcv_msg);
  166.  
  167.       // имитируем обработку запроса (2ms)
  168.       usleep(WORKER_WORK_TIME);
  169.  
  170.       // отправляем фрейм с идентификатором клиента
  171.       if (zmq_send(sock, client_identity, client_identity_len, ZMQ_DONTWAIT | ZMQ_SNDMORE) < 0)
  172.         break;
  173.  
  174.       // отправляем пакет с данными обратно
  175.       if (zmq_send(sock, msg_buffer, msg_size, ZMQ_DONTWAIT) < 0)
  176.       {
  177.         printf("zmq_send() error: %s\r\n", zmq_strerror(errno));
  178.         break;
  179.       }
  180.  
  181.       zmq_msg_close(&rcv_msg);
  182.     }
  183.   }
  184.  
  185.   zmq_close(sock);
  186.   return NULL;
  187. }
  188.  
  189. //---------------------------------------------------------------------------
  190. void termination_handler(int)
  191. {
  192.   flag_kill = true;
  193. }
  194. //---------------------------------------------------------------------------
  195. int main(int argc, char *argv[])
  196. {
  197.   signal(SIGTERM, termination_handler);
  198.   signal(SIGSTOP, termination_handler);
  199.   signal(SIGINT,  termination_handler);
  200.   signal(SIGQUIT, termination_handler);
  201.  
  202.   z_ctx = zmq_ctx_new();
  203.   if (!z_ctx)
  204.     return 1;
  205.  
  206. //  zmq_ctx_set(z_ctx, ZMQ_IO_THREADS, 2);
  207.  
  208.   // стартуем сокет для обслуживания клиентов
  209.   void *sock_srv = zmq_socket(z_ctx, ZMQ_ROUTER);
  210.   if (!sock_srv)
  211.   {
  212.     zmq_ctx_destroy(z_ctx);
  213.     return 2;
  214.   }
  215.   if (zmq_bind(sock_srv, SOCKET_STRING) < 0)
  216.   {
  217.     zmq_close(sock_srv);
  218.     zmq_ctx_destroy(z_ctx);
  219.     return 3;
  220.   }
  221.  
  222.   // стартуем сокет для обслуживания потоков worker-ов
  223.   void *sock_workers = zmq_socket(z_ctx, ZMQ_DEALER);
  224.   if (!sock_workers)
  225.   {
  226.     zmq_close(sock_srv);
  227.     zmq_ctx_destroy(z_ctx);
  228.     return 2;
  229.   }
  230.   if (zmq_bind(sock_workers, INTERNAL_WORKER_ADDRESS) < 0)
  231.   {
  232.     zmq_close(sock_srv);
  233.     zmq_close(sock_workers);
  234.     zmq_ctx_destroy(z_ctx);
  235.     return 3;
  236.   }
  237.  
  238.   // запускаем потоки worker-ов
  239.   pthread_t worker_thread_ids[WORKER_THREAD_COUNT];
  240.   for (int i=0; i < WORKER_THREAD_COUNT; i++)
  241.   {
  242.     pthread_create(&worker_thread_ids[i], NULL, &worker_thread, NULL);
  243.   }
  244.  
  245.   // запускаем потоки клиентов
  246.   pthread_t thread_ids[MAX_CLIENTS];
  247.   for (int i=0; i < MAX_CLIENTS; i++)
  248.   {
  249.     pthread_create(&thread_ids[i], NULL, &client_thread, NULL);
  250.   }
  251.  
  252.   // начинаем мониторить события сокета
  253.   zmq_pollitem_t poll_fds[2];
  254.   poll_fds[0].socket = sock_srv;
  255.   poll_fds[0].events = ZMQ_POLLIN;
  256.   poll_fds[0].revents = 0;
  257.   poll_fds[1].socket = sock_workers;
  258.   poll_fds[1].events = ZMQ_POLLIN;
  259.   poll_fds[1].revents = 0;
  260.  
  261.   struct timespec ts_start;
  262.   struct timespec ts_current;
  263.   clock_gettime(CLOCK_MONOTONIC, &ts_start);
  264.   double start_time = ts_start.tv_sec + (double)ts_start.tv_nsec/1000000000;
  265.   double cur_time = ts_start.tv_sec + (double)ts_start.tv_nsec/1000000000;
  266.   zmq_msg_t rcv_msg;
  267.   while(!flag_kill)
  268.   {
  269.     clock_gettime(CLOCK_MONOTONIC, &ts_current);
  270.     cur_time = ts_current.tv_sec + (double)ts_current.tv_nsec/1000000000;
  271.     if (cur_time-start_time > TEST_DURATION)
  272.     {
  273.       flag_kill = true;
  274.       break;
  275.     }
  276.     int res = zmq_poll(poll_fds, 2, 500);
  277.     if (res < 0)
  278.       break;
  279.     if (res == 0)
  280.       continue;
  281.  
  282.     // есть новый пакет от клиента
  283.     if (poll_fds[0].revents & ZMQ_POLLIN)
  284.     {
  285.       poll_fds[0].revents &= ~ZMQ_POLLIN;
  286.  
  287.       // читаем фрейм с идентификатором клиента
  288.       char client_identity[255];
  289.       int client_identity_len;
  290.       client_identity_len = zmq_recv(sock_srv, client_identity, sizeof(client_identity), ZMQ_DONTWAIT);
  291.       if (client_identity_len <= 0)
  292.         break;
  293.  
  294.       // читаем фрейм с пакетом данных
  295.       if (zmq_msg_init(&rcv_msg) < 0)
  296.       {
  297.         flag_kill = true;
  298.         break;
  299.       }
  300.       int rc = zmq_msg_recv(&rcv_msg, sock_srv, ZMQ_DONTWAIT);
  301.       if (rc < 0)
  302.       {
  303.         flag_kill = true;
  304.         break;
  305.       }
  306.       int msg_size = zmq_msg_size(&rcv_msg);
  307.  
  308.       // отправляем delimiter-фрейм worker-у
  309.       if (zmq_send(sock_workers, "", 0, ZMQ_DONTWAIT | ZMQ_SNDMORE) < 0)
  310.         break;
  311.  
  312.       // отправляем фрейм с идентификатором клиента worker-у
  313.       // worker нам его вернет обратно вместе с ответом на запрос
  314.       // и мы будем знать какому клиенту направить ответ
  315.       if (zmq_send(sock_workers, client_identity, client_identity_len, ZMQ_DONTWAIT | ZMQ_SNDMORE) < 0)
  316.         break;
  317.  
  318.       // отправляем фрейм с пакетом данных worker-у
  319.       if (zmq_send(sock_workers, zmq_msg_data(&rcv_msg), msg_size, ZMQ_DONTWAIT) < 0)
  320.         break;
  321.  
  322.       zmq_msg_close(&rcv_msg);
  323.     }
  324.  
  325.     // есть новый пакет от worker-а
  326.     if (poll_fds[1].revents & ZMQ_POLLIN)
  327.     {
  328.       poll_fds[1].revents &= ~ZMQ_POLLIN;
  329.  
  330.       // читаем delimiter-фрейм
  331.       char delimiter_buf[2];
  332.       int delimiter_buf_len;
  333.       delimiter_buf_len = zmq_recv(sock_workers, delimiter_buf, sizeof(delimiter_buf), ZMQ_DONTWAIT);
  334.       if (delimiter_buf_len > 0)
  335.         break;
  336.  
  337.       // читаем фрейм с идентификатором клиента
  338.       char client_identity[255];
  339.       int client_identity_len;
  340.       client_identity_len = zmq_recv(sock_workers, client_identity, sizeof(client_identity), ZMQ_DONTWAIT);
  341.       if (client_identity_len <= 0)
  342.         break;
  343.  
  344.       // читаем фрейм с пакетом данных
  345.       if (zmq_msg_init(&rcv_msg) < 0)
  346.       {
  347.         flag_kill = true;
  348.         break;
  349.       }
  350.       int rc = zmq_msg_recv(&rcv_msg, sock_workers, ZMQ_DONTWAIT);
  351.       if (rc < 0)
  352.       {
  353.         flag_kill = true;
  354.         break;
  355.       }
  356.       int msg_size = zmq_msg_size(&rcv_msg);
  357.  
  358.       // отправляем фрейм с идентификатором клиента ROUTER-сокету
  359.       if (zmq_send(sock_srv, client_identity, client_identity_len, ZMQ_DONTWAIT | ZMQ_SNDMORE) < 0)
  360.         break;
  361.  
  362.       // отправляем фрейм с пакетом данных клиенту
  363.       if (zmq_send(sock_srv, zmq_msg_data(&rcv_msg), msg_size, ZMQ_DONTWAIT) < 0)
  364.         break;
  365.  
  366.       zmq_msg_close(&rcv_msg);
  367.     }
  368.   }
  369.   double elapsed = cur_time-start_time;
  370.  
  371.   // ждем завершения потоков
  372.   usleep(600000);
  373.  
  374.   for (int i=0; i < WORKER_THREAD_COUNT; i++)
  375.   {
  376.     void *res = NULL;
  377.     pthread_join(worker_thread_ids[i], &res);
  378.   }
  379.  
  380.   unsigned long long total_packets_received = 0;
  381.   unsigned long long total_bytes_received = 0;
  382.   // получаем статистику по потокам и считаем итог
  383.   printf("thread id:\tbytes rcv\tpackets rcv\r\n");
  384.   for (int i=0; i < MAX_CLIENTS; i++)
  385.   {
  386.     void *res = NULL;
  387.     pthread_join(thread_ids[i], &res);
  388.     ThreadStats *stat = (ThreadStats *)res;
  389.     if (stat)
  390.     {
  391.       printf("thread %02d:\t%lld\t%lld\r\n", i+1, stat->bytes_received, stat->packets_received);
  392.       total_bytes_received += stat->bytes_received;
  393.       total_packets_received += stat->packets_received;
  394.       delete stat;
  395.     }
  396.   }
  397.   printf("    TOTAL:\t%lld\t%lld\r\n", total_bytes_received, total_packets_received);
  398.   printf("\r\n");
  399.   printf("Elapsed time: %.3lf\r\n", elapsed);
  400.   printf("Avg speed: %d bytes/s, %d packets/s\r\n", (unsigned int)(total_bytes_received/elapsed), (unsigned int)(total_packets_received/elapsed));
  401.  
  402.   zmq_close(sock_workers);
  403.   zmq_close(sock_srv);
  404.   zmq_ctx_destroy(z_ctx);
  405.   return 0;
  406. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement