nitro2005

socket_srv_multi.cpp

Oct 31st, 2014
6,571
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. #include <unistd.h>
  2. #include <sys/types.h>
  3. #include <sys/wait.h>
  4. #include <signal.h>
  5. #include <stdio.h>
  6. #include <math.h>
  7. #include <string.h>
  8. #include <stdlib.h>
  9. #include <time.h>
  10. #include <malloc.h>
  11. #include <fcntl.h>
  12. #include <syslog.h>
  13. #include <sys/socket.h>
  14. #include <netinet/in.h>
  15. #include <arpa/inet.h>
  16. #include <netdb.h>
  17. #include <pthread.h>
  18. #include <sys/poll.h>
  19.  
  20. #define SOCKET_TCP_PORT               1101
  21. #define PACKET_SIZE                   1024
  22. #define INPUT_BUFFER_SIZE             (PACKET_SIZE+sizeof(int))*5
  23. #define OUTPUT_BUFFER_SIZE            (PACKET_SIZE+sizeof(int))*5
  24. #define MAX_CLIENTS                   10
  25. #define TEST_DURATION                 10
  26. #define WORKER_THREAD_COUNT           2
  27. #define WORKER_WORK_TIME              2000
  28. #define WORKER_SLEEP_TIME             1
  29.  
  30. struct SocketClient
  31. {
  32.   char input_buffer[INPUT_BUFFER_SIZE];
  33.   char output_buffer[OUTPUT_BUFFER_SIZE];
  34.   int input_buffer_len;
  35.   int output_buffer_len;
  36.   int fd;
  37.   int poll_fd_index;
  38. };
  39. struct ThreadStats
  40. {
  41.   unsigned long long packets_received;
  42.   unsigned long long bytes_received;
  43. };
  44.  
  45. struct WorkerThreadData
  46. {
  47.   pthread_t thread_id;
  48.   pthread_mutex_t mutex;
  49.   char packet_buffer[INPUT_BUFFER_SIZE];
  50.   int packet_buffer_len;
  51.   int client_index;
  52. };
  53.  
  54. struct SocketClient clients[MAX_CLIENTS];
  55. pthread_mutex_t mutex_clients = PTHREAD_MUTEX_INITIALIZER;
  56. WorkerThreadData worker_threads[WORKER_THREAD_COUNT];
  57. bool flag_kill;
  58.  
  59. //---------------------------------------------------------------------------
  60. bool setnonblocking(int fd)
  61. {
  62.   int opts = fcntl(fd, F_GETFL);
  63.   if (opts < 0)
  64.     return false;
  65.   opts = (opts | O_NONBLOCK);
  66.   if (fcntl(fd, F_SETFL, opts) < 0)
  67.     return false;
  68.   return true;
  69. }
  70.  
  71. //---------------------------------------------------------------------------
  72. static void *client_thread(void *)
  73. {
  74.   // устанавливаем соединение с сервером
  75.   struct hostent *host_info = gethostbyname("127.0.0.1");
  76.   if (!host_info)
  77.     return NULL;
  78.   int fd = socket(AF_INET, SOCK_STREAM, 0);
  79.   if (fd < 0)
  80.     return NULL;
  81.   struct sockaddr_in addr;
  82.   memset((char *) &addr, 0, sizeof(addr));
  83.   addr.sin_family = host_info->h_addrtype;
  84.   addr.sin_port = htons(SOCKET_TCP_PORT);
  85.   addr.sin_addr = *(struct in_addr*)host_info->h_addr;
  86.   if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0)
  87.   {
  88.     close(fd);
  89.     return NULL;
  90.   }
  91.   if (!setnonblocking(fd))
  92.   {
  93.     close(fd);
  94.     return NULL;
  95.   }
  96.  
  97.   // начинаем мониторить события сокета
  98.   struct pollfd poll_fd;
  99.   memset(&poll_fd, 0, sizeof(poll_fd));
  100.   poll_fd.fd = fd;
  101.   poll_fd.events = POLLIN | POLLOUT | POLLERR | POLLHUP;
  102.   poll_fd.revents = 0;
  103.  
  104.   char input_buffer[INPUT_BUFFER_SIZE];
  105.   char output_buffer[OUTPUT_BUFFER_SIZE];
  106.   int input_buffer_len = 0;
  107.   int output_buffer_len = 0;
  108.  
  109.   // кладем в буфер на передачу несколько пакетов
  110.   while (output_buffer_len+sizeof(int)+PACKET_SIZE <= sizeof(output_buffer))
  111.   {
  112.     *((int *)(output_buffer+output_buffer_len)) = PACKET_SIZE;
  113.     output_buffer_len += sizeof(int)+PACKET_SIZE;
  114.   }
  115.  
  116.   ThreadStats *stats = new ThreadStats;
  117.   stats->bytes_received = 0;
  118.   stats->packets_received = 0;
  119.  
  120.   while(!flag_kill)
  121.   {
  122.     // если есть что передавать - мониторим разрешение на передачу
  123.     if (output_buffer_len > 0)
  124.       poll_fd.events |= POLLOUT;
  125.     else
  126.       poll_fd.events &= ~POLLOUT;
  127.  
  128.     int res = poll(&poll_fd, 1, 500);
  129.     if (res < 0)
  130.       break;
  131.     if (res == 0)
  132.       continue;
  133.  
  134.     // ошибочка
  135.     if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP))
  136.       break;
  137.  
  138.     // есть данные для чтения из сокета
  139.     if (poll_fd.revents & POLLIN)
  140.     {
  141.       int rcv_len = recv(fd, input_buffer+input_buffer_len, sizeof(input_buffer)-input_buffer_len, 0);
  142.       if (rcv_len <= 0)
  143.         break;
  144.       stats->bytes_received += rcv_len;
  145.       input_buffer_len += rcv_len;
  146.  
  147.       // парсинг буфера приема
  148.       while(input_buffer_len > sizeof(int))
  149.       {
  150.         int packet_size = *((int *)input_buffer);
  151.         if (input_buffer_len < sizeof(int)+packet_size)
  152.           break;
  153.         if (output_buffer_len+sizeof(int)+packet_size > sizeof(output_buffer))
  154.           break;
  155.         stats->packets_received++;
  156.         // если получили корректный пакет и можем его впихнуть в буфер на передачу - отправляем его туда
  157.         memcpy(output_buffer+output_buffer_len, input_buffer, sizeof(int)+packet_size);
  158.         output_buffer_len += sizeof(int)+packet_size;
  159.         // а из буфера приема удаляем
  160.         memmove(input_buffer, input_buffer+sizeof(int)+packet_size, input_buffer_len-sizeof(int)-packet_size);
  161.         input_buffer_len -= sizeof(int)+packet_size;
  162.       }
  163.  
  164.       poll_fd.revents &= ~POLLIN;
  165.     }
  166.  
  167.     // можно записать данные в сокет
  168.     if (poll_fd.revents & POLLOUT)
  169.     {
  170.       int snd_len = send(fd, output_buffer, output_buffer_len, 0);
  171.       if (snd_len <= 0)
  172.         break;
  173.       // удаляем отправленные данные из буфера
  174.       memmove(output_buffer, output_buffer+snd_len, output_buffer_len-snd_len);
  175.       output_buffer_len -= snd_len;
  176.       poll_fd.revents &= ~POLLOUT;
  177.     }
  178.   }
  179.   close(fd);
  180.  
  181.   return stats;
  182. }
  183.  
  184. //---------------------------------------------------------------------------
  185. static void *worker_thread(void *arg)
  186. {
  187.   WorkerThreadData *data = (WorkerThreadData *)arg;
  188.   while (!flag_kill)
  189.   {
  190.     if (data->client_index >= 0)
  191.     {
  192.       pthread_mutex_lock(&data->mutex);
  193.  
  194.       // имитируем обработку запроса (2ms)
  195.       usleep(WORKER_WORK_TIME);
  196.  
  197.       // пытаемся записать пакет в буфер на передачу клиенту
  198.       while (!flag_kill)
  199.       {
  200.         pthread_mutex_lock(&mutex_clients);
  201.         if (clients[data->client_index].fd < 0)
  202.         {
  203.           pthread_mutex_unlock(&mutex_clients);
  204.           break;
  205.         }
  206.         if (clients[data->client_index].output_buffer_len+data->packet_buffer_len < sizeof(clients[data->client_index].output_buffer))
  207.         {
  208.           memcpy(clients[data->client_index].output_buffer+clients[data->client_index].output_buffer_len,
  209.                  data->packet_buffer, data->packet_buffer_len);
  210.           clients[data->client_index].output_buffer_len += data->packet_buffer_len;
  211.           pthread_mutex_unlock(&mutex_clients);
  212.           break;
  213.         }
  214.         else
  215.         {
  216.           pthread_mutex_unlock(&mutex_clients);
  217.           usleep(WORKER_SLEEP_TIME);
  218.         }
  219.       }
  220.  
  221.       data->client_index = -1;
  222.       pthread_mutex_unlock(&data->mutex);
  223.     }
  224.     else
  225.       usleep(WORKER_SLEEP_TIME);
  226.   }
  227.   return NULL;
  228. }
  229.  
  230. //---------------------------------------------------------------------------
  231. void termination_handler(int)
  232. {
  233.   flag_kill = true;
  234. }
  235. //---------------------------------------------------------------------------
  236. bool client_parse_input_buffer(int client_index)
  237. {
  238.   while(clients[client_index].input_buffer_len > sizeof(int))
  239.   {
  240.     int packet_size = *((int *)clients[client_index].input_buffer);
  241.     if (clients[client_index].input_buffer_len < sizeof(int)+packet_size)
  242.       break;
  243.     if (clients[client_index].output_buffer_len+sizeof(int)+packet_size > sizeof(clients[client_index].output_buffer))
  244.       break;
  245.     // если получили корректный пакет
  246.     // ищем свободного worker-а
  247.     int worker_index = -1;
  248.     for (int i=0; i < WORKER_THREAD_COUNT; i++)
  249.     {
  250.       if (worker_threads[i].client_index < 0)
  251.       {
  252.         worker_index = i;
  253.         break;
  254.       }
  255.     }
  256.     // если нет свободного worker-а - выходим
  257.     if (worker_index < 0)
  258.       return false;
  259.     // записываем пакет на обработку в буфер worker-а
  260.     pthread_mutex_lock(&worker_threads[worker_index].mutex);
  261.     memcpy(worker_threads[worker_index].packet_buffer, clients[client_index].input_buffer, sizeof(int)+packet_size);
  262.     worker_threads[worker_index].packet_buffer_len = sizeof(int)+packet_size;
  263.     worker_threads[worker_index].client_index = client_index;
  264.     pthread_mutex_unlock(&worker_threads[worker_index].mutex);
  265.     // а из буфера приема удаляем
  266.     memmove(clients[client_index].input_buffer, clients[client_index].input_buffer+sizeof(int)+packet_size, clients[client_index].input_buffer_len-sizeof(int)-packet_size);
  267.     clients[client_index].input_buffer_len -= sizeof(int)+packet_size;
  268.   }
  269.   return true;
  270. }
  271. //---------------------------------------------------------------------------
  272. void client_disconnected(int client_index, struct pollfd *poll_fds, int &n_fds)
  273. {
  274.   close(clients[client_index].fd);
  275.   clients[client_index].fd = -1;
  276.   clients[client_index].input_buffer_len = 0;
  277.   clients[client_index].output_buffer_len = 0;
  278.   if (clients[client_index].poll_fd_index >= 0 && clients[client_index].poll_fd_index < n_fds-1)
  279.   {
  280.     memmove(poll_fds+clients[client_index].poll_fd_index,
  281.             poll_fds+clients[client_index].poll_fd_index+1,
  282.             (n_fds-clients[client_index].poll_fd_index-1)*sizeof(struct pollfd));
  283.   }
  284.   n_fds--;
  285. }
  286. //---------------------------------------------------------------------------
  287. int main(int argc, char *argv[])
  288. {
  289.   signal(SIGTERM, termination_handler);
  290.   signal(SIGSTOP, termination_handler);
  291.   signal(SIGINT,  termination_handler);
  292.   signal(SIGQUIT, termination_handler);
  293.  
  294.   int sock_srv = socket(AF_INET, SOCK_STREAM, 0);
  295.   if (sock_srv < 0)
  296.     return 1;
  297.  
  298.   // устанавливаем опцию для повторного использования адреса (порта) без ожидания таймаута
  299.   int reuse_addr = 1;
  300.   if (setsockopt(sock_srv, SOL_SOCKET, SO_REUSEADDR, &reuse_addr, sizeof(reuse_addr)) < 0)
  301.     return 2;
  302.  
  303.   // устанавливаем неблокирующий режим
  304.   if (!setnonblocking(sock_srv))
  305.     return 3;
  306.  
  307.   struct sockaddr_in addr;
  308.   memset((char *) &addr, 0, sizeof(addr));
  309.   addr.sin_family = AF_INET;
  310.   addr.sin_port = htons(SOCKET_TCP_PORT);
  311.   addr.sin_addr.s_addr = htonl(INADDR_ANY);
  312.  
  313.   // привязываем сокет к адресу (порту)
  314.   if (bind(sock_srv, (struct sockaddr *)&addr, sizeof(addr)) < 0)
  315.     return 4;
  316.  
  317.   // начинаем слушать порт
  318.   if (listen(sock_srv, 100) < 0)
  319.     return 5;
  320.  
  321.   // запускаем потоки worker-ов
  322.   for (int i=0; i < WORKER_THREAD_COUNT; i++)
  323.   {
  324.     worker_threads[i].client_index = -1;
  325.     worker_threads[i].packet_buffer_len = 0;
  326.     pthread_mutex_init(&worker_threads[i].mutex, NULL);
  327.     pthread_create(&worker_threads[i].thread_id, NULL, &worker_thread, worker_threads+i);
  328.   }
  329.  
  330.   // запускаем потоки клиентов
  331.   pthread_t thread_ids[MAX_CLIENTS];
  332.   for (int i=0; i < MAX_CLIENTS; i++)
  333.   {
  334.     pthread_create(&thread_ids[i], NULL, &client_thread, NULL);
  335.   }
  336.  
  337.   // чистим массив с данными клиентов
  338.   pthread_mutex_lock(&mutex_clients);
  339.   for (int i=0; i < MAX_CLIENTS; i++)
  340.   {
  341.     clients[i].fd = -1;
  342.     clients[i].input_buffer_len = 0;
  343.     clients[i].output_buffer_len = 0;
  344.     clients[i].poll_fd_index = -1;
  345.   }
  346.   pthread_mutex_unlock(&mutex_clients);
  347.  
  348.   // начинаем мониторить события сокета(ов)
  349.   // по мере подключения клиентов n_fds будет возрастать, poll_fds заполняться
  350.   // а отключение первого же клиента будет означать останов программы,
  351.   // поэтому мы не будем заботиться о сдвиге poll_fds при отключении клиента
  352.   struct pollfd poll_fds[MAX_CLIENTS+1];
  353.   memset(&poll_fds, 0, sizeof(pollfd)*(MAX_CLIENTS+1));
  354.   poll_fds[0].fd = sock_srv;
  355.   poll_fds[0].events = POLLIN;
  356.   poll_fds[0].revents = 0;
  357.   int n_fds = 1;
  358.  
  359.   struct timespec ts_start;
  360.   struct timespec ts_current;
  361.   clock_gettime(CLOCK_MONOTONIC, &ts_start);
  362.   double start_time = ts_start.tv_sec + (double)ts_start.tv_nsec/1000000000;
  363.   double cur_time = ts_start.tv_sec + (double)ts_start.tv_nsec/1000000000;
  364.   bool worker_available = true;
  365.   int last_client_index = 0;
  366.   while(!flag_kill)
  367.   {
  368.     clock_gettime(CLOCK_MONOTONIC, &ts_current);
  369.     cur_time = ts_current.tv_sec + (double)ts_current.tv_nsec/1000000000;
  370.     if (cur_time-start_time > TEST_DURATION)
  371.     {
  372.       flag_kill = true;
  373.       break;
  374.     }
  375.     // если есть что передавать - мониторим разрешение на передачу
  376.     // если можем читать - мониторим появление данных для чтения
  377.     pthread_mutex_lock(&mutex_clients);
  378.     for (int i=0; i < MAX_CLIENTS; i++)
  379.     {
  380.       if (clients[i].fd >= 0)
  381.       {
  382.         if (clients[i].output_buffer_len > 0)
  383.           poll_fds[clients[i].poll_fd_index].events |= POLLOUT;
  384.         else
  385.           poll_fds[clients[i].poll_fd_index].events &= ~POLLOUT;
  386.         if (clients[i].input_buffer_len < sizeof(clients[i].input_buffer_len))
  387.           poll_fds[clients[i].poll_fd_index].events |= POLLIN;
  388.         else
  389.           poll_fds[clients[i].poll_fd_index].events &= ~POLLIN;
  390.       }
  391.     }
  392.     pthread_mutex_unlock(&mutex_clients);
  393.  
  394.     int res = poll(poll_fds, n_fds, worker_available ? 500 : 1);
  395.     if (res < 0)
  396.       break;
  397.     if (res == 0)
  398.     {
  399.       // если все worker-ы были заняты в предыдущий цикл - проверяем, не освободились ли
  400.       if (!worker_available)
  401.       {
  402.         last_client_index++;
  403.         if (last_client_index >= MAX_CLIENTS)
  404.           last_client_index = 0;
  405.         pthread_mutex_lock(&mutex_clients);
  406.         for (int i=last_client_index; i < MAX_CLIENTS; i++)
  407.         {
  408.           if (clients[i].fd < 0)
  409.             continue;
  410.           worker_available = client_parse_input_buffer(i);
  411.           if (!worker_available)
  412.             break;
  413.         }
  414.         pthread_mutex_unlock(&mutex_clients);
  415.       }
  416.       continue;
  417.     }
  418.  
  419.     // есть новое подключение клиента
  420.     if (poll_fds[0].revents & POLLIN)
  421.     {
  422.       poll_fds[0].revents &= ~POLLIN;
  423.       socklen_t len = sizeof(struct sockaddr_in);
  424.       struct sockaddr_in addr;
  425.       memset(&addr, 0, len);
  426.       int sock = accept(sock_srv, (struct sockaddr *)&addr, &len);
  427.       if (sock < 0)
  428.         break;
  429.       if (!setnonblocking(sock))
  430.         break;
  431.       // если слишком много клиентов - закрываем новое соединение
  432.       if (n_fds > MAX_CLIENTS)
  433.       {
  434.         close(sock);
  435.       }
  436.       else
  437.       {
  438.         int client_index = -1;
  439.         pthread_mutex_lock(&mutex_clients);
  440.         for (int i=0; i < MAX_CLIENTS; i++)
  441.         {
  442.           if (clients[i].fd < 0)
  443.           {
  444.             client_index = i;
  445.             break;
  446.           }
  447.         }
  448.         if (client_index >= 0)
  449.         {
  450.           clients[client_index].fd = sock;
  451.           clients[client_index].input_buffer_len = 0;
  452.           clients[client_index].output_buffer_len = 0;
  453.           clients[client_index].poll_fd_index = n_fds;
  454.           poll_fds[n_fds].fd = sock;
  455.           poll_fds[n_fds].events = POLLIN | POLLERR | POLLHUP;
  456.           poll_fds[n_fds].revents = 0;
  457.           n_fds++;
  458.         }
  459.         else
  460.           close(sock);
  461.         pthread_mutex_unlock(&mutex_clients);
  462.       }
  463.     }
  464.  
  465.     pthread_mutex_lock(&mutex_clients);
  466.     for (int i=0; i < MAX_CLIENTS; i++)
  467.     {
  468.       if (clients[i].fd >= 0)
  469.       {
  470.         if ((poll_fds[clients[i].poll_fd_index].revents & POLLERR) || (poll_fds[clients[i].poll_fd_index].revents & POLLHUP))
  471.         {
  472.           client_disconnected(i, poll_fds, n_fds);
  473.           continue;
  474.         }
  475.  
  476.         // если есть что читать от клиента
  477.         if (poll_fds[clients[i].poll_fd_index].revents & POLLIN)
  478.         {
  479.           poll_fds[clients[i].poll_fd_index].revents &= ~POLLIN;
  480.           int rcv_len = recv(clients[i].fd, clients[i].input_buffer+clients[i].input_buffer_len, sizeof(clients[i].input_buffer)-clients[i].input_buffer_len, 0);
  481.           if (rcv_len <= 0)
  482.           {
  483.             client_disconnected(i, poll_fds, n_fds);
  484.             continue;
  485.           }
  486.           clients[i].input_buffer_len += rcv_len;
  487.  
  488.           // парсинг буфера приема
  489.           worker_available = client_parse_input_buffer(i);
  490.           if (!worker_available)
  491.             last_client_index = i;
  492.         }
  493.  
  494.         // если можно записать данные в сокет
  495.         if (poll_fds[clients[i].poll_fd_index].revents & POLLOUT)
  496.         {
  497.           poll_fds[clients[i].poll_fd_index].revents &= ~POLLOUT;
  498.           int snd_len = send(clients[i].fd, clients[i].output_buffer, clients[i].output_buffer_len, 0);
  499.           if (snd_len <= 0)
  500.           {
  501.             client_disconnected(i, poll_fds, n_fds);
  502.             continue;
  503.           }
  504.           // удаляем отправленные данные из буфера
  505.           memmove(clients[i].output_buffer, clients[i].output_buffer+snd_len, clients[i].output_buffer_len-snd_len);
  506.           clients[i].output_buffer_len -= snd_len;
  507.         }
  508.       }
  509.     }
  510.     pthread_mutex_unlock(&mutex_clients);
  511.   }
  512.   double elapsed = cur_time-start_time;
  513.  
  514.   // отключаем клиентов
  515.   pthread_mutex_lock(&mutex_clients);
  516.   for (int i=0; i < MAX_CLIENTS; i++)
  517.   {
  518.     if (clients[i].fd >= 0)
  519.       close(clients[i].fd);
  520.   }
  521.   pthread_mutex_unlock(&mutex_clients);
  522.  
  523.   // ждем завершения потоков
  524.   usleep(600000);
  525.  
  526.   for (int i=0; i < WORKER_THREAD_COUNT; i++)
  527.   {
  528.     void *res = NULL;
  529.     pthread_join(worker_threads[i].thread_id, &res);
  530.     pthread_mutex_destroy(&worker_threads[i].mutex);
  531.   }
  532.  
  533.   unsigned long long total_packets_received = 0;
  534.   unsigned long long total_bytes_received = 0;
  535.   // запускаем потоки клиентов
  536.   printf("thread id:\tbytes rcv\tpackets rcv\r\n");
  537.   for (int i=0; i < MAX_CLIENTS; i++)
  538.   {
  539.     void *res = NULL;
  540.     pthread_join(thread_ids[i], &res);
  541.     ThreadStats *stat = (ThreadStats *)res;
  542.     if (stat)
  543.     {
  544.       printf("thread %02d:\t%lld\t%lld\r\n", i+1, stat->bytes_received, stat->packets_received);
  545.       total_bytes_received += stat->bytes_received;
  546.       total_packets_received += stat->packets_received;
  547.       delete stat;
  548.     }
  549.   }
  550.   printf("    TOTAL:\t%lld\t%lld\r\n", total_bytes_received, total_packets_received);
  551.   printf("\r\n");
  552.   printf("Elapsed time: %.3lf\r\n", elapsed);
  553.   printf("Avg speed: %d bytes/s, %d packets/s\r\n", (unsigned int)(total_bytes_received/elapsed), (unsigned int)(total_packets_received/elapsed));
  554.  
  555.   pthread_mutex_destroy(&mutex_clients);
  556.   close(sock_srv);
  557.   return 0;
  558. }
RAW Paste Data

Adblocker detected! Please consider disabling it...

We've detected AdBlock Plus or some other adblocking software preventing Pastebin.com from fully loading.

We don't have any obnoxious sound, or popup ads, we actively block these annoying types of ads!

Please add Pastebin.com to your ad blocker whitelist or disable your adblocking software.

×