Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <sys/types.h>
- #include <sys/socket.h>
- #include <arpa/inet.h>
- #include <errno.h>
- #include <ev.h>
- #include <fcntl.h>
- #include <pthread.h>
- #include <poll.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <sysexits.h>
- #include <unistd.h>
/*
 * Worker thread context: the two endpoints of the hand-off queue between
 * the network threads and the worker threads.
 */
typedef struct _wrk_ctx
{
    int out_queue;  /* Socket the network side sends requests into */
    int in_queue;   /* Socket the worker side receives requests from */
} WRK_CTX;
- /*
- * Network thread context
- */
- typedef struct _net_ctx
- {
- /* Server listener */
- int sock;
- /* Worker context */
- WRK_CTX * wrk_ctx;
- } NET_CTX;
- /*
- * Client connection context
- */
- typedef struct _io_ctx
- {
- /* Network context */
- NET_CTX * net_ctx;
- /* I/O context */
- struct ev_loop * loop;
- /* Client socket */
- int sock;
- /* I/O context */
- struct ev_io io_queue;
- /* I/O context */
- struct ev_io io_net;
- /* Async watcher */
- struct ev_async async;
- /* Buffer */
- char buf[4096];
- /* Buffer length */
- int buf_len;
- } IO_CTX;
- /*
- * Async callback
- */
- static void async_callback(struct ev_loop * loop,
- struct ev_async * watcher,
- int rev)
- {
- IO_CTX * io_ctx = (IO_CTX *)(watcher -> data);
- if (rev & EV_ERROR) { fprintf(stderr, "async_callback/EV_ERROR\n"); return; }
- ev_async_stop(loop, watcher);
- ev_io_set(&(io_ctx -> io_net), io_ctx -> sock, EV_WRITE);
- ev_io_start(loop, &(io_ctx -> io_net));
- fprintf(stderr, "%p async_callback/work completed: %p\n", (void*)pthread_self(), (void *)io_ctx);
- }
- /*
- * Enqueue callbaxk
- */
- static void queue_callback(struct ev_loop * loop,
- struct ev_io * watcher,
- int rev)
- {
- IO_CTX * io_ctx = (IO_CTX *)(watcher -> data);
- if (rev & EV_ERROR) { fprintf(stderr, "queue_callback/EV_ERROR\n"); return; }
- if (rev & EV_WRITE)
- {
- int bytes;
- ev_io_stop(loop, watcher);
- ev_async_start(loop, &(io_ctx -> async));
- bytes = send(watcher -> fd, &io_ctx, sizeof(IO_CTX *), 0);
- if (bytes != sizeof(IO_CTX *))
- {
- close(watcher -> fd);
- free(io_ctx);
- return;
- }
- fprintf(stderr, "%p queue_callback/%p\n", (void*)pthread_self(), (void *)io_ctx);
- }
- }
- /*
- * Input/output callbaxk
- */
- static void client_callback(struct ev_loop * loop,
- struct ev_io * watcher,
- int rev)
- {
- IO_CTX * io_ctx = (IO_CTX *)(watcher -> data);
- if (rev & EV_ERROR) { fprintf(stderr, "client_callback/EV_ERROR\n"); return; }
- if (rev & EV_READ)
- {
- ev_io_stop(loop, watcher);
- io_ctx -> buf_len = recv(watcher -> fd, io_ctx -> buf, 4095, 0);
- fprintf(stderr, "%p client_callback/recv: %d\n", (void*)pthread_self(), io_ctx -> buf_len);
- if (io_ctx -> buf_len <= 0)
- {
- close(watcher -> fd);
- free(io_ctx);
- return;
- }
- ev_io_start(loop, &(io_ctx -> io_queue));
- }
- if (rev & EV_WRITE)
- {
- int bytes;
- ev_io_stop(loop, watcher);
- bytes = send(watcher -> fd, io_ctx -> buf, io_ctx -> buf_len, 0);
- fprintf(stderr, "%p client_callback/send: %d\n", (void*)pthread_self(), bytes);
- if (bytes <= 0)
- {
- close(watcher -> fd);
- free(io_ctx);
- return;
- }
- ev_io_set(watcher, watcher -> fd, EV_READ);
- ev_io_start(loop, watcher);
- }
- }
- /*
- * New connection callback
- */
- static void accept_callback(struct ev_loop * loop,
- struct ev_io * watcher,
- int rev)
- {
- struct sockaddr_in sock_addr;
- socklen_t sock_addr_len = sizeof(sock_addr);
- int client_sock;
- char client_ip[16];
- IO_CTX * io_ctx = NULL;
- int tmp;
- int rc;
- if (rev & EV_ERROR) { fprintf(stderr, "accept_callback/EV_ERROR\n"); return; }
- memset(&sock_addr, 0, sock_addr_len);
- client_sock = accept(watcher -> fd, (struct sockaddr *)&sock_addr, &sock_addr_len);
- if (client_sock == -1) { fprintf(stderr, "accept_callback/accept\n"); return; }
- if (inet_ntop(AF_INET, &sock_addr.sin_addr, client_ip, 16) == NULL) { close(client_sock); return; }
- tmp = 1;
- rc = setsockopt(client_sock, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof(int));
- if (rc == -1) { close(client_sock); fprintf(stderr, "setsockopt/SO_REUSEADDR\n"); return; }
- tmp = 1;
- rc = setsockopt(client_sock, SOL_SOCKET, SO_REUSEPORT, &tmp, sizeof(int));
- if (rc == -1) { close(client_sock); fprintf(stderr, "setsockopt/SO_REUSEPORT\n"); return; }
- rc = fcntl(client_sock, F_SETFL, fcntl(client_sock, F_GETFL, 0) | O_NONBLOCK);
- if (rc == -1) { close(client_sock); fprintf(stderr, "fcntl/O_NONBLOCK\n"); return; }
- fprintf(stderr, "accept_callback/accept: %s:%d\n", client_ip, ntohs(sock_addr.sin_port));
- io_ctx = (IO_CTX *)malloc(sizeof(IO_CTX));
- memset(io_ctx, 0, sizeof(IO_CTX));
- io_ctx -> net_ctx = (NET_CTX *)(watcher -> data);
- io_ctx -> loop = loop;
- io_ctx -> sock = client_sock;
- ev_async_init(&(io_ctx -> async), async_callback);
- io_ctx -> async.data = io_ctx;
- ev_init(&(io_ctx -> io_queue), queue_callback);
- ev_io_set(&(io_ctx -> io_queue), io_ctx -> net_ctx -> wrk_ctx -> out_queue, EV_WRITE);
- io_ctx -> io_queue.data = io_ctx;
- ev_init(&(io_ctx -> io_net), client_callback);
- ev_io_set(&(io_ctx -> io_net), io_ctx -> sock, EV_READ);
- io_ctx -> io_net.data = io_ctx;
- ev_io_start(loop, &(io_ctx -> io_net));
- }
- /*
- * Network thread function
- */
- static void * network_thr_func(void * thr_ctx)
- {
- ev_io srv_io;
- NET_CTX * net_ctx = (NET_CTX *)thr_ctx;
- struct ev_loop * loop = ev_loop_new(EVFLAG_AUTO);
- ev_init(&srv_io, accept_callback);
- ev_io_set(&srv_io, net_ctx -> sock, EV_READ);
- srv_io.data = (NET_CTX *)thr_ctx;
- ev_io_start(loop, &srv_io);
- ev_loop(loop, 0);
- ev_loop_destroy(loop);
- return NULL;
- }
- /*
- * Worker thread function
- */
- static void * worker_thr_func(void * thr_ctx)
- {
- WRK_CTX * wrk_ctx = (WRK_CTX *)thr_ctx;
- IO_CTX * io_ctx = NULL;
- struct pollfd fds;
- int bytes;
- int rc;
- fds.fd = wrk_ctx -> in_queue;
- fds.events = POLLIN;
- for(;;)
- {
- fds.revents = 0;
- rc = poll(&fds, 1, 1000);
- if (rc < 0)
- {
- fprintf(stderr, "%p Error: %d", (void*)pthread_self(), errno);
- return NULL;
- }
- if (rc == 0) { /* fprintf(stderr, "Timeout\n"); */ }
- else
- {
- bytes = recv(fds.fd, &io_ctx, sizeof(IO_CTX *), 0);
- fprintf(stderr, "%p worker_thr/recv: %d\n", (void*)pthread_self(), bytes);
- /* Impossible happened? */
- if (bytes != sizeof(IO_CTX *)) { return NULL; }
- fprintf(stderr, "From queue %p\n", (void*)io_ctx);
- /* Do a big slow work here //////// */
- do
- {
- int pos = 0;
- fprintf(stderr, "%p Handler: %d `%.*s`\n", (void*)pthread_self(), io_ctx -> buf_len, io_ctx -> buf_len, io_ctx -> buf);
- for (pos = 0; pos < io_ctx -> buf_len; pos += 2)
- {
- if (io_ctx -> buf[pos] > ' ') { io_ctx -> buf[pos] = '_'; }
- }
- sleep(1);
- }
- while(0);
- /* /////////////////////////////// */
- /* Send notification to the network thread */
- ev_async_send(io_ctx -> loop, &(io_ctx -> async));
- }
- }
- return NULL;
- }
- static int make_sock()
- {
- struct sockaddr_in sock_addr;
- int tmp;
- int rc;
- int sock = socket(AF_INET, SOCK_STREAM, 0);
- if (sock == -1) { fprintf(stderr, "socket\n"); return EX_SOFTWARE; }
- sock_addr.sin_family = AF_INET;
- sock_addr.sin_port = htons(7788);
- sock_addr.sin_addr.s_addr = htonl(INADDR_ANY);
- tmp = 1;
- rc = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof(int));
- if (rc == -1) { close(sock); fprintf(stderr, "setsockopt/SO_REUSEADDR\n"); return -1; }
- tmp = 1;
- rc = setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &tmp, sizeof(int));
- if (rc == -1) { close(sock); fprintf(stderr, "setsockopt/SO_REUSEPORT\n"); return -1; }
- rc = fcntl(sock, F_SETFL, fcntl(sock, F_GETFL, 0) | FD_CLOEXEC | O_NONBLOCK);
- if (rc == -1) { close(sock); fprintf(stderr, "fcntl/FD_CLOEXEC | O_NONBLOCK\n"); return -1; }
- rc = bind(sock, (struct sockaddr *)&sock_addr, sizeof(struct sockaddr_in));
- if (rc == -1) { close(sock); fprintf(stderr, "fcntl/FD_CLOEXEC | O_NONBLOCK\n"); return -1; }
- rc = listen(sock, 128);
- if (rc == -1) { close(sock); fprintf(stderr, "fcntl/FD_CLOEXEC | O_NONBLOCK\n"); return -1; }
- return sock;
- }
/* Number of network (event loop) threads; may be predefined at build time */
#if !defined(C_NETWORK_THR)
#  define C_NETWORK_THR 2
#endif

/* Number of worker threads; may be predefined at build time */
#if !defined(C_WORKER_THR)
#  define C_WORKER_THR 5
#endif
- int main(int argc, char **argv)
- {
- int sockpair[2];
- int thr;
- WRK_CTX wrk_ctx;
- NET_CTX net_ctx;
- pthread_t worker_thr[C_WORKER_THR];
- pthread_t network_thr[C_NETWORK_THR];
- if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, sockpair) == -1) { fprintf(stderr, "socketpair\n"); return EX_SOFTWARE; }
- wrk_ctx.out_queue = sockpair[0];
- wrk_ctx.in_queue = sockpair[1];
- net_ctx.wrk_ctx = &wrk_ctx;
- net_ctx.sock = make_sock();
- if (net_ctx.sock == -1) { return EX_SOFTWARE; }
- /* Worker threads */
- for(thr = 0; thr < C_WORKER_THR; ++thr) { pthread_create(&worker_thr[thr], NULL, worker_thr_func, &wrk_ctx); }
- /* Network threads */
- for(thr= 0; thr < C_NETWORK_THR; ++thr) { pthread_create(&network_thr[thr], NULL, network_thr_func, &net_ctx); }
- for(thr = 0; thr < C_NETWORK_THR; ++thr) { pthread_join(network_thr[thr], NULL); }
- for(thr = 0; thr < C_WORKER_THR; ++thr) { pthread_join(worker_thr[thr], NULL); }
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement