nitro2005

zeromq_srv_multi.cpp

Oct 31st, 2014
9,827
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. #include "zmq.h"
  2. #include <time.h>
  3. #include <signal.h>
  4. #include <stdio.h>
  5. #include <stdlib.h>
  6. #include <unistd.h>
  7. #include <math.h>
  8. #include <pthread.h>
  9. #include <sys/wait.h>
  10.  
  11. #define SOCKET_STRING                 "tcp://127.0.0.1:1102"
  12. #define INTERNAL_WORKER_ADDRESS       "inproc://workers"
  13. #define PACKET_SIZE                   1024
  14. #define OUTPUT_BUFFER_SIZE            PACKET_SIZE*5
  15. #define MAX_CLIENTS                   10
  16. #define TEST_DURATION                 10
  17. #define WORKER_THREAD_COUNT           2
  18. #define WORKER_WORK_TIME              2000
  19.  
  20. struct ThreadStats
  21. {
  22.   unsigned long long packets_received;
  23.   unsigned long long bytes_received;
  24. };
  25.  
  26. bool flag_kill;
  27. void *z_ctx;
  28.  
  29. //---------------------------------------------------------------------------
  30. static void *client_thread(void *)
  31. {
  32.   void *sock = zmq_socket(z_ctx, ZMQ_DEALER);
  33.   if (!sock)
  34.     return NULL;
  35.  
  36.   if (zmq_connect(sock, SOCKET_STRING) < 0)
  37.   {
  38.     zmq_close(sock);
  39.     return NULL;
  40.   }
  41.  
  42.   char output_buffer[OUTPUT_BUFFER_SIZE];
  43.   int output_buffer_len = 0;
  44.  
  45.   // отправляем на сервер несколько пакетов
  46.   while (output_buffer_len+PACKET_SIZE <= sizeof(output_buffer))
  47.   {
  48.     int bytes_sent = zmq_send(sock, output_buffer+output_buffer_len, PACKET_SIZE, ZMQ_DONTWAIT);
  49.     output_buffer_len += bytes_sent;
  50.   }
  51.  
  52.   ThreadStats *stats = new ThreadStats;
  53.   stats->packets_received = 0;
  54.   stats->bytes_received = 0;
  55.  
  56.   // начинаем мониторить события сокета
  57.   zmq_pollitem_t poll_fd;
  58.   poll_fd.socket = sock;
  59.   poll_fd.events = ZMQ_POLLIN;
  60.   poll_fd.revents = 0;
  61.  
  62.   while(!flag_kill)
  63.   {
  64.     int res = zmq_poll(&poll_fd, 1, 500);
  65.     if (res < 0)
  66.       break;
  67.     if (res == 0)
  68.       continue;
  69.  
  70.     // есть новый пакет
  71.     if (poll_fd.revents & ZMQ_POLLIN)
  72.     {
  73.       poll_fd.revents &= ~ZMQ_POLLIN;
  74.  
  75.       // читаем пакет с данными
  76.       zmq_msg_t rcv_msg;
  77.       if (zmq_msg_init(&rcv_msg) < 0)
  78.       {
  79.         flag_kill = true;
  80.         break;
  81.       }
  82.  
  83.       int rc = zmq_msg_recv(&rcv_msg, sock, ZMQ_DONTWAIT);
  84.       if (rc < 0)
  85.       {
  86.         flag_kill = true;
  87.         break;
  88.       }
  89.  
  90.       int msg_size = zmq_msg_size(&rcv_msg);
  91.       char *msg_buffer = (char *)zmq_msg_data(&rcv_msg);
  92.       stats->bytes_received += msg_size;
  93.       stats->packets_received++;
  94.  
  95.       // отправляем пакет с данными
  96.       if (zmq_send(sock, msg_buffer, msg_size, ZMQ_DONTWAIT) < 0)
  97.       {
  98.         printf("zmq_send() error: %s\r\n", zmq_strerror(errno));
  99.         break;
  100.       }
  101.  
  102.       zmq_msg_close(&rcv_msg);
  103.     }
  104.   }
  105.  
  106.   zmq_close(sock);
  107.   return stats;
  108. }
  109.  
  110. //---------------------------------------------------------------------------
  111. static void *worker_thread(void *)
  112. {
  113.   void *sock = zmq_socket(z_ctx, ZMQ_REP);
  114.   if (!sock)
  115.     return NULL;
  116.  
  117.   if (zmq_connect(sock, INTERNAL_WORKER_ADDRESS) < 0)
  118.   {
  119.     zmq_close(sock);
  120.     return NULL;
  121.   }
  122.  
  123.   // начинаем мониторить события сокета
  124.   zmq_pollitem_t poll_fd;
  125.   poll_fd.socket = sock;
  126.   poll_fd.events = ZMQ_POLLIN;
  127.   poll_fd.revents = 0;
  128.  
  129.   while(!flag_kill)
  130.   {
  131.     int res = zmq_poll(&poll_fd, 1, 500);
  132.     if (res < 0)
  133.       break;
  134.     if (res == 0)
  135.       continue;
  136.  
  137.     // есть новый пакет
  138.     if (poll_fd.revents & ZMQ_POLLIN)
  139.     {
  140.       poll_fd.revents &= ~ZMQ_POLLIN;
  141.  
  142.       // читаем фрейм с идентификатором клиента
  143.       char client_identity[255];
  144.       int client_identity_len;
  145.       client_identity_len = zmq_recv(sock, client_identity, sizeof(client_identity), ZMQ_DONTWAIT);
  146.       if (client_identity_len <= 0)
  147.         break;
  148.  
  149.       // читаем пакет с данными
  150.       zmq_msg_t rcv_msg;
  151.       if (zmq_msg_init(&rcv_msg) < 0)
  152.       {
  153.         flag_kill = true;
  154.         break;
  155.       }
  156.  
  157.       int rc = zmq_msg_recv(&rcv_msg, sock, ZMQ_DONTWAIT);
  158.       if (rc < 0)
  159.       {
  160.         flag_kill = true;
  161.         break;
  162.       }
  163.  
  164.       int msg_size = zmq_msg_size(&rcv_msg);
  165.       char *msg_buffer = (char *)zmq_msg_data(&rcv_msg);
  166.  
  167.       // имитируем обработку запроса (2ms)
  168.       usleep(WORKER_WORK_TIME);
  169.  
  170.       // отправляем фрейм с идентификатором клиента
  171.       if (zmq_send(sock, client_identity, client_identity_len, ZMQ_DONTWAIT | ZMQ_SNDMORE) < 0)
  172.         break;
  173.  
  174.       // отправляем пакет с данными обратно
  175.       if (zmq_send(sock, msg_buffer, msg_size, ZMQ_DONTWAIT) < 0)
  176.       {
  177.         printf("zmq_send() error: %s\r\n", zmq_strerror(errno));
  178.         break;
  179.       }
  180.  
  181.       zmq_msg_close(&rcv_msg);
  182.     }
  183.   }
  184.  
  185.   zmq_close(sock);
  186.   return NULL;
  187. }
  188.  
  189. //---------------------------------------------------------------------------
  190. void termination_handler(int)
  191. {
  192.   flag_kill = true;
  193. }
  194. //---------------------------------------------------------------------------
  195. int main(int argc, char *argv[])
  196. {
  197.   signal(SIGTERM, termination_handler);
  198.   signal(SIGSTOP, termination_handler);
  199.   signal(SIGINT,  termination_handler);
  200.   signal(SIGQUIT, termination_handler);
  201.  
  202.   z_ctx = zmq_ctx_new();
  203.   if (!z_ctx)
  204.     return 1;
  205.  
  206. //  zmq_ctx_set(z_ctx, ZMQ_IO_THREADS, 2);
  207.  
  208.   // стартуем сокет для обслуживания клиентов
  209.   void *sock_srv = zmq_socket(z_ctx, ZMQ_ROUTER);
  210.   if (!sock_srv)
  211.   {
  212.     zmq_ctx_destroy(z_ctx);
  213.     return 2;
  214.   }
  215.   if (zmq_bind(sock_srv, SOCKET_STRING) < 0)
  216.   {
  217.     zmq_close(sock_srv);
  218.     zmq_ctx_destroy(z_ctx);
  219.     return 3;
  220.   }
  221.  
  222.   // стартуем сокет для обслуживания потоков worker-ов
  223.   void *sock_workers = zmq_socket(z_ctx, ZMQ_DEALER);
  224.   if (!sock_workers)
  225.   {
  226.     zmq_close(sock_srv);
  227.     zmq_ctx_destroy(z_ctx);
  228.     return 2;
  229.   }
  230.   if (zmq_bind(sock_workers, INTERNAL_WORKER_ADDRESS) < 0)
  231.   {
  232.     zmq_close(sock_srv);
  233.     zmq_close(sock_workers);
  234.     zmq_ctx_destroy(z_ctx);
  235.     return 3;
  236.   }
  237.  
  238.   // запускаем потоки worker-ов
  239.   pthread_t worker_thread_ids[WORKER_THREAD_COUNT];
  240.   for (int i=0; i < WORKER_THREAD_COUNT; i++)
  241.   {
  242.     pthread_create(&worker_thread_ids[i], NULL, &worker_thread, NULL);
  243.   }
  244.  
  245.   // запускаем потоки клиентов
  246.   pthread_t thread_ids[MAX_CLIENTS];
  247.   for (int i=0; i < MAX_CLIENTS; i++)
  248.   {
  249.     pthread_create(&thread_ids[i], NULL, &client_thread, NULL);
  250.   }
  251.  
  252.   // начинаем мониторить события сокета
  253.   zmq_pollitem_t poll_fds[2];
  254.   poll_fds[0].socket = sock_srv;
  255.   poll_fds[0].events = ZMQ_POLLIN;
  256.   poll_fds[0].revents = 0;
  257.   poll_fds[1].socket = sock_workers;
  258.   poll_fds[1].events = ZMQ_POLLIN;
  259.   poll_fds[1].revents = 0;
  260.  
  261.   struct timespec ts_start;
  262.   struct timespec ts_current;
  263.   clock_gettime(CLOCK_MONOTONIC, &ts_start);
  264.   double start_time = ts_start.tv_sec + (double)ts_start.tv_nsec/1000000000;
  265.   double cur_time = ts_start.tv_sec + (double)ts_start.tv_nsec/1000000000;
  266.   zmq_msg_t rcv_msg;
  267.   while(!flag_kill)
  268.   {
  269.     clock_gettime(CLOCK_MONOTONIC, &ts_current);
  270.     cur_time = ts_current.tv_sec + (double)ts_current.tv_nsec/1000000000;
  271.     if (cur_time-start_time > TEST_DURATION)
  272.     {
  273.       flag_kill = true;
  274.       break;
  275.     }
  276.     int res = zmq_poll(poll_fds, 2, 500);
  277.     if (res < 0)
  278.       break;
  279.     if (res == 0)
  280.       continue;
  281.  
  282.     // есть новый пакет от клиента
  283.     if (poll_fds[0].revents & ZMQ_POLLIN)
  284.     {
  285.       poll_fds[0].revents &= ~ZMQ_POLLIN;
  286.  
  287.       // читаем фрейм с идентификатором клиента
  288.       char client_identity[255];
  289.       int client_identity_len;
  290.       client_identity_len = zmq_recv(sock_srv, client_identity, sizeof(client_identity), ZMQ_DONTWAIT);
  291.       if (client_identity_len <= 0)
  292.         break;
  293.  
  294.       // читаем фрейм с пакетом данных
  295.       if (zmq_msg_init(&rcv_msg) < 0)
  296.       {
  297.         flag_kill = true;
  298.         break;
  299.       }
  300.       int rc = zmq_msg_recv(&rcv_msg, sock_srv, ZMQ_DONTWAIT);
  301.       if (rc < 0)
  302.       {
  303.         flag_kill = true;
  304.         break;
  305.       }
  306.       int msg_size = zmq_msg_size(&rcv_msg);
  307.  
  308.       // отправляем delimiter-фрейм worker-у
  309.       if (zmq_send(sock_workers, "", 0, ZMQ_DONTWAIT | ZMQ_SNDMORE) < 0)
  310.         break;
  311.  
  312.       // отправляем фрейм с идентификатором клиента worker-у
  313.       // worker нам его вернет обратно вместе с ответом на запрос
  314.       // и мы будем знать какому клиенту направить ответ
  315.       if (zmq_send(sock_workers, client_identity, client_identity_len, ZMQ_DONTWAIT | ZMQ_SNDMORE) < 0)
  316.         break;
  317.  
  318.       // отправляем фрейм с пакетом данных worker-у
  319.       if (zmq_send(sock_workers, zmq_msg_data(&rcv_msg), msg_size, ZMQ_DONTWAIT) < 0)
  320.         break;
  321.  
  322.       zmq_msg_close(&rcv_msg);
  323.     }
  324.  
  325.     // есть новый пакет от worker-а
  326.     if (poll_fds[1].revents & ZMQ_POLLIN)
  327.     {
  328.       poll_fds[1].revents &= ~ZMQ_POLLIN;
  329.  
  330.       // читаем delimiter-фрейм
  331.       char delimiter_buf[2];
  332.       int delimiter_buf_len;
  333.       delimiter_buf_len = zmq_recv(sock_workers, delimiter_buf, sizeof(delimiter_buf), ZMQ_DONTWAIT);
  334.       if (delimiter_buf_len > 0)
  335.         break;
  336.  
  337.       // читаем фрейм с идентификатором клиента
  338.       char client_identity[255];
  339.       int client_identity_len;
  340.       client_identity_len = zmq_recv(sock_workers, client_identity, sizeof(client_identity), ZMQ_DONTWAIT);
  341.       if (client_identity_len <= 0)
  342.         break;
  343.  
  344.       // читаем фрейм с пакетом данных
  345.       if (zmq_msg_init(&rcv_msg) < 0)
  346.       {
  347.         flag_kill = true;
  348.         break;
  349.       }
  350.       int rc = zmq_msg_recv(&rcv_msg, sock_workers, ZMQ_DONTWAIT);
  351.       if (rc < 0)
  352.       {
  353.         flag_kill = true;
  354.         break;
  355.       }
  356.       int msg_size = zmq_msg_size(&rcv_msg);
  357.  
  358.       // отправляем фрейм с идентификатором клиента ROUTER-сокету
  359.       if (zmq_send(sock_srv, client_identity, client_identity_len, ZMQ_DONTWAIT | ZMQ_SNDMORE) < 0)
  360.         break;
  361.  
  362.       // отправляем фрейм с пакетом данных клиенту
  363.       if (zmq_send(sock_srv, zmq_msg_data(&rcv_msg), msg_size, ZMQ_DONTWAIT) < 0)
  364.         break;
  365.  
  366.       zmq_msg_close(&rcv_msg);
  367.     }
  368.   }
  369.   double elapsed = cur_time-start_time;
  370.  
  371.   // ждем завершения потоков
  372.   usleep(600000);
  373.  
  374.   for (int i=0; i < WORKER_THREAD_COUNT; i++)
  375.   {
  376.     void *res = NULL;
  377.     pthread_join(worker_thread_ids[i], &res);
  378.   }
  379.  
  380.   unsigned long long total_packets_received = 0;
  381.   unsigned long long total_bytes_received = 0;
  382.   // получаем статистику по потокам и считаем итог
  383.   printf("thread id:\tbytes rcv\tpackets rcv\r\n");
  384.   for (int i=0; i < MAX_CLIENTS; i++)
  385.   {
  386.     void *res = NULL;
  387.     pthread_join(thread_ids[i], &res);
  388.     ThreadStats *stat = (ThreadStats *)res;
  389.     if (stat)
  390.     {
  391.       printf("thread %02d:\t%lld\t%lld\r\n", i+1, stat->bytes_received, stat->packets_received);
  392.       total_bytes_received += stat->bytes_received;
  393.       total_packets_received += stat->packets_received;
  394.       delete stat;
  395.     }
  396.   }
  397.   printf("    TOTAL:\t%lld\t%lld\r\n", total_bytes_received, total_packets_received);
  398.   printf("\r\n");
  399.   printf("Elapsed time: %.3lf\r\n", elapsed);
  400.   printf("Avg speed: %d bytes/s, %d packets/s\r\n", (unsigned int)(total_bytes_received/elapsed), (unsigned int)(total_packets_received/elapsed));
  401.  
  402.   zmq_close(sock_workers);
  403.   zmq_close(sock_srv);
  404.   zmq_ctx_destroy(z_ctx);
  405.   return 0;
  406. }
RAW Paste Data

Adblocker detected! Please consider disabling it...

We've detected AdBlock Plus or some other adblocking software preventing Pastebin.com from fully loading.

We don't have any obnoxious sound, or popup ads, we actively block these annoying types of ads!

Please add Pastebin.com to your ad blocker whitelist or disable your adblocking software.

×