Advertisement
nitro2005

socket_srv_multi.cpp

Oct 31st, 2014
8,171
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 19.00 KB | None | 0 0
  1. #include <unistd.h>
  2. #include <sys/types.h>
  3. #include <sys/wait.h>
  4. #include <signal.h>
  5. #include <stdio.h>
  6. #include <math.h>
  7. #include <string.h>
  8. #include <stdlib.h>
  9. #include <time.h>
  10. #include <malloc.h>
  11. #include <fcntl.h>
  12. #include <syslog.h>
  13. #include <sys/socket.h>
  14. #include <netinet/in.h>
  15. #include <arpa/inet.h>
  16. #include <netdb.h>
  17. #include <pthread.h>
  18. #include <sys/poll.h>
  19.  
  20. #define SOCKET_TCP_PORT               1101
  21. #define PACKET_SIZE                   1024
  22. #define INPUT_BUFFER_SIZE             (PACKET_SIZE+sizeof(int))*5
  23. #define OUTPUT_BUFFER_SIZE            (PACKET_SIZE+sizeof(int))*5
  24. #define MAX_CLIENTS                   10
  25. #define TEST_DURATION                 10
  26. #define WORKER_THREAD_COUNT           2
  27. #define WORKER_WORK_TIME              2000
  28. #define WORKER_SLEEP_TIME             1
  29.  
  30. struct SocketClient
  31. {
  32.   char input_buffer[INPUT_BUFFER_SIZE];
  33.   char output_buffer[OUTPUT_BUFFER_SIZE];
  34.   int input_buffer_len;
  35.   int output_buffer_len;
  36.   int fd;
  37.   int poll_fd_index;
  38. };
  39. struct ThreadStats
  40. {
  41.   unsigned long long packets_received;
  42.   unsigned long long bytes_received;
  43. };
  44.  
  45. struct WorkerThreadData
  46. {
  47.   pthread_t thread_id;
  48.   pthread_mutex_t mutex;
  49.   char packet_buffer[INPUT_BUFFER_SIZE];
  50.   int packet_buffer_len;
  51.   int client_index;
  52. };
  53.  
  54. struct SocketClient clients[MAX_CLIENTS];
  55. pthread_mutex_t mutex_clients = PTHREAD_MUTEX_INITIALIZER;
  56. WorkerThreadData worker_threads[WORKER_THREAD_COUNT];
  57. bool flag_kill;
  58.  
  59. //---------------------------------------------------------------------------
  60. bool setnonblocking(int fd)
  61. {
  62.   int opts = fcntl(fd, F_GETFL);
  63.   if (opts < 0)
  64.     return false;
  65.   opts = (opts | O_NONBLOCK);
  66.   if (fcntl(fd, F_SETFL, opts) < 0)
  67.     return false;
  68.   return true;
  69. }
  70.  
  71. //---------------------------------------------------------------------------
  72. static void *client_thread(void *)
  73. {
  74.   // устанавливаем соединение с сервером
  75.   struct hostent *host_info = gethostbyname("127.0.0.1");
  76.   if (!host_info)
  77.     return NULL;
  78.   int fd = socket(AF_INET, SOCK_STREAM, 0);
  79.   if (fd < 0)
  80.     return NULL;
  81.   struct sockaddr_in addr;
  82.   memset((char *) &addr, 0, sizeof(addr));
  83.   addr.sin_family = host_info->h_addrtype;
  84.   addr.sin_port = htons(SOCKET_TCP_PORT);
  85.   addr.sin_addr = *(struct in_addr*)host_info->h_addr;
  86.   if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0)
  87.   {
  88.     close(fd);
  89.     return NULL;
  90.   }
  91.   if (!setnonblocking(fd))
  92.   {
  93.     close(fd);
  94.     return NULL;
  95.   }
  96.  
  97.   // начинаем мониторить события сокета
  98.   struct pollfd poll_fd;
  99.   memset(&poll_fd, 0, sizeof(poll_fd));
  100.   poll_fd.fd = fd;
  101.   poll_fd.events = POLLIN | POLLOUT | POLLERR | POLLHUP;
  102.   poll_fd.revents = 0;
  103.  
  104.   char input_buffer[INPUT_BUFFER_SIZE];
  105.   char output_buffer[OUTPUT_BUFFER_SIZE];
  106.   int input_buffer_len = 0;
  107.   int output_buffer_len = 0;
  108.  
  109.   // кладем в буфер на передачу несколько пакетов
  110.   while (output_buffer_len+sizeof(int)+PACKET_SIZE <= sizeof(output_buffer))
  111.   {
  112.     *((int *)(output_buffer+output_buffer_len)) = PACKET_SIZE;
  113.     output_buffer_len += sizeof(int)+PACKET_SIZE;
  114.   }
  115.  
  116.   ThreadStats *stats = new ThreadStats;
  117.   stats->bytes_received = 0;
  118.   stats->packets_received = 0;
  119.  
  120.   while(!flag_kill)
  121.   {
  122.     // если есть что передавать - мониторим разрешение на передачу
  123.     if (output_buffer_len > 0)
  124.       poll_fd.events |= POLLOUT;
  125.     else
  126.       poll_fd.events &= ~POLLOUT;
  127.  
  128.     int res = poll(&poll_fd, 1, 500);
  129.     if (res < 0)
  130.       break;
  131.     if (res == 0)
  132.       continue;
  133.  
  134.     // ошибочка
  135.     if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP))
  136.       break;
  137.  
  138.     // есть данные для чтения из сокета
  139.     if (poll_fd.revents & POLLIN)
  140.     {
  141.       int rcv_len = recv(fd, input_buffer+input_buffer_len, sizeof(input_buffer)-input_buffer_len, 0);
  142.       if (rcv_len <= 0)
  143.         break;
  144.       stats->bytes_received += rcv_len;
  145.       input_buffer_len += rcv_len;
  146.  
  147.       // парсинг буфера приема
  148.       while(input_buffer_len > sizeof(int))
  149.       {
  150.         int packet_size = *((int *)input_buffer);
  151.         if (input_buffer_len < sizeof(int)+packet_size)
  152.           break;
  153.         if (output_buffer_len+sizeof(int)+packet_size > sizeof(output_buffer))
  154.           break;
  155.         stats->packets_received++;
  156.         // если получили корректный пакет и можем его впихнуть в буфер на передачу - отправляем его туда
  157.         memcpy(output_buffer+output_buffer_len, input_buffer, sizeof(int)+packet_size);
  158.         output_buffer_len += sizeof(int)+packet_size;
  159.         // а из буфера приема удаляем
  160.         memmove(input_buffer, input_buffer+sizeof(int)+packet_size, input_buffer_len-sizeof(int)-packet_size);
  161.         input_buffer_len -= sizeof(int)+packet_size;
  162.       }
  163.  
  164.       poll_fd.revents &= ~POLLIN;
  165.     }
  166.  
  167.     // можно записать данные в сокет
  168.     if (poll_fd.revents & POLLOUT)
  169.     {
  170.       int snd_len = send(fd, output_buffer, output_buffer_len, 0);
  171.       if (snd_len <= 0)
  172.         break;
  173.       // удаляем отправленные данные из буфера
  174.       memmove(output_buffer, output_buffer+snd_len, output_buffer_len-snd_len);
  175.       output_buffer_len -= snd_len;
  176.       poll_fd.revents &= ~POLLOUT;
  177.     }
  178.   }
  179.   close(fd);
  180.  
  181.   return stats;
  182. }
  183.  
  184. //---------------------------------------------------------------------------
  185. static void *worker_thread(void *arg)
  186. {
  187.   WorkerThreadData *data = (WorkerThreadData *)arg;
  188.   while (!flag_kill)
  189.   {
  190.     if (data->client_index >= 0)
  191.     {
  192.       pthread_mutex_lock(&data->mutex);
  193.  
  194.       // имитируем обработку запроса (2ms)
  195.       usleep(WORKER_WORK_TIME);
  196.  
  197.       // пытаемся записать пакет в буфер на передачу клиенту
  198.       while (!flag_kill)
  199.       {
  200.         pthread_mutex_lock(&mutex_clients);
  201.         if (clients[data->client_index].fd < 0)
  202.         {
  203.           pthread_mutex_unlock(&mutex_clients);
  204.           break;
  205.         }
  206.         if (clients[data->client_index].output_buffer_len+data->packet_buffer_len < sizeof(clients[data->client_index].output_buffer))
  207.         {
  208.           memcpy(clients[data->client_index].output_buffer+clients[data->client_index].output_buffer_len,
  209.                  data->packet_buffer, data->packet_buffer_len);
  210.           clients[data->client_index].output_buffer_len += data->packet_buffer_len;
  211.           pthread_mutex_unlock(&mutex_clients);
  212.           break;
  213.         }
  214.         else
  215.         {
  216.           pthread_mutex_unlock(&mutex_clients);
  217.           usleep(WORKER_SLEEP_TIME);
  218.         }
  219.       }
  220.  
  221.       data->client_index = -1;
  222.       pthread_mutex_unlock(&data->mutex);
  223.     }
  224.     else
  225.       usleep(WORKER_SLEEP_TIME);
  226.   }
  227.   return NULL;
  228. }
  229.  
  230. //---------------------------------------------------------------------------
  231. void termination_handler(int)
  232. {
  233.   flag_kill = true;
  234. }
  235. //---------------------------------------------------------------------------
  236. bool client_parse_input_buffer(int client_index)
  237. {
  238.   while(clients[client_index].input_buffer_len > sizeof(int))
  239.   {
  240.     int packet_size = *((int *)clients[client_index].input_buffer);
  241.     if (clients[client_index].input_buffer_len < sizeof(int)+packet_size)
  242.       break;
  243.     if (clients[client_index].output_buffer_len+sizeof(int)+packet_size > sizeof(clients[client_index].output_buffer))
  244.       break;
  245.     // если получили корректный пакет
  246.     // ищем свободного worker-а
  247.     int worker_index = -1;
  248.     for (int i=0; i < WORKER_THREAD_COUNT; i++)
  249.     {
  250.       if (worker_threads[i].client_index < 0)
  251.       {
  252.         worker_index = i;
  253.         break;
  254.       }
  255.     }
  256.     // если нет свободного worker-а - выходим
  257.     if (worker_index < 0)
  258.       return false;
  259.     // записываем пакет на обработку в буфер worker-а
  260.     pthread_mutex_lock(&worker_threads[worker_index].mutex);
  261.     memcpy(worker_threads[worker_index].packet_buffer, clients[client_index].input_buffer, sizeof(int)+packet_size);
  262.     worker_threads[worker_index].packet_buffer_len = sizeof(int)+packet_size;
  263.     worker_threads[worker_index].client_index = client_index;
  264.     pthread_mutex_unlock(&worker_threads[worker_index].mutex);
  265.     // а из буфера приема удаляем
  266.     memmove(clients[client_index].input_buffer, clients[client_index].input_buffer+sizeof(int)+packet_size, clients[client_index].input_buffer_len-sizeof(int)-packet_size);
  267.     clients[client_index].input_buffer_len -= sizeof(int)+packet_size;
  268.   }
  269.   return true;
  270. }
  271. //---------------------------------------------------------------------------
  272. void client_disconnected(int client_index, struct pollfd *poll_fds, int &n_fds)
  273. {
  274.   close(clients[client_index].fd);
  275.   clients[client_index].fd = -1;
  276.   clients[client_index].input_buffer_len = 0;
  277.   clients[client_index].output_buffer_len = 0;
  278.   if (clients[client_index].poll_fd_index >= 0 && clients[client_index].poll_fd_index < n_fds-1)
  279.   {
  280.     memmove(poll_fds+clients[client_index].poll_fd_index,
  281.             poll_fds+clients[client_index].poll_fd_index+1,
  282.             (n_fds-clients[client_index].poll_fd_index-1)*sizeof(struct pollfd));
  283.   }
  284.   n_fds--;
  285. }
  286. //---------------------------------------------------------------------------
  287. int main(int argc, char *argv[])
  288. {
  289.   signal(SIGTERM, termination_handler);
  290.   signal(SIGSTOP, termination_handler);
  291.   signal(SIGINT,  termination_handler);
  292.   signal(SIGQUIT, termination_handler);
  293.  
  294.   int sock_srv = socket(AF_INET, SOCK_STREAM, 0);
  295.   if (sock_srv < 0)
  296.     return 1;
  297.  
  298.   // устанавливаем опцию для повторного использования адреса (порта) без ожидания таймаута
  299.   int reuse_addr = 1;
  300.   if (setsockopt(sock_srv, SOL_SOCKET, SO_REUSEADDR, &reuse_addr, sizeof(reuse_addr)) < 0)
  301.     return 2;
  302.  
  303.   // устанавливаем неблокирующий режим
  304.   if (!setnonblocking(sock_srv))
  305.     return 3;
  306.  
  307.   struct sockaddr_in addr;
  308.   memset((char *) &addr, 0, sizeof(addr));
  309.   addr.sin_family = AF_INET;
  310.   addr.sin_port = htons(SOCKET_TCP_PORT);
  311.   addr.sin_addr.s_addr = htonl(INADDR_ANY);
  312.  
  313.   // привязываем сокет к адресу (порту)
  314.   if (bind(sock_srv, (struct sockaddr *)&addr, sizeof(addr)) < 0)
  315.     return 4;
  316.  
  317.   // начинаем слушать порт
  318.   if (listen(sock_srv, 100) < 0)
  319.     return 5;
  320.  
  321.   // запускаем потоки worker-ов
  322.   for (int i=0; i < WORKER_THREAD_COUNT; i++)
  323.   {
  324.     worker_threads[i].client_index = -1;
  325.     worker_threads[i].packet_buffer_len = 0;
  326.     pthread_mutex_init(&worker_threads[i].mutex, NULL);
  327.     pthread_create(&worker_threads[i].thread_id, NULL, &worker_thread, worker_threads+i);
  328.   }
  329.  
  330.   // запускаем потоки клиентов
  331.   pthread_t thread_ids[MAX_CLIENTS];
  332.   for (int i=0; i < MAX_CLIENTS; i++)
  333.   {
  334.     pthread_create(&thread_ids[i], NULL, &client_thread, NULL);
  335.   }
  336.  
  337.   // чистим массив с данными клиентов
  338.   pthread_mutex_lock(&mutex_clients);
  339.   for (int i=0; i < MAX_CLIENTS; i++)
  340.   {
  341.     clients[i].fd = -1;
  342.     clients[i].input_buffer_len = 0;
  343.     clients[i].output_buffer_len = 0;
  344.     clients[i].poll_fd_index = -1;
  345.   }
  346.   pthread_mutex_unlock(&mutex_clients);
  347.  
  348.   // начинаем мониторить события сокета(ов)
  349.   // по мере подключения клиентов n_fds будет возрастать, poll_fds заполняться
  350.   // а отключение первого же клиента будет означать останов программы,
  351.   // поэтому мы не будем заботиться о сдвиге poll_fds при отключении клиента
  352.   struct pollfd poll_fds[MAX_CLIENTS+1];
  353.   memset(&poll_fds, 0, sizeof(pollfd)*(MAX_CLIENTS+1));
  354.   poll_fds[0].fd = sock_srv;
  355.   poll_fds[0].events = POLLIN;
  356.   poll_fds[0].revents = 0;
  357.   int n_fds = 1;
  358.  
  359.   struct timespec ts_start;
  360.   struct timespec ts_current;
  361.   clock_gettime(CLOCK_MONOTONIC, &ts_start);
  362.   double start_time = ts_start.tv_sec + (double)ts_start.tv_nsec/1000000000;
  363.   double cur_time = ts_start.tv_sec + (double)ts_start.tv_nsec/1000000000;
  364.   bool worker_available = true;
  365.   int last_client_index = 0;
  366.   while(!flag_kill)
  367.   {
  368.     clock_gettime(CLOCK_MONOTONIC, &ts_current);
  369.     cur_time = ts_current.tv_sec + (double)ts_current.tv_nsec/1000000000;
  370.     if (cur_time-start_time > TEST_DURATION)
  371.     {
  372.       flag_kill = true;
  373.       break;
  374.     }
  375.     // если есть что передавать - мониторим разрешение на передачу
  376.     // если можем читать - мониторим появление данных для чтения
  377.     pthread_mutex_lock(&mutex_clients);
  378.     for (int i=0; i < MAX_CLIENTS; i++)
  379.     {
  380.       if (clients[i].fd >= 0)
  381.       {
  382.         if (clients[i].output_buffer_len > 0)
  383.           poll_fds[clients[i].poll_fd_index].events |= POLLOUT;
  384.         else
  385.           poll_fds[clients[i].poll_fd_index].events &= ~POLLOUT;
  386.         if (clients[i].input_buffer_len < sizeof(clients[i].input_buffer_len))
  387.           poll_fds[clients[i].poll_fd_index].events |= POLLIN;
  388.         else
  389.           poll_fds[clients[i].poll_fd_index].events &= ~POLLIN;
  390.       }
  391.     }
  392.     pthread_mutex_unlock(&mutex_clients);
  393.  
  394.     int res = poll(poll_fds, n_fds, worker_available ? 500 : 1);
  395.     if (res < 0)
  396.       break;
  397.     if (res == 0)
  398.     {
  399.       // если все worker-ы были заняты в предыдущий цикл - проверяем, не освободились ли
  400.       if (!worker_available)
  401.       {
  402.         last_client_index++;
  403.         if (last_client_index >= MAX_CLIENTS)
  404.           last_client_index = 0;
  405.         pthread_mutex_lock(&mutex_clients);
  406.         for (int i=last_client_index; i < MAX_CLIENTS; i++)
  407.         {
  408.           if (clients[i].fd < 0)
  409.             continue;
  410.           worker_available = client_parse_input_buffer(i);
  411.           if (!worker_available)
  412.             break;
  413.         }
  414.         pthread_mutex_unlock(&mutex_clients);
  415.       }
  416.       continue;
  417.     }
  418.  
  419.     // есть новое подключение клиента
  420.     if (poll_fds[0].revents & POLLIN)
  421.     {
  422.       poll_fds[0].revents &= ~POLLIN;
  423.       socklen_t len = sizeof(struct sockaddr_in);
  424.       struct sockaddr_in addr;
  425.       memset(&addr, 0, len);
  426.       int sock = accept(sock_srv, (struct sockaddr *)&addr, &len);
  427.       if (sock < 0)
  428.         break;
  429.       if (!setnonblocking(sock))
  430.         break;
  431.       // если слишком много клиентов - закрываем новое соединение
  432.       if (n_fds > MAX_CLIENTS)
  433.       {
  434.         close(sock);
  435.       }
  436.       else
  437.       {
  438.         int client_index = -1;
  439.         pthread_mutex_lock(&mutex_clients);
  440.         for (int i=0; i < MAX_CLIENTS; i++)
  441.         {
  442.           if (clients[i].fd < 0)
  443.           {
  444.             client_index = i;
  445.             break;
  446.           }
  447.         }
  448.         if (client_index >= 0)
  449.         {
  450.           clients[client_index].fd = sock;
  451.           clients[client_index].input_buffer_len = 0;
  452.           clients[client_index].output_buffer_len = 0;
  453.           clients[client_index].poll_fd_index = n_fds;
  454.           poll_fds[n_fds].fd = sock;
  455.           poll_fds[n_fds].events = POLLIN | POLLERR | POLLHUP;
  456.           poll_fds[n_fds].revents = 0;
  457.           n_fds++;
  458.         }
  459.         else
  460.           close(sock);
  461.         pthread_mutex_unlock(&mutex_clients);
  462.       }
  463.     }
  464.  
  465.     pthread_mutex_lock(&mutex_clients);
  466.     for (int i=0; i < MAX_CLIENTS; i++)
  467.     {
  468.       if (clients[i].fd >= 0)
  469.       {
  470.         if ((poll_fds[clients[i].poll_fd_index].revents & POLLERR) || (poll_fds[clients[i].poll_fd_index].revents & POLLHUP))
  471.         {
  472.           client_disconnected(i, poll_fds, n_fds);
  473.           continue;
  474.         }
  475.  
  476.         // если есть что читать от клиента
  477.         if (poll_fds[clients[i].poll_fd_index].revents & POLLIN)
  478.         {
  479.           poll_fds[clients[i].poll_fd_index].revents &= ~POLLIN;
  480.           int rcv_len = recv(clients[i].fd, clients[i].input_buffer+clients[i].input_buffer_len, sizeof(clients[i].input_buffer)-clients[i].input_buffer_len, 0);
  481.           if (rcv_len <= 0)
  482.           {
  483.             client_disconnected(i, poll_fds, n_fds);
  484.             continue;
  485.           }
  486.           clients[i].input_buffer_len += rcv_len;
  487.  
  488.           // парсинг буфера приема
  489.           worker_available = client_parse_input_buffer(i);
  490.           if (!worker_available)
  491.             last_client_index = i;
  492.         }
  493.  
  494.         // если можно записать данные в сокет
  495.         if (poll_fds[clients[i].poll_fd_index].revents & POLLOUT)
  496.         {
  497.           poll_fds[clients[i].poll_fd_index].revents &= ~POLLOUT;
  498.           int snd_len = send(clients[i].fd, clients[i].output_buffer, clients[i].output_buffer_len, 0);
  499.           if (snd_len <= 0)
  500.           {
  501.             client_disconnected(i, poll_fds, n_fds);
  502.             continue;
  503.           }
  504.           // удаляем отправленные данные из буфера
  505.           memmove(clients[i].output_buffer, clients[i].output_buffer+snd_len, clients[i].output_buffer_len-snd_len);
  506.           clients[i].output_buffer_len -= snd_len;
  507.         }
  508.       }
  509.     }
  510.     pthread_mutex_unlock(&mutex_clients);
  511.   }
  512.   double elapsed = cur_time-start_time;
  513.  
  514.   // отключаем клиентов
  515.   pthread_mutex_lock(&mutex_clients);
  516.   for (int i=0; i < MAX_CLIENTS; i++)
  517.   {
  518.     if (clients[i].fd >= 0)
  519.       close(clients[i].fd);
  520.   }
  521.   pthread_mutex_unlock(&mutex_clients);
  522.  
  523.   // ждем завершения потоков
  524.   usleep(600000);
  525.  
  526.   for (int i=0; i < WORKER_THREAD_COUNT; i++)
  527.   {
  528.     void *res = NULL;
  529.     pthread_join(worker_threads[i].thread_id, &res);
  530.     pthread_mutex_destroy(&worker_threads[i].mutex);
  531.   }
  532.  
  533.   unsigned long long total_packets_received = 0;
  534.   unsigned long long total_bytes_received = 0;
  535.   // запускаем потоки клиентов
  536.   printf("thread id:\tbytes rcv\tpackets rcv\r\n");
  537.   for (int i=0; i < MAX_CLIENTS; i++)
  538.   {
  539.     void *res = NULL;
  540.     pthread_join(thread_ids[i], &res);
  541.     ThreadStats *stat = (ThreadStats *)res;
  542.     if (stat)
  543.     {
  544.       printf("thread %02d:\t%lld\t%lld\r\n", i+1, stat->bytes_received, stat->packets_received);
  545.       total_bytes_received += stat->bytes_received;
  546.       total_packets_received += stat->packets_received;
  547.       delete stat;
  548.     }
  549.   }
  550.   printf("    TOTAL:\t%lld\t%lld\r\n", total_bytes_received, total_packets_received);
  551.   printf("\r\n");
  552.   printf("Elapsed time: %.3lf\r\n", elapsed);
  553.   printf("Avg speed: %d bytes/s, %d packets/s\r\n", (unsigned int)(total_bytes_received/elapsed), (unsigned int)(total_packets_received/elapsed));
  554.  
  555.   pthread_mutex_destroy(&mutex_clients);
  556.   close(sock_srv);
  557.   return 0;
  558. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement