Advertisement
Guest User

Untitled

a guest
Jun 23rd, 2016
1,514
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C 9.43 KB | None | 0 0
  1. #include <sys/types.h>
  2. #include <sys/socket.h>
  3. #include <arpa/inet.h>
  4. #include <errno.h>
  5. #include <ev.h>
  6. #include <fcntl.h>
  7. #include <pthread.h>
  8. #include <poll.h>
  9. #include <stdio.h>
  10. #include <stdlib.h>
  11. #include <string.h>
  12. #include <sysexits.h>
  13. #include <unistd.h>
  14.  
  15. /*
  16.  * Worker thread context
  17.  */
  18. typedef struct _wrk_ctx
  19. {
  20.     /* Outgoing queue socket */
  21.     int       out_queue;
  22.     /* Incoming queue socket */
  23.     int       in_queue;
  24. } WRK_CTX;
  25.  
  26. /*
  27.  * Network thread context
  28.  */
  29. typedef struct _net_ctx
  30. {
  31.     /* Server listener */
  32.     int              sock;
  33.     /* Worker context */
  34.     WRK_CTX        * wrk_ctx;
  35. } NET_CTX;
  36.  
  37. /*
  38.  * Client connection context
  39.  */
  40. typedef struct _io_ctx
  41. {
  42.     /* Network context  */
  43.     NET_CTX          * net_ctx;
  44.     /* I/O context      */
  45.     struct ev_loop   * loop;
  46.     /* Client socket    */
  47.     int                sock;
  48.     /* I/O context      */
  49.     struct ev_io       io_queue;
  50.     /* I/O context      */
  51.     struct ev_io       io_net;
  52.     /* Async watcher    */
  53.     struct ev_async    async;
  54.     /* Buffer           */
  55.     char               buf[4096];
  56.     /* Buffer length    */
  57.     int                buf_len;
  58. } IO_CTX;
  59.  
  60. /*
  61.  * Async callback
  62.  */
  63. static void async_callback(struct ev_loop   * loop,
  64.                            struct ev_async  * watcher,
  65.                            int                rev)
  66. {
  67.     IO_CTX * io_ctx = (IO_CTX *)(watcher -> data);
  68.  
  69.     if (rev & EV_ERROR) { fprintf(stderr, "async_callback/EV_ERROR\n"); return; }
  70.  
  71.     ev_async_stop(loop, watcher);
  72.     ev_io_set(&(io_ctx -> io_net), io_ctx -> sock, EV_WRITE);
  73.     ev_io_start(loop, &(io_ctx -> io_net));
  74.  
  75.     fprintf(stderr, "%p async_callback/work completed: %p\n", (void*)pthread_self(), (void *)io_ctx);
  76. }
  77.  
  78. /*
  79.  * Enqueue callbaxk
  80.  */
  81. static void queue_callback(struct ev_loop  * loop,
  82.                             struct ev_io    * watcher,
  83.                             int               rev)
  84. {
  85.     IO_CTX * io_ctx = (IO_CTX *)(watcher -> data);
  86.  
  87.     if (rev & EV_ERROR) { fprintf(stderr, "queue_callback/EV_ERROR\n"); return; }
  88.  
  89.     if (rev & EV_WRITE)
  90.     {
  91.         int bytes;
  92.         ev_io_stop(loop, watcher);
  93.         ev_async_start(loop, &(io_ctx -> async));
  94.  
  95.         bytes = send(watcher -> fd, &io_ctx, sizeof(IO_CTX *), 0);
  96.         if (bytes != sizeof(IO_CTX *))
  97.         {
  98.             close(watcher -> fd);
  99.             free(io_ctx);
  100.             return;
  101.         }
  102.  
  103.         fprintf(stderr, "%p queue_callback/%p\n", (void*)pthread_self(), (void *)io_ctx);
  104.     }
  105. }
  106.  
  107. /*
  108.  * Input/output callbaxk
  109.  */
  110. static void client_callback(struct ev_loop  * loop,
  111.                             struct ev_io    * watcher,
  112.                             int               rev)
  113. {
  114.     IO_CTX * io_ctx = (IO_CTX *)(watcher -> data);
  115.  
  116.     if (rev & EV_ERROR) { fprintf(stderr, "client_callback/EV_ERROR\n"); return; }
  117.  
  118.     if (rev & EV_READ)
  119.     {
  120.         ev_io_stop(loop, watcher);
  121.  
  122.         io_ctx -> buf_len = recv(watcher -> fd, io_ctx -> buf, 4095, 0);
  123.         fprintf(stderr, "%p client_callback/recv: %d\n", (void*)pthread_self(), io_ctx -> buf_len);
  124.         if (io_ctx -> buf_len <= 0)
  125.         {
  126.             close(watcher -> fd);
  127.             free(io_ctx);
  128.             return;
  129.         }
  130.  
  131.         ev_io_start(loop, &(io_ctx -> io_queue));
  132.     }
  133.  
  134.     if (rev & EV_WRITE)
  135.     {
  136.         int bytes;
  137.         ev_io_stop(loop, watcher);
  138.  
  139.         bytes = send(watcher -> fd, io_ctx -> buf, io_ctx -> buf_len, 0);
  140.         fprintf(stderr, "%p client_callback/send: %d\n", (void*)pthread_self(), bytes);
  141.         if (bytes <= 0)
  142.         {
  143.             close(watcher -> fd);
  144.             free(io_ctx);
  145.             return;
  146.         }
  147.         ev_io_set(watcher, watcher -> fd, EV_READ);
  148.         ev_io_start(loop, watcher);
  149.     }
  150. }
  151.  
  152. /*
  153.  * New connection callback
  154.  */
  155. static void accept_callback(struct ev_loop  * loop,
  156.                             struct ev_io    * watcher,
  157.                             int               rev)
  158. {
  159.     struct sockaddr_in sock_addr;
  160.     socklen_t          sock_addr_len = sizeof(sock_addr);
  161.     int                client_sock;
  162.     char               client_ip[16];
  163.     IO_CTX           * io_ctx = NULL;
  164.     int tmp;
  165.     int rc;
  166.  
  167.     if (rev & EV_ERROR) { fprintf(stderr, "accept_callback/EV_ERROR\n"); return; }
  168.  
  169.     memset(&sock_addr, 0, sock_addr_len);
  170.  
  171.     client_sock = accept(watcher -> fd, (struct sockaddr *)&sock_addr, &sock_addr_len);
  172.     if (client_sock == -1) { fprintf(stderr, "accept_callback/accept\n"); return; }
  173.  
  174.     if (inet_ntop(AF_INET, &sock_addr.sin_addr, client_ip, 16) == NULL) { close(client_sock); return; }
  175.  
  176.     tmp = 1;
  177.     rc = setsockopt(client_sock, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof(int));
  178.     if (rc == -1) { close(client_sock); fprintf(stderr, "setsockopt/SO_REUSEADDR\n"); return; }
  179.  
  180.     tmp = 1;
  181.     rc = setsockopt(client_sock, SOL_SOCKET, SO_REUSEPORT, &tmp, sizeof(int));
  182.     if (rc == -1) { close(client_sock); fprintf(stderr, "setsockopt/SO_REUSEPORT\n"); return; }
  183.  
  184.     rc = fcntl(client_sock, F_SETFL, fcntl(client_sock, F_GETFL, 0) | O_NONBLOCK);
  185.     if (rc == -1) { close(client_sock); fprintf(stderr, "fcntl/O_NONBLOCK\n"); return; }
  186.  
  187.     fprintf(stderr, "accept_callback/accept: %s:%d\n", client_ip, ntohs(sock_addr.sin_port));
  188.  
  189.     io_ctx = (IO_CTX *)malloc(sizeof(IO_CTX));
  190.     memset(io_ctx, 0, sizeof(IO_CTX));
  191.  
  192.     io_ctx -> net_ctx = (NET_CTX *)(watcher -> data);
  193.     io_ctx -> loop    = loop;
  194.     io_ctx -> sock    = client_sock;
  195.  
  196.     ev_async_init(&(io_ctx -> async), async_callback);
  197.     io_ctx -> async.data = io_ctx;
  198.  
  199.     ev_init(&(io_ctx -> io_queue), queue_callback);
  200.     ev_io_set(&(io_ctx -> io_queue), io_ctx -> net_ctx -> wrk_ctx -> out_queue, EV_WRITE);
  201.     io_ctx -> io_queue.data = io_ctx;
  202.  
  203.     ev_init(&(io_ctx -> io_net), client_callback);
  204.     ev_io_set(&(io_ctx -> io_net), io_ctx -> sock, EV_READ);
  205.     io_ctx -> io_net.data = io_ctx;
  206.  
  207.     ev_io_start(loop, &(io_ctx -> io_net));
  208. }
  209.  
  210. /*
  211.  * Network thread function
  212.  */
  213. static void * network_thr_func(void  * thr_ctx)
  214. {
  215.     ev_io            srv_io;
  216.     NET_CTX        * net_ctx = (NET_CTX *)thr_ctx;
  217.     struct ev_loop * loop    = ev_loop_new(EVFLAG_AUTO);
  218.  
  219.     ev_init(&srv_io, accept_callback);
  220.     ev_io_set(&srv_io, net_ctx -> sock, EV_READ);
  221.     srv_io.data = (NET_CTX *)thr_ctx;
  222.     ev_io_start(loop, &srv_io);
  223.  
  224.     ev_loop(loop, 0);
  225.     ev_loop_destroy(loop);
  226.  
  227. return NULL;
  228. }
  229.  
  230. /*
  231.  * Worker thread function
  232.  */
  233. static void * worker_thr_func(void  * thr_ctx)
  234. {
  235.     WRK_CTX        * wrk_ctx = (WRK_CTX *)thr_ctx;
  236.     IO_CTX         * io_ctx  = NULL;
  237.     struct pollfd    fds;
  238.     int bytes;
  239.     int rc;
  240.  
  241.     fds.fd     = wrk_ctx -> in_queue;
  242.     fds.events = POLLIN;
  243.     for(;;)
  244.     {
  245.         fds.revents = 0;
  246.         rc = poll(&fds, 1, 1000);
  247.         if (rc < 0)
  248.         {
  249.             fprintf(stderr, "%p Error: %d", (void*)pthread_self(), errno);
  250.             return NULL;
  251.         }
  252.  
  253.         if (rc == 0) { /* fprintf(stderr, "Timeout\n"); */ }
  254.         else
  255.         {
  256.             bytes = recv(fds.fd, &io_ctx, sizeof(IO_CTX *), 0);
  257.             fprintf(stderr, "%p worker_thr/recv: %d\n", (void*)pthread_self(), bytes);
  258.             /* Impossible happened? */
  259.             if (bytes != sizeof(IO_CTX *)) { return NULL; }
  260.             fprintf(stderr, "From queue %p\n", (void*)io_ctx);
  261.  
  262.             /* Do a big slow work here //////// */
  263.             do
  264.             {
  265.                 int pos = 0;
  266.                 fprintf(stderr, "%p Handler: %d `%.*s`\n", (void*)pthread_self(), io_ctx -> buf_len, io_ctx -> buf_len, io_ctx -> buf);
  267.                 for (pos = 0; pos < io_ctx -> buf_len; pos += 2)
  268.                 {
  269.                     if (io_ctx -> buf[pos] > ' ') { io_ctx -> buf[pos] = '_'; }
  270.                 }
  271.                 sleep(1);
  272.             }
  273.             while(0);
  274.             /* /////////////////////////////// */
  275.  
  276.             /* Send notification to the network thread */
  277.             ev_async_send(io_ctx -> loop, &(io_ctx -> async));
  278.         }
  279.     }
  280.  
  281. return NULL;
  282. }
  283.  
  284. static int make_sock()
  285. {
  286.     struct sockaddr_in sock_addr;
  287.     int tmp;
  288.     int rc;
  289.  
  290.     int sock = socket(AF_INET, SOCK_STREAM, 0);
  291.     if (sock == -1) { fprintf(stderr, "socket\n"); return EX_SOFTWARE; }
  292.  
  293.     sock_addr.sin_family      = AF_INET;
  294.     sock_addr.sin_port        = htons(7788);
  295.     sock_addr.sin_addr.s_addr = htonl(INADDR_ANY);
  296.  
  297.     tmp = 1;
  298.     rc = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof(int));
  299.     if (rc == -1) { close(sock); fprintf(stderr, "setsockopt/SO_REUSEADDR\n"); return -1; }
  300.  
  301.     tmp = 1;
  302.     rc = setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &tmp, sizeof(int));
  303.     if (rc == -1) { close(sock); fprintf(stderr, "setsockopt/SO_REUSEPORT\n"); return -1; }
  304.  
  305.     rc = fcntl(sock, F_SETFL, fcntl(sock, F_GETFL, 0) | FD_CLOEXEC | O_NONBLOCK);
  306.     if (rc == -1) { close(sock); fprintf(stderr, "fcntl/FD_CLOEXEC | O_NONBLOCK\n"); return -1; }
  307.  
  308.     rc = bind(sock, (struct sockaddr *)&sock_addr, sizeof(struct sockaddr_in));
  309.     if (rc == -1) { close(sock); fprintf(stderr, "fcntl/FD_CLOEXEC | O_NONBLOCK\n"); return -1; }
  310.  
  311.     rc = listen(sock, 128);
  312.     if (rc == -1) { close(sock); fprintf(stderr, "fcntl/FD_CLOEXEC | O_NONBLOCK\n"); return -1; }
  313.  
  314. return sock;
  315. }
  316.  
  317. #ifndef C_NETWORK_THR
  318. #define C_NETWORK_THR 2
  319. #endif
  320.  
  321. #ifndef C_WORKER_THR
  322. #define C_WORKER_THR 5
  323. #endif
  324.  
  325. int main(int argc, char **argv)
  326. {
  327.     int sockpair[2];
  328.     int thr;
  329.     WRK_CTX wrk_ctx;
  330.     NET_CTX net_ctx;
  331.     pthread_t worker_thr[C_WORKER_THR];
  332.     pthread_t network_thr[C_NETWORK_THR];
  333.  
  334.     if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, sockpair) == -1) { fprintf(stderr, "socketpair\n"); return EX_SOFTWARE; }
  335.  
  336.     wrk_ctx.out_queue = sockpair[0];
  337.     wrk_ctx.in_queue  = sockpair[1];
  338.  
  339.     net_ctx.wrk_ctx = &wrk_ctx;
  340.     net_ctx.sock    = make_sock();
  341.     if (net_ctx.sock == -1) { return EX_SOFTWARE; }
  342.  
  343.     /* Worker threads */
  344.     for(thr = 0; thr < C_WORKER_THR; ++thr) { pthread_create(&worker_thr[thr], NULL, worker_thr_func, &wrk_ctx); }
  345.  
  346.     /* Network threads */
  347.     for(thr= 0; thr < C_NETWORK_THR; ++thr) { pthread_create(&network_thr[thr], NULL, network_thr_func, &net_ctx); }
  348.  
  349.     for(thr = 0; thr < C_NETWORK_THR; ++thr) { pthread_join(network_thr[thr], NULL); }
  350.     for(thr = 0; thr < C_WORKER_THR; ++thr)  { pthread_join(worker_thr[thr], NULL); }
  351.  
  352. return 0;
  353. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement