Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include "zmq.h"
- #include <time.h>
- #include <signal.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <unistd.h>
- #include <math.h>
- #include <pthread.h>
- #include <sys/wait.h>
- #define SOCKET_STRING "tcp://127.0.0.1:1102"
- #define INTERNAL_WORKER_ADDRESS "inproc://workers"
- #define PACKET_SIZE 1024
- #define OUTPUT_BUFFER_SIZE PACKET_SIZE*5
- #define MAX_CLIENTS 10
- #define TEST_DURATION 10
- #define WORKER_THREAD_COUNT 2
- #define WORKER_WORK_TIME 2000
- struct ThreadStats
- {
- unsigned long long packets_received;
- unsigned long long bytes_received;
- };
- bool flag_kill;
- void *z_ctx;
- //---------------------------------------------------------------------------
- static void *client_thread(void *)
- {
- void *sock = zmq_socket(z_ctx, ZMQ_DEALER);
- if (!sock)
- return NULL;
- if (zmq_connect(sock, SOCKET_STRING) < 0)
- {
- zmq_close(sock);
- return NULL;
- }
- char output_buffer[OUTPUT_BUFFER_SIZE];
- int output_buffer_len = 0;
- // отправляем на сервер несколько пакетов
- while (output_buffer_len+PACKET_SIZE <= sizeof(output_buffer))
- {
- int bytes_sent = zmq_send(sock, output_buffer+output_buffer_len, PACKET_SIZE, ZMQ_DONTWAIT);
- output_buffer_len += bytes_sent;
- }
- ThreadStats *stats = new ThreadStats;
- stats->packets_received = 0;
- stats->bytes_received = 0;
- // начинаем мониторить события сокета
- zmq_pollitem_t poll_fd;
- poll_fd.socket = sock;
- poll_fd.events = ZMQ_POLLIN;
- poll_fd.revents = 0;
- while(!flag_kill)
- {
- int res = zmq_poll(&poll_fd, 1, 500);
- if (res < 0)
- break;
- if (res == 0)
- continue;
- // есть новый пакет
- if (poll_fd.revents & ZMQ_POLLIN)
- {
- poll_fd.revents &= ~ZMQ_POLLIN;
- // читаем пакет с данными
- zmq_msg_t rcv_msg;
- if (zmq_msg_init(&rcv_msg) < 0)
- {
- flag_kill = true;
- break;
- }
- int rc = zmq_msg_recv(&rcv_msg, sock, ZMQ_DONTWAIT);
- if (rc < 0)
- {
- flag_kill = true;
- break;
- }
- int msg_size = zmq_msg_size(&rcv_msg);
- char *msg_buffer = (char *)zmq_msg_data(&rcv_msg);
- stats->bytes_received += msg_size;
- stats->packets_received++;
- // отправляем пакет с данными
- if (zmq_send(sock, msg_buffer, msg_size, ZMQ_DONTWAIT) < 0)
- {
- printf("zmq_send() error: %s\r\n", zmq_strerror(errno));
- break;
- }
- zmq_msg_close(&rcv_msg);
- }
- }
- zmq_close(sock);
- return stats;
- }
- //---------------------------------------------------------------------------
- static void *worker_thread(void *)
- {
- void *sock = zmq_socket(z_ctx, ZMQ_REP);
- if (!sock)
- return NULL;
- if (zmq_connect(sock, INTERNAL_WORKER_ADDRESS) < 0)
- {
- zmq_close(sock);
- return NULL;
- }
- // начинаем мониторить события сокета
- zmq_pollitem_t poll_fd;
- poll_fd.socket = sock;
- poll_fd.events = ZMQ_POLLIN;
- poll_fd.revents = 0;
- while(!flag_kill)
- {
- int res = zmq_poll(&poll_fd, 1, 500);
- if (res < 0)
- break;
- if (res == 0)
- continue;
- // есть новый пакет
- if (poll_fd.revents & ZMQ_POLLIN)
- {
- poll_fd.revents &= ~ZMQ_POLLIN;
- // читаем фрейм с идентификатором клиента
- char client_identity[255];
- int client_identity_len;
- client_identity_len = zmq_recv(sock, client_identity, sizeof(client_identity), ZMQ_DONTWAIT);
- if (client_identity_len <= 0)
- break;
- // читаем пакет с данными
- zmq_msg_t rcv_msg;
- if (zmq_msg_init(&rcv_msg) < 0)
- {
- flag_kill = true;
- break;
- }
- int rc = zmq_msg_recv(&rcv_msg, sock, ZMQ_DONTWAIT);
- if (rc < 0)
- {
- flag_kill = true;
- break;
- }
- int msg_size = zmq_msg_size(&rcv_msg);
- char *msg_buffer = (char *)zmq_msg_data(&rcv_msg);
- // имитируем обработку запроса (2ms)
- usleep(WORKER_WORK_TIME);
- // отправляем фрейм с идентификатором клиента
- if (zmq_send(sock, client_identity, client_identity_len, ZMQ_DONTWAIT | ZMQ_SNDMORE) < 0)
- break;
- // отправляем пакет с данными обратно
- if (zmq_send(sock, msg_buffer, msg_size, ZMQ_DONTWAIT) < 0)
- {
- printf("zmq_send() error: %s\r\n", zmq_strerror(errno));
- break;
- }
- zmq_msg_close(&rcv_msg);
- }
- }
- zmq_close(sock);
- return NULL;
- }
- //---------------------------------------------------------------------------
- void termination_handler(int)
- {
- flag_kill = true;
- }
- //---------------------------------------------------------------------------
- int main(int argc, char *argv[])
- {
- signal(SIGTERM, termination_handler);
- signal(SIGSTOP, termination_handler);
- signal(SIGINT, termination_handler);
- signal(SIGQUIT, termination_handler);
- z_ctx = zmq_ctx_new();
- if (!z_ctx)
- return 1;
- // zmq_ctx_set(z_ctx, ZMQ_IO_THREADS, 2);
- // стартуем сокет для обслуживания клиентов
- void *sock_srv = zmq_socket(z_ctx, ZMQ_ROUTER);
- if (!sock_srv)
- {
- zmq_ctx_destroy(z_ctx);
- return 2;
- }
- if (zmq_bind(sock_srv, SOCKET_STRING) < 0)
- {
- zmq_close(sock_srv);
- zmq_ctx_destroy(z_ctx);
- return 3;
- }
- // стартуем сокет для обслуживания потоков worker-ов
- void *sock_workers = zmq_socket(z_ctx, ZMQ_DEALER);
- if (!sock_workers)
- {
- zmq_close(sock_srv);
- zmq_ctx_destroy(z_ctx);
- return 2;
- }
- if (zmq_bind(sock_workers, INTERNAL_WORKER_ADDRESS) < 0)
- {
- zmq_close(sock_srv);
- zmq_close(sock_workers);
- zmq_ctx_destroy(z_ctx);
- return 3;
- }
- // запускаем потоки worker-ов
- pthread_t worker_thread_ids[WORKER_THREAD_COUNT];
- for (int i=0; i < WORKER_THREAD_COUNT; i++)
- {
- pthread_create(&worker_thread_ids[i], NULL, &worker_thread, NULL);
- }
- // запускаем потоки клиентов
- pthread_t thread_ids[MAX_CLIENTS];
- for (int i=0; i < MAX_CLIENTS; i++)
- {
- pthread_create(&thread_ids[i], NULL, &client_thread, NULL);
- }
- // начинаем мониторить события сокета
- zmq_pollitem_t poll_fds[2];
- poll_fds[0].socket = sock_srv;
- poll_fds[0].events = ZMQ_POLLIN;
- poll_fds[0].revents = 0;
- poll_fds[1].socket = sock_workers;
- poll_fds[1].events = ZMQ_POLLIN;
- poll_fds[1].revents = 0;
- struct timespec ts_start;
- struct timespec ts_current;
- clock_gettime(CLOCK_MONOTONIC, &ts_start);
- double start_time = ts_start.tv_sec + (double)ts_start.tv_nsec/1000000000;
- double cur_time = ts_start.tv_sec + (double)ts_start.tv_nsec/1000000000;
- zmq_msg_t rcv_msg;
- while(!flag_kill)
- {
- clock_gettime(CLOCK_MONOTONIC, &ts_current);
- cur_time = ts_current.tv_sec + (double)ts_current.tv_nsec/1000000000;
- if (cur_time-start_time > TEST_DURATION)
- {
- flag_kill = true;
- break;
- }
- int res = zmq_poll(poll_fds, 2, 500);
- if (res < 0)
- break;
- if (res == 0)
- continue;
- // есть новый пакет от клиента
- if (poll_fds[0].revents & ZMQ_POLLIN)
- {
- poll_fds[0].revents &= ~ZMQ_POLLIN;
- // читаем фрейм с идентификатором клиента
- char client_identity[255];
- int client_identity_len;
- client_identity_len = zmq_recv(sock_srv, client_identity, sizeof(client_identity), ZMQ_DONTWAIT);
- if (client_identity_len <= 0)
- break;
- // читаем фрейм с пакетом данных
- if (zmq_msg_init(&rcv_msg) < 0)
- {
- flag_kill = true;
- break;
- }
- int rc = zmq_msg_recv(&rcv_msg, sock_srv, ZMQ_DONTWAIT);
- if (rc < 0)
- {
- flag_kill = true;
- break;
- }
- int msg_size = zmq_msg_size(&rcv_msg);
- // отправляем delimiter-фрейм worker-у
- if (zmq_send(sock_workers, "", 0, ZMQ_DONTWAIT | ZMQ_SNDMORE) < 0)
- break;
- // отправляем фрейм с идентификатором клиента worker-у
- // worker нам его вернет обратно вместе с ответом на запрос
- // и мы будем знать какому клиенту направить ответ
- if (zmq_send(sock_workers, client_identity, client_identity_len, ZMQ_DONTWAIT | ZMQ_SNDMORE) < 0)
- break;
- // отправляем фрейм с пакетом данных worker-у
- if (zmq_send(sock_workers, zmq_msg_data(&rcv_msg), msg_size, ZMQ_DONTWAIT) < 0)
- break;
- zmq_msg_close(&rcv_msg);
- }
- // есть новый пакет от worker-а
- if (poll_fds[1].revents & ZMQ_POLLIN)
- {
- poll_fds[1].revents &= ~ZMQ_POLLIN;
- // читаем delimiter-фрейм
- char delimiter_buf[2];
- int delimiter_buf_len;
- delimiter_buf_len = zmq_recv(sock_workers, delimiter_buf, sizeof(delimiter_buf), ZMQ_DONTWAIT);
- if (delimiter_buf_len > 0)
- break;
- // читаем фрейм с идентификатором клиента
- char client_identity[255];
- int client_identity_len;
- client_identity_len = zmq_recv(sock_workers, client_identity, sizeof(client_identity), ZMQ_DONTWAIT);
- if (client_identity_len <= 0)
- break;
- // читаем фрейм с пакетом данных
- if (zmq_msg_init(&rcv_msg) < 0)
- {
- flag_kill = true;
- break;
- }
- int rc = zmq_msg_recv(&rcv_msg, sock_workers, ZMQ_DONTWAIT);
- if (rc < 0)
- {
- flag_kill = true;
- break;
- }
- int msg_size = zmq_msg_size(&rcv_msg);
- // отправляем фрейм с идентификатором клиента ROUTER-сокету
- if (zmq_send(sock_srv, client_identity, client_identity_len, ZMQ_DONTWAIT | ZMQ_SNDMORE) < 0)
- break;
- // отправляем фрейм с пакетом данных клиенту
- if (zmq_send(sock_srv, zmq_msg_data(&rcv_msg), msg_size, ZMQ_DONTWAIT) < 0)
- break;
- zmq_msg_close(&rcv_msg);
- }
- }
- double elapsed = cur_time-start_time;
- // ждем завершения потоков
- usleep(600000);
- for (int i=0; i < WORKER_THREAD_COUNT; i++)
- {
- void *res = NULL;
- pthread_join(worker_thread_ids[i], &res);
- }
- unsigned long long total_packets_received = 0;
- unsigned long long total_bytes_received = 0;
- // получаем статистику по потокам и считаем итог
- printf("thread id:\tbytes rcv\tpackets rcv\r\n");
- for (int i=0; i < MAX_CLIENTS; i++)
- {
- void *res = NULL;
- pthread_join(thread_ids[i], &res);
- ThreadStats *stat = (ThreadStats *)res;
- if (stat)
- {
- printf("thread %02d:\t%lld\t%lld\r\n", i+1, stat->bytes_received, stat->packets_received);
- total_bytes_received += stat->bytes_received;
- total_packets_received += stat->packets_received;
- delete stat;
- }
- }
- printf(" TOTAL:\t%lld\t%lld\r\n", total_bytes_received, total_packets_received);
- printf("\r\n");
- printf("Elapsed time: %.3lf\r\n", elapsed);
- printf("Avg speed: %d bytes/s, %d packets/s\r\n", (unsigned int)(total_bytes_received/elapsed), (unsigned int)(total_packets_received/elapsed));
- zmq_close(sock_workers);
- zmq_close(sock_srv);
- zmq_ctx_destroy(z_ctx);
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement