Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <unistd.h>
- #include <sys/types.h>
- #include <sys/wait.h>
- #include <signal.h>
- #include <stdio.h>
- #include <math.h>
- #include <string.h>
- #include <stdlib.h>
- #include <time.h>
- #include <malloc.h>
- #include <fcntl.h>
- #include <syslog.h>
- #include <sys/socket.h>
- #include <netinet/in.h>
- #include <arpa/inet.h>
- #include <netdb.h>
- #include <pthread.h>
- #include <sys/poll.h>
- #define SOCKET_TCP_PORT 1101
- #define PACKET_SIZE 1024
- #define INPUT_BUFFER_SIZE (PACKET_SIZE+sizeof(int))*5
- #define OUTPUT_BUFFER_SIZE (PACKET_SIZE+sizeof(int))*5
- #define MAX_CLIENTS 10
- #define TEST_DURATION 10
- #define WORKER_THREAD_COUNT 2
- #define WORKER_WORK_TIME 2000
- #define WORKER_SLEEP_TIME 1
- struct SocketClient
- {
- char input_buffer[INPUT_BUFFER_SIZE];
- char output_buffer[OUTPUT_BUFFER_SIZE];
- int input_buffer_len;
- int output_buffer_len;
- int fd;
- int poll_fd_index;
- };
- struct ThreadStats
- {
- unsigned long long packets_received;
- unsigned long long bytes_received;
- };
- struct WorkerThreadData
- {
- pthread_t thread_id;
- pthread_mutex_t mutex;
- char packet_buffer[INPUT_BUFFER_SIZE];
- int packet_buffer_len;
- int client_index;
- };
- struct SocketClient clients[MAX_CLIENTS];
- pthread_mutex_t mutex_clients = PTHREAD_MUTEX_INITIALIZER;
- WorkerThreadData worker_threads[WORKER_THREAD_COUNT];
- bool flag_kill;
- //---------------------------------------------------------------------------
- bool setnonblocking(int fd)
- {
- int opts = fcntl(fd, F_GETFL);
- if (opts < 0)
- return false;
- opts = (opts | O_NONBLOCK);
- if (fcntl(fd, F_SETFL, opts) < 0)
- return false;
- return true;
- }
- //---------------------------------------------------------------------------
- static void *client_thread(void *)
- {
- // устанавливаем соединение с сервером
- struct hostent *host_info = gethostbyname("127.0.0.1");
- if (!host_info)
- return NULL;
- int fd = socket(AF_INET, SOCK_STREAM, 0);
- if (fd < 0)
- return NULL;
- struct sockaddr_in addr;
- memset((char *) &addr, 0, sizeof(addr));
- addr.sin_family = host_info->h_addrtype;
- addr.sin_port = htons(SOCKET_TCP_PORT);
- addr.sin_addr = *(struct in_addr*)host_info->h_addr;
- if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0)
- {
- close(fd);
- return NULL;
- }
- if (!setnonblocking(fd))
- {
- close(fd);
- return NULL;
- }
- // начинаем мониторить события сокета
- struct pollfd poll_fd;
- memset(&poll_fd, 0, sizeof(poll_fd));
- poll_fd.fd = fd;
- poll_fd.events = POLLIN | POLLOUT | POLLERR | POLLHUP;
- poll_fd.revents = 0;
- char input_buffer[INPUT_BUFFER_SIZE];
- char output_buffer[OUTPUT_BUFFER_SIZE];
- int input_buffer_len = 0;
- int output_buffer_len = 0;
- // кладем в буфер на передачу несколько пакетов
- while (output_buffer_len+sizeof(int)+PACKET_SIZE <= sizeof(output_buffer))
- {
- *((int *)(output_buffer+output_buffer_len)) = PACKET_SIZE;
- output_buffer_len += sizeof(int)+PACKET_SIZE;
- }
- ThreadStats *stats = new ThreadStats;
- stats->bytes_received = 0;
- stats->packets_received = 0;
- while(!flag_kill)
- {
- // если есть что передавать - мониторим разрешение на передачу
- if (output_buffer_len > 0)
- poll_fd.events |= POLLOUT;
- else
- poll_fd.events &= ~POLLOUT;
- int res = poll(&poll_fd, 1, 500);
- if (res < 0)
- break;
- if (res == 0)
- continue;
- // ошибочка
- if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP))
- break;
- // есть данные для чтения из сокета
- if (poll_fd.revents & POLLIN)
- {
- int rcv_len = recv(fd, input_buffer+input_buffer_len, sizeof(input_buffer)-input_buffer_len, 0);
- if (rcv_len <= 0)
- break;
- stats->bytes_received += rcv_len;
- input_buffer_len += rcv_len;
- // парсинг буфера приема
- while(input_buffer_len > sizeof(int))
- {
- int packet_size = *((int *)input_buffer);
- if (input_buffer_len < sizeof(int)+packet_size)
- break;
- if (output_buffer_len+sizeof(int)+packet_size > sizeof(output_buffer))
- break;
- stats->packets_received++;
- // если получили корректный пакет и можем его впихнуть в буфер на передачу - отправляем его туда
- memcpy(output_buffer+output_buffer_len, input_buffer, sizeof(int)+packet_size);
- output_buffer_len += sizeof(int)+packet_size;
- // а из буфера приема удаляем
- memmove(input_buffer, input_buffer+sizeof(int)+packet_size, input_buffer_len-sizeof(int)-packet_size);
- input_buffer_len -= sizeof(int)+packet_size;
- }
- poll_fd.revents &= ~POLLIN;
- }
- // можно записать данные в сокет
- if (poll_fd.revents & POLLOUT)
- {
- int snd_len = send(fd, output_buffer, output_buffer_len, 0);
- if (snd_len <= 0)
- break;
- // удаляем отправленные данные из буфера
- memmove(output_buffer, output_buffer+snd_len, output_buffer_len-snd_len);
- output_buffer_len -= snd_len;
- poll_fd.revents &= ~POLLOUT;
- }
- }
- close(fd);
- return stats;
- }
- //---------------------------------------------------------------------------
- static void *worker_thread(void *arg)
- {
- WorkerThreadData *data = (WorkerThreadData *)arg;
- while (!flag_kill)
- {
- if (data->client_index >= 0)
- {
- pthread_mutex_lock(&data->mutex);
- // имитируем обработку запроса (2ms)
- usleep(WORKER_WORK_TIME);
- // пытаемся записать пакет в буфер на передачу клиенту
- while (!flag_kill)
- {
- pthread_mutex_lock(&mutex_clients);
- if (clients[data->client_index].fd < 0)
- {
- pthread_mutex_unlock(&mutex_clients);
- break;
- }
- if (clients[data->client_index].output_buffer_len+data->packet_buffer_len < sizeof(clients[data->client_index].output_buffer))
- {
- memcpy(clients[data->client_index].output_buffer+clients[data->client_index].output_buffer_len,
- data->packet_buffer, data->packet_buffer_len);
- clients[data->client_index].output_buffer_len += data->packet_buffer_len;
- pthread_mutex_unlock(&mutex_clients);
- break;
- }
- else
- {
- pthread_mutex_unlock(&mutex_clients);
- usleep(WORKER_SLEEP_TIME);
- }
- }
- data->client_index = -1;
- pthread_mutex_unlock(&data->mutex);
- }
- else
- usleep(WORKER_SLEEP_TIME);
- }
- return NULL;
- }
- //---------------------------------------------------------------------------
- void termination_handler(int)
- {
- flag_kill = true;
- }
- //---------------------------------------------------------------------------
- bool client_parse_input_buffer(int client_index)
- {
- while(clients[client_index].input_buffer_len > sizeof(int))
- {
- int packet_size = *((int *)clients[client_index].input_buffer);
- if (clients[client_index].input_buffer_len < sizeof(int)+packet_size)
- break;
- if (clients[client_index].output_buffer_len+sizeof(int)+packet_size > sizeof(clients[client_index].output_buffer))
- break;
- // если получили корректный пакет
- // ищем свободного worker-а
- int worker_index = -1;
- for (int i=0; i < WORKER_THREAD_COUNT; i++)
- {
- if (worker_threads[i].client_index < 0)
- {
- worker_index = i;
- break;
- }
- }
- // если нет свободного worker-а - выходим
- if (worker_index < 0)
- return false;
- // записываем пакет на обработку в буфер worker-а
- pthread_mutex_lock(&worker_threads[worker_index].mutex);
- memcpy(worker_threads[worker_index].packet_buffer, clients[client_index].input_buffer, sizeof(int)+packet_size);
- worker_threads[worker_index].packet_buffer_len = sizeof(int)+packet_size;
- worker_threads[worker_index].client_index = client_index;
- pthread_mutex_unlock(&worker_threads[worker_index].mutex);
- // а из буфера приема удаляем
- memmove(clients[client_index].input_buffer, clients[client_index].input_buffer+sizeof(int)+packet_size, clients[client_index].input_buffer_len-sizeof(int)-packet_size);
- clients[client_index].input_buffer_len -= sizeof(int)+packet_size;
- }
- return true;
- }
- //---------------------------------------------------------------------------
- void client_disconnected(int client_index, struct pollfd *poll_fds, int &n_fds)
- {
- close(clients[client_index].fd);
- clients[client_index].fd = -1;
- clients[client_index].input_buffer_len = 0;
- clients[client_index].output_buffer_len = 0;
- if (clients[client_index].poll_fd_index >= 0 && clients[client_index].poll_fd_index < n_fds-1)
- {
- memmove(poll_fds+clients[client_index].poll_fd_index,
- poll_fds+clients[client_index].poll_fd_index+1,
- (n_fds-clients[client_index].poll_fd_index-1)*sizeof(struct pollfd));
- }
- n_fds--;
- }
- //---------------------------------------------------------------------------
- int main(int argc, char *argv[])
- {
- signal(SIGTERM, termination_handler);
- signal(SIGSTOP, termination_handler);
- signal(SIGINT, termination_handler);
- signal(SIGQUIT, termination_handler);
- int sock_srv = socket(AF_INET, SOCK_STREAM, 0);
- if (sock_srv < 0)
- return 1;
- // устанавливаем опцию для повторного использования адреса (порта) без ожидания таймаута
- int reuse_addr = 1;
- if (setsockopt(sock_srv, SOL_SOCKET, SO_REUSEADDR, &reuse_addr, sizeof(reuse_addr)) < 0)
- return 2;
- // устанавливаем неблокирующий режим
- if (!setnonblocking(sock_srv))
- return 3;
- struct sockaddr_in addr;
- memset((char *) &addr, 0, sizeof(addr));
- addr.sin_family = AF_INET;
- addr.sin_port = htons(SOCKET_TCP_PORT);
- addr.sin_addr.s_addr = htonl(INADDR_ANY);
- // привязываем сокет к адресу (порту)
- if (bind(sock_srv, (struct sockaddr *)&addr, sizeof(addr)) < 0)
- return 4;
- // начинаем слушать порт
- if (listen(sock_srv, 100) < 0)
- return 5;
- // запускаем потоки worker-ов
- for (int i=0; i < WORKER_THREAD_COUNT; i++)
- {
- worker_threads[i].client_index = -1;
- worker_threads[i].packet_buffer_len = 0;
- pthread_mutex_init(&worker_threads[i].mutex, NULL);
- pthread_create(&worker_threads[i].thread_id, NULL, &worker_thread, worker_threads+i);
- }
- // запускаем потоки клиентов
- pthread_t thread_ids[MAX_CLIENTS];
- for (int i=0; i < MAX_CLIENTS; i++)
- {
- pthread_create(&thread_ids[i], NULL, &client_thread, NULL);
- }
- // чистим массив с данными клиентов
- pthread_mutex_lock(&mutex_clients);
- for (int i=0; i < MAX_CLIENTS; i++)
- {
- clients[i].fd = -1;
- clients[i].input_buffer_len = 0;
- clients[i].output_buffer_len = 0;
- clients[i].poll_fd_index = -1;
- }
- pthread_mutex_unlock(&mutex_clients);
- // начинаем мониторить события сокета(ов)
- // по мере подключения клиентов n_fds будет возрастать, poll_fds заполняться
- // а отключение первого же клиента будет означать останов программы,
- // поэтому мы не будем заботиться о сдвиге poll_fds при отключении клиента
- struct pollfd poll_fds[MAX_CLIENTS+1];
- memset(&poll_fds, 0, sizeof(pollfd)*(MAX_CLIENTS+1));
- poll_fds[0].fd = sock_srv;
- poll_fds[0].events = POLLIN;
- poll_fds[0].revents = 0;
- int n_fds = 1;
- struct timespec ts_start;
- struct timespec ts_current;
- clock_gettime(CLOCK_MONOTONIC, &ts_start);
- double start_time = ts_start.tv_sec + (double)ts_start.tv_nsec/1000000000;
- double cur_time = ts_start.tv_sec + (double)ts_start.tv_nsec/1000000000;
- bool worker_available = true;
- int last_client_index = 0;
- while(!flag_kill)
- {
- clock_gettime(CLOCK_MONOTONIC, &ts_current);
- cur_time = ts_current.tv_sec + (double)ts_current.tv_nsec/1000000000;
- if (cur_time-start_time > TEST_DURATION)
- {
- flag_kill = true;
- break;
- }
- // если есть что передавать - мониторим разрешение на передачу
- // если можем читать - мониторим появление данных для чтения
- pthread_mutex_lock(&mutex_clients);
- for (int i=0; i < MAX_CLIENTS; i++)
- {
- if (clients[i].fd >= 0)
- {
- if (clients[i].output_buffer_len > 0)
- poll_fds[clients[i].poll_fd_index].events |= POLLOUT;
- else
- poll_fds[clients[i].poll_fd_index].events &= ~POLLOUT;
- if (clients[i].input_buffer_len < sizeof(clients[i].input_buffer_len))
- poll_fds[clients[i].poll_fd_index].events |= POLLIN;
- else
- poll_fds[clients[i].poll_fd_index].events &= ~POLLIN;
- }
- }
- pthread_mutex_unlock(&mutex_clients);
- int res = poll(poll_fds, n_fds, worker_available ? 500 : 1);
- if (res < 0)
- break;
- if (res == 0)
- {
- // если все worker-ы были заняты в предыдущий цикл - проверяем, не освободились ли
- if (!worker_available)
- {
- last_client_index++;
- if (last_client_index >= MAX_CLIENTS)
- last_client_index = 0;
- pthread_mutex_lock(&mutex_clients);
- for (int i=last_client_index; i < MAX_CLIENTS; i++)
- {
- if (clients[i].fd < 0)
- continue;
- worker_available = client_parse_input_buffer(i);
- if (!worker_available)
- break;
- }
- pthread_mutex_unlock(&mutex_clients);
- }
- continue;
- }
- // есть новое подключение клиента
- if (poll_fds[0].revents & POLLIN)
- {
- poll_fds[0].revents &= ~POLLIN;
- socklen_t len = sizeof(struct sockaddr_in);
- struct sockaddr_in addr;
- memset(&addr, 0, len);
- int sock = accept(sock_srv, (struct sockaddr *)&addr, &len);
- if (sock < 0)
- break;
- if (!setnonblocking(sock))
- break;
- // если слишком много клиентов - закрываем новое соединение
- if (n_fds > MAX_CLIENTS)
- {
- close(sock);
- }
- else
- {
- int client_index = -1;
- pthread_mutex_lock(&mutex_clients);
- for (int i=0; i < MAX_CLIENTS; i++)
- {
- if (clients[i].fd < 0)
- {
- client_index = i;
- break;
- }
- }
- if (client_index >= 0)
- {
- clients[client_index].fd = sock;
- clients[client_index].input_buffer_len = 0;
- clients[client_index].output_buffer_len = 0;
- clients[client_index].poll_fd_index = n_fds;
- poll_fds[n_fds].fd = sock;
- poll_fds[n_fds].events = POLLIN | POLLERR | POLLHUP;
- poll_fds[n_fds].revents = 0;
- n_fds++;
- }
- else
- close(sock);
- pthread_mutex_unlock(&mutex_clients);
- }
- }
- pthread_mutex_lock(&mutex_clients);
- for (int i=0; i < MAX_CLIENTS; i++)
- {
- if (clients[i].fd >= 0)
- {
- if ((poll_fds[clients[i].poll_fd_index].revents & POLLERR) || (poll_fds[clients[i].poll_fd_index].revents & POLLHUP))
- {
- client_disconnected(i, poll_fds, n_fds);
- continue;
- }
- // если есть что читать от клиента
- if (poll_fds[clients[i].poll_fd_index].revents & POLLIN)
- {
- poll_fds[clients[i].poll_fd_index].revents &= ~POLLIN;
- int rcv_len = recv(clients[i].fd, clients[i].input_buffer+clients[i].input_buffer_len, sizeof(clients[i].input_buffer)-clients[i].input_buffer_len, 0);
- if (rcv_len <= 0)
- {
- client_disconnected(i, poll_fds, n_fds);
- continue;
- }
- clients[i].input_buffer_len += rcv_len;
- // парсинг буфера приема
- worker_available = client_parse_input_buffer(i);
- if (!worker_available)
- last_client_index = i;
- }
- // если можно записать данные в сокет
- if (poll_fds[clients[i].poll_fd_index].revents & POLLOUT)
- {
- poll_fds[clients[i].poll_fd_index].revents &= ~POLLOUT;
- int snd_len = send(clients[i].fd, clients[i].output_buffer, clients[i].output_buffer_len, 0);
- if (snd_len <= 0)
- {
- client_disconnected(i, poll_fds, n_fds);
- continue;
- }
- // удаляем отправленные данные из буфера
- memmove(clients[i].output_buffer, clients[i].output_buffer+snd_len, clients[i].output_buffer_len-snd_len);
- clients[i].output_buffer_len -= snd_len;
- }
- }
- }
- pthread_mutex_unlock(&mutex_clients);
- }
- double elapsed = cur_time-start_time;
- // отключаем клиентов
- pthread_mutex_lock(&mutex_clients);
- for (int i=0; i < MAX_CLIENTS; i++)
- {
- if (clients[i].fd >= 0)
- close(clients[i].fd);
- }
- pthread_mutex_unlock(&mutex_clients);
- // ждем завершения потоков
- usleep(600000);
- for (int i=0; i < WORKER_THREAD_COUNT; i++)
- {
- void *res = NULL;
- pthread_join(worker_threads[i].thread_id, &res);
- pthread_mutex_destroy(&worker_threads[i].mutex);
- }
- unsigned long long total_packets_received = 0;
- unsigned long long total_bytes_received = 0;
- // запускаем потоки клиентов
- printf("thread id:\tbytes rcv\tpackets rcv\r\n");
- for (int i=0; i < MAX_CLIENTS; i++)
- {
- void *res = NULL;
- pthread_join(thread_ids[i], &res);
- ThreadStats *stat = (ThreadStats *)res;
- if (stat)
- {
- printf("thread %02d:\t%lld\t%lld\r\n", i+1, stat->bytes_received, stat->packets_received);
- total_bytes_received += stat->bytes_received;
- total_packets_received += stat->packets_received;
- delete stat;
- }
- }
- printf(" TOTAL:\t%lld\t%lld\r\n", total_bytes_received, total_packets_received);
- printf("\r\n");
- printf("Elapsed time: %.3lf\r\n", elapsed);
- printf("Avg speed: %d bytes/s, %d packets/s\r\n", (unsigned int)(total_bytes_received/elapsed), (unsigned int)(total_packets_received/elapsed));
- pthread_mutex_destroy(&mutex_clients);
- close(sock_srv);
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement