Advertisement
avp210159

conn.c -- Some client-server technics testing

May 28th, 2015
801
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C 29.16 KB | None | 0 0
  1. // avp 2012 connect/accept/ process connection measure tests
  2. // as first step: gcc this-file.c -pthreads ; ./a.out -h
  3.  
  4. #include <stdio.h>
  5. #include <stdlib.h>
  6. #include <string.h>
  7. #include <sys/epoll.h>
  8. #include <sys/select.h>
  9. #include <unistd.h>
  10. #include <signal.h>
  11. #include <errno.h>
  12. #include <sys/wait.h>
  13. #include <sys/socket.h>
  14. #include <netinet/in.h>
  15. #include <netdb.h>
  16. #include <arpa/inet.h>
  17. #include <sys/ioctl.h>
  18. #include <sys/time.h>
  19. #include <sys/resource.h>
  20. #include <pthread.h>
  21. #include <semaphore.h>
  22. #include <sys/ipc.h>
  23. #include <sys/shm.h>
  24.  
  25. #include <sys/syscall.h>
  26.  
  27. static pid_t
  28. gettid()
  29. {
  30.   return syscall(SYS_gettid);
  31. }
  32.  
  33.  
  34. static long long
  35. mtime()
  36. {
  37.   struct timeval t;
  38.  
  39.   gettimeofday(&t, NULL);
  40.   long long mt = (long long)t.tv_sec * 1000 + t.tv_usec / 1000;
  41.   return mt;
  42. }
  43.  
  44.  
  45. #define fatal(msg) ({fprintf(stderr,"FATAL %ld: %s [%m]\n",(long)getpid(),msg);\
  46.       if (getpid() == masterpid) kill_clients(); exit(-1);})
  47. #define thfatal(msg) ({fprintf(stderr,"THREAD FATAL %ld: %s [%m]\n",(long)gettid(),msg);\
  48.       pthread_exit(NULL);})
  49.  
  50. #define max(x,y) ((x) > (y)? (x):(y))
  51.  
  52. #define USE_MASTER   1
  53. #define USE_FORK     2
  54. #define USE_THREAD   3
  55. #define USE_THPOOL   4
  56. #define USE_ASY      5
  57.  
  58. char *mthdtitle[] = {"MASTER","FORK","THREAD","THPOOL","ASYNC"};
  59.  
  60. #define CONNECT_ERRLIM 100
  61.  
  62. pid_t masterpid, loadpid = 0;
  63.  
  64. struct common {   // shared memory
  65.   int  exif;
  66.   sem_t sem_load, sem_cli;
  67. } *com = NULL;
  68. int segid = 0;    // shared memory id
  69.  
  70.  
  71. int nconn = 0,    // число принятых соединений
  72.   servconn = 0,   // число обработанных (все данные приняты отосланы)
  73. //  maxl = 0,       // MAX полученных одновременно subtotals (-p method)
  74.   nthreads = 0,   // количество запущенных  потоков
  75.   maxthreads = 0, // MAX потоков одновременно process data
  76.   nwaitedths = 0, // (для пула -T) количество потоков, ожидающих connect
  77.   poolsize = 0,   // 0 - unlimited
  78.   exif = 0,       // флаг alarm для exit
  79.   pdata[2],       // pipe for master <- process (-p) для получения subtotals
  80.   verbose = 1,
  81.   alarmt = 10,
  82.   load = -1,      // нет нагрузки, после обр. аргументов такой load станет 0
  83.   lline = -1,
  84.   port = 12345,
  85.   ncli = 4,       // число  client потоков
  86.   use_data = 1,   // после слединения обмен данными
  87.   use_epoll = 0,
  88.   use_select = 0,
  89.   cli_fork = 1,   // число процессов, запускающих client потоки
  90.   maxnfds = 0,
  91.   maxsocknum = 0,
  92.   method = USE_FORK; // способ обработки данных
  93.  
  94. long total = 0; // всего получено байт от клиентов
  95.  
  96. // ==== Потоки и очередь соединений для обработки в пуле потоков ====
  97. // локи для
  98. pthread_mutex_t statlock = PTHREAD_MUTEX_INITIALIZER, // total, maxthreads etc
  99.   thpoolock = PTHREAD_MUTEX_INITIALIZER; // пула потоков и очереди соединений
  100. sem_t           sem; // семафор очереди соединений
  101. pthread_attr_t attr;
  102.  
  103. // элемент очереди соединений
  104. struct qelem {
  105.   struct qelem *next; // для списка свободных
  106.   int    fd;
  107. };
  108.  
  109. // очередь соединений
  110. struct qelem *qhead = NULL, *qtail = NULL,
  111.   *qfree = NULL; // список свободных элементов очереди соединений
  112. // получить элемент очереди соединений
  113. struct qelem *
  114. get_qelem()
  115. {
  116.   struct qelem *e = qfree;
  117.  
  118.   if (e) {
  119.     qfree = e->next;
  120.     return e;
  121.   }
  122.   return (struct qelem *)malloc(sizeof(*qfree));
  123. }
  124. // вернуть в список свободных
  125. void
  126. free_qelem (struct qelem *e)
  127. {
  128.   e->next = qfree;
  129.   qfree = e;
  130. }
  131. // поместить соединение в очередь
  132. void
  133. to_queue (struct qelem *e)
  134. {
  135.   e->next = NULL;
  136.   if (qtail)
  137.     qtail = qtail->next = e;
  138.   else
  139.     qhead = qtail = e;
  140. }
  141. // взять соединение из очереди
  142. struct qelem *
  143. from_queue ()
  144. {
  145.   struct qelem *e = qhead;
  146.  
  147.   if (e) {
  148.     if (qhead == qtail)
  149.       qhead = qtail = NULL;
  150.     else
  151.       qhead = e->next;
  152.   }
  153.  
  154.   return e;
  155. }
  156.  
  157. // ==== Обрабатываемые в epoll соединения ====
  158. struct cli {
  159.   struct cli *next; // для списка свободных
  160.   int fd, tot;
  161.   char buf[1000];
  162. };
  163. // список свободных структур (минимизируем malloc)
  164. struct cli *clilist = NULL;
  165. // возвращает в список свободных
  166. void
  167. free_cli(struct cli *e)
  168. {
  169.   e->fd = -2;
  170.   e->next = clilist;
  171.   clilist = e;
  172. }
  173. // получить элемент для epoll
  174. struct cli *
  175. get_cli()
  176. {
  177.   struct cli *e = clilist;
  178.  
  179.   if (e) {
  180.     clilist = e->next;
  181.     return e;
  182.   }
  183.   return (struct cli *)malloc(sizeof(*e));
  184. }
  185.  
  186.  
  187. // измерение main в msec
  188. extern long long mtime();
  189. long long start=0, finish;
  190.  
  191. void
  192. Usage()
  193. {
  194.   puts("Usage: conns [-h] [-c|m|p|t|T|y] [-e|s] [-C -l -v -aTimeSec] [[host:]port] [Nclients]");
  195.   puts("-h    help");  
  196.   puts("-v[N] verbose 0 1(default) 2 3 4");  
  197.   puts("-C[N] use N forks for clients (default 1)");  
  198.   puts("-e    use epoll");  
  199.   puts("-s    use select");  
  200.   puts("-a[N] run test N sec (default 10)");  
  201.   puts("-T[N] process data using threads pool (N poolsize default unlimeted)");
  202.   puts("-t    process data using threads (new for each connection)");
  203.   puts("-p    process data using processes (new for each connection(default)");
  204.   puts("-m    process data in main synchronously");
  205.   puts("-y    process data in main asynchronously (epoll switch on)");
  206.   puts("-c    connections only (don't send data, ignore tTpmy flags)");
  207.   puts("-l[N] use low rate load N connections (default 1000-Nclients)");  
  208.  
  209.   exit(1);
  210. }
  211.  
  212. pid_t *cli = NULL; // массив процессов-клиентов
  213. void
  214. kill_clients()
  215. {
  216.   int j;
  217.   if (cli)
  218.     for (j = 0; j < cli_fork; j++)
  219.       kill(cli[j],SIGHUP);
  220. }
  221.  
  222.  
  223. void
  224. finfun ()
  225. {
  226.   if (getpid() == masterpid) {
  227.     if (com) {
  228.       sem_destroy(&com->sem_load);
  229.       sem_destroy(&com->sem_cli);
  230.     }
  231.     shmctl(segid,IPC_RMID,NULL);
  232.   }
  233. }
  234.  
  235.  
  236.  
  237.  
  238. // alarm handlers
  239. void
  240. alah(int s)
  241. {
  242.   char buf[1000];
  243.  
  244.   if (exif)
  245.     exit(0);
  246.   exif = 1;
  247.   if (com)
  248.     com->exif = 1;
  249.  
  250.   if (getpid() == masterpid)
  251.     kill_clients();
  252.  
  253.   finish = mtime();
  254.   write (1,buf,
  255.      sprintf (buf,"ALARM nconn = %d bytes = %ld sevrconn = %d time=%lld\n",
  256.           nconn,total,servconn,finish-start)
  257.      );
  258.  
  259.   if (method == USE_THREAD)
  260.     write (1,buf,
  261.        sprintf (buf,"nthreads = %d maxthreads = %d\n",
  262.             nthreads,maxthreads));
  263.   else if (method == USE_THPOOL)
  264.     write (1,buf,
  265.        sprintf (buf,"nthreads = %d maxthreads = %d   nwaitedths = %d\n",
  266.             nthreads,maxthreads,nwaitedths));
  267.   write (1,buf,
  268.      sprintf (buf,"maxnfds = %d maxsocknum = %d\n",
  269.           maxnfds,maxsocknum)
  270.      );
  271.        
  272.  
  273.   alarm(2);
  274. }
  275.  
  276.  
  277. // в клиенте повтор connect()
  278. int connagain = 0;
  279.  
  280. // handler for clients
  281. void
  282. hangh(int s)
  283. {
  284.   char buf[1000];
  285.  
  286.   write (1, buf,
  287.      sprintf (buf,"SIGHUP %s %ld connagain = %d nconn = %d servconn = %d\n",
  288.           loadpid? "Loader":"",
  289.           (long)getpid(),connagain,nconn,servconn));
  290.   if (loadpid)
  291.     write (1, buf,
  292.        sprintf (buf,"Loader %d connections %d lines\n",
  293.             load, lline)
  294.        );
  295.  
  296.   exit(0);
  297. }
  298.  
  299. // if returns 1 then continue to restart syscall, 0 - OK goto work
  300. int
  301. checkInterruptExit(int rc, char *msg)
  302. {
  303.   if (com->exif || (rc == -1 && errno != EINTR)) {
  304.     int nt = 0, status;
  305.  
  306.     if (!com->exif)
  307.       perror(msg);
  308.     if (masterpid == gettid()) {
  309.       printf ("main exit exif = %d\n",com->exif);
  310.  
  311.       kill_clients();
  312.       while(wait(&status) > 0)
  313.     nt++;
  314.  
  315.       printf ("End [%s,%s] %d connections (%ld bytes %d servconn)\nwait = %d time = %lld msec\n",
  316.           use_select? "Select": use_epoll? "Epoll":"Synch",
  317.           use_data? mthdtitle[method-1]:"CONNECTIONS",
  318.           nconn,total,servconn,nt,mtime()-start);
  319.       exit (0);
  320.     }
  321.   }
  322.   return errno == EINTR? 1: 0;
  323. }
  324.  
  325.  
  326.  
  327. void
  328. priconn (int sock, int loop, char *msg)
  329. {
  330.   if (verbose) {
  331.     int n = 3000;
  332.     if (verbose > 3)
  333.       n = 1;
  334.     else if (verbose > 2)
  335.       n = 1000;
  336.     if (loop % n == 0)
  337.       printf ("%s: %d loop = %d\n",msg,sock,loop);
  338.   }
  339. }
  340.  
  341. // returns socket
  342. int
  343. make_connect (struct sockaddr_in *addr)
  344. {
  345.   int sock = socket(AF_INET,SOCK_STREAM,0),
  346.     alen = sizeof(*addr), on = 1;
  347.   struct linger ling;
  348.   ling.l_onoff  = 1;
  349.   ling.l_linger = 0;
  350.  
  351.   if (sock != -1 &&
  352.       setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,
  353.          (char *)&on, sizeof (on)) != -1 &&
  354.       setsockopt(sock,SOL_SOCKET,SO_LINGER,
  355.          (char *)&ling,sizeof(ling)) != -1 &&
  356.       connect(sock,(struct sockaddr *)addr,alen) == 0)
  357.     return sock; // OK
  358.   // ERR
  359.   return sock == -1? -1: close(sock),-1;
  360. }
  361.  
  362.  
  363. // returns socket
  364. int
  365. make_listen (struct sockaddr_in *addr)
  366. {
  367.   int asock = socket(AF_INET,SOCK_STREAM,0), on = 1;
  368.  
  369.   if (asock != -1 &&
  370.       setsockopt(asock,SOL_SOCKET,SO_REUSEADDR,
  371.          (char *)&on, sizeof (on)) != -1 &&
  372.       bind(asock,(struct sockaddr *)addr,sizeof(*addr)) == 0 &&
  373.       listen(asock,1024) == 0)
  374.     return asock;
  375.  
  376.   return asock == -1? :close(asock),-1;
  377. }
  378.  
  379. // returns 1 OK, 0 ERR
  380. int
  381. make_ipaddr (char *host, int port, struct sockaddr_in *a)
  382. {
  383.   struct hostent     *phe;
  384.  
  385.   a->sin_family = AF_INET;
  386.   a->sin_port = htons(port);
  387.   a->sin_addr.s_addr = INADDR_ANY;  
  388.  
  389.   if (host && host[0]) {
  390.     if (phe = gethostbyname(host))
  391.       memcpy (&a->sin_addr, phe->h_addr, phe->h_length);
  392.     else
  393.       if ((a->sin_addr.s_addr = inet_addr(host)) == INADDR_NONE)
  394.     return 0;
  395.   }
  396.  
  397.   return 1;
  398. }
  399.  
  400. // обмен данными со стороны клиента
  401. int
  402. wecho (int sock, int loop)
  403. {
  404.   char buf[1000];
  405.   int  tot = 0, j, l, m = rand()%7+10;
  406.  
  407.   for (j = 0; j < m; j++)
  408.     write (sock,buf,sprintf(buf,"proc %ld sock %d loop %d line %d\n",
  409.                 (long)gettid(),sock,loop,j));
  410.   buf[0] = '\n';
  411.   if (write (sock,buf,1) != 1)
  412.     return -1;
  413.  
  414.   buf[0] = 0;
  415.   while ((l = read(sock,buf+1,sizeof(buf)-1)) > 0) {
  416.     tot += l;
  417.     if (buf[l] == '\n' && buf[l-1] == '\n')
  418.       break;
  419.     buf[0] = buf[l];
  420.   }
  421.  
  422.   return l > 0? tot: -1;
  423. }
  424.  
  425. // поток данных клиента
  426. void *
  427. thcli (void *a)
  428. {
  429.   struct sockaddr_in *paddr = (struct sockaddr_in *)a;
  430.   int sock, loop, tot = 0;
  431.  
  432.   for (loop = 0;; usleep(1),loop++) {
  433.     if ((sock = make_connect(paddr)) == -1)
  434.       thfatal("make connect");
  435.  
  436.     pthread_mutex_lock(&statlock);
  437.     nconn++;
  438.     pthread_mutex_unlock(&statlock);
  439.  
  440.     priconn (sock,loop,"Connect");
  441.  
  442.     if (use_data)
  443.       if ((tot = wecho(sock,loop)) < 1)
  444.     thfatal("wecho");
  445.  
  446.     close(sock);
  447.     pthread_mutex_lock(&statlock);
  448.     servconn++;
  449.     total += tot;
  450.     pthread_mutex_unlock(&statlock);
  451.   }
  452.  
  453.   printf ("Never %ld thread_exit i=%d\n",(long)gettid(),loop);
  454. }
  455.  
  456. // запуск потоков данных клиента
  457. pid_t
  458. runtcli (int n, char *host, int port)
  459. {
  460.   pid_t p = fork();
  461.   if (p)
  462.     return p;
  463.  
  464.   struct sockaddr_in sadr;
  465.   int i;
  466.  
  467.   if (!make_ipaddr(host,port,&sadr))
  468.     fatal("Cli make addr");
  469.  
  470.   signal(SIGHUP,hangh); // exit there
  471.   siginterrupt(SIGHUP,1);
  472.  
  473.   sem_wait(&com->sem_cli);
  474.   for (i = 0; i < n-1; i++) {
  475.     pthread_t th;
  476.          
  477.     if (pthread_create (&th, &attr, thcli, (void *)&sadr))
  478.       fatal("Cli create thread");
  479.   }
  480.  
  481.   printf ("client %ld run %d threads\n",(long)getpid(),n);
  482.   thcli((void *)&sadr);
  483.   // Not reached
  484. }
  485.  
  486. pid_t
  487. loader (char *host, int port)
  488. {
  489.   pid_t p = fork();
  490.   if (p)
  491.     return p;
  492.   loadpid = getpid();
  493.  
  494.   struct sockaddr_in sadr;
  495.   if (!make_ipaddr(host,port,&sadr))
  496.     fatal("Loader make addr");
  497.  
  498.   struct hostent     *phe;
  499.   int  sock[load], i, l;
  500.   char buf[1000];
  501.  
  502.   signal(SIGHUP,hangh);
  503.   siginterrupt(SIGHUP,1);
  504.  
  505.   sem_wait(&com->sem_load);
  506.   printf ("Loader start\n");
  507.   for (i = 0; i < load; i++) {
  508.     if ((sock[i] = make_connect(&sadr)) == -1)
  509.       fatal ("load connect");
  510.     nconn++;
  511.     if (write (sock[i],buf,
  512.            l = sprintf(buf,"Loader start %d\n",i)) != l)
  513.       fatal(buf);
  514.   }
  515.  
  516.   printf ("Loader make %d connections\n",i);
  517.   for (i = 0; i < cli_fork; i++)
  518.     sem_post(&com->sem_cli);
  519.  
  520.   for (lline = 0;;lline++, usleep(100000)) { // 10 раз в секунду
  521.     for (i = 0; i < load; i++) {
  522.       if (read(sock[i],buf,sizeof(buf)) <= 0)
  523.     fatal("load read");
  524.       if (write (sock[i],buf,
  525.          l = sprintf(buf,"Loader socket %d line %d\n",i,lline)) != l)
  526.     fatal(buf);
  527.     }
  528.   }
  529.   // Not reached
  530. }
  531.  
  532. // server echo lines (end \n\n)
  533. static int
  534. echo (int sock)
  535. {
  536.   char buf[1000];
  537.   int l, nfragments = 0, tot = 0;
  538.  
  539.   buf[0] = 0;
  540.   do {
  541.     while ((l = read(sock,buf+1,sizeof(buf)-1)) > 0) {
  542.       tot += l;
  543.       if (write(sock,buf+1,l) != l) {
  544.     l = -1;
  545.     break;
  546.       }
  547.       if (buf[l] == '\n' && buf[l-1] == '\n')
  548.     break;
  549.       buf[0] = buf[l];
  550.       nfragments++;
  551.     }
  552.   } while (checkInterruptExit(l,"echo"));
  553.  
  554.   if (nfragments && verbose > 2)
  555.     printf ("fragments = %d l=%d tot = %d\n",nfragments,l,tot);
  556.   return tot;
  557. }
  558.  
  559.  
  560. void *
  561. thecho (void *a)
  562. {
  563.   pthread_mutex_lock(&statlock);
  564.   nthreads++;
  565.   if (nthreads > maxthreads)
  566.     maxthreads = nthreads;
  567.   pthread_mutex_unlock(&statlock);
  568.  
  569.   int sock = (long)a,
  570.     tot = echo(sock);
  571.   close(sock);
  572.  
  573.   pthread_mutex_lock(&statlock);
  574.   nthreads--;
  575.   total += tot;
  576.   servconn++;
  577.   pthread_mutex_unlock(&statlock);
  578. }
  579.  
  580. void *
  581. tpecho (void *a)
  582. {
  583.   struct qelem *e = NULL;
  584.   int tot, nworks;
  585.  
  586.   for (;;) {
  587.     pthread_mutex_lock(&thpoolock);
  588.     nwaitedths++;
  589.     if (e)
  590.       free_qelem(e);
  591.     pthread_mutex_unlock(&thpoolock);
  592.    
  593.     sem_wait(&sem);
  594.  
  595.     pthread_mutex_lock(&thpoolock);
  596.     nwaitedths--;
  597.     if (!(e = from_queue()))
  598.       fatal("empty queue");
  599.     nworks =  nthreads-nwaitedths;
  600.     pthread_mutex_unlock(&thpoolock);
  601.  
  602.     tot = echo(e->fd);
  603.     close(e->fd);
  604.  
  605.     pthread_mutex_lock(&statlock);
  606.     total += tot;
  607.     servconn++;
  608.     if (nworks > maxthreads)
  609.       maxthreads = nworks;
  610.     //  printf ("tpecho\n");
  611.     pthread_mutex_unlock(&statlock);
  612.   }
  613. }
  614.  
  615.  
  616.  
  617. char *
  618. eva_names (int ev, char *buf)
  619. {
  620.   static int eflag[] = {EPOLLIN,EPOLLOUT,EPOLLRDHUP,EPOLLPRI,
  621.             EPOLLERR,EPOLLHUP,EPOLLET,EPOLLONESHOT};
  622.   static char *ename[] = {"EPOLLIN","EPOLLOUT","EPOLLRDHUP","EPOLLPRI",
  623.               "EPOLLERR","EPOLLHUP","EPOLLET","EPOLLONESHOT"};
  624.   int i, n;
  625.  
  626.   *buf = 0;
  627.   for (i = n = 0; i < sizeof(eflag)/sizeof(eflag[0]); i++)
  628.     if (ev & eflag[i])
  629.       n += sprintf (buf+n,"%s ",ename[i]);
  630.  
  631.   if (n)
  632.     buf[n-1] = 0;
  633.   return buf;
  634. }
  635.  
  636.  
  637. int
  638. main (int ac, char *av[])
  639. {
  640.   int   i, j;
  641.  
  642.   while (av[1] && av[1][0] == '-') {
  643.     switch (av[1][1]) {
  644.     case 'h':
  645.       Usage();
  646.     case 'c':
  647.       use_data = 0; // connections only
  648.       break;
  649.     case 'C':       // use many forks for client threads
  650.       if (!av[1][2])
  651.     break;
  652.       if ((cli_fork = atoi(av[1]+2)) > 0)
  653.     break;
  654.       printf ("Bad cli_fork value [%s]\n",av[1]+2);
  655.       Usage();
  656.     case 'm':
  657.       method = USE_MASTER; // method connection synchronously in main
  658.       break;
  659.     case 'p':
  660.       method = USE_FORK;
  661.       break;
  662.     case 't':
  663.       method = USE_THREAD;
  664.       break;
  665.     case 'T':
  666.       method = USE_THPOOL;
  667.       if ((poolsize = atoi(av[1]+2)) >=  0)
  668.     break;
  669.       printf ("Bad poolsize value [%s]\n",av[1]+2);
  670.       Usage();
  671.     case 'y':
  672.       method = USE_ASY;
  673.       use_epoll = 1;
  674.       break;
  675.     case 'e':
  676.       use_epoll = 1;
  677.       break;
  678.     case 's':
  679.       use_select = 1;
  680.       break;
  681.     case 'v':
  682.       verbose = atoi(av[1]+2);
  683.       break;
  684.     case 'a':
  685.       if ((alarmt = atoi(av[1]+2)) >  0)
  686.     break;
  687.       printf ("Bad alarm value [%s]\n",av[1]+2);
  688.       Usage();
  689.     case 'l':
  690.       if ((load = atoi(av[1]+2)) >=  0)
  691.     break;
  692.       printf ("Bad lowrate-load value [%s]\n",av[1]+2);
  693.       Usage();
  694.     default:
  695.       printf ("Unknown:  [%s]\n",av[1]);
  696.       Usage();
  697.     }
  698.     av++;
  699.   }
  700.  
  701.  
  702.   if (verbose < 0) {
  703.     struct rlimit rlim;
  704.     getrlimit(RLIMIT_NOFILE,&rlim);
  705.     printf ("RLIMIT_NOFILE hard: %ld  soft(cur): %ld\n",
  706.         rlim.rlim_max, rlim.rlim_cur);
  707.     rlim.rlim_max += 1000; rlim.rlim_cur += 1000;
  708.     printf ("setrlimit = %d hard: %ld  soft(cur): %ld\n",
  709.         getrlimit(RLIMIT_NOFILE,&rlim), rlim.rlim_max, rlim.rlim_cur);
  710.     getrlimit(RLIMIT_NOFILE,&rlim);
  711.     printf ("new RLIMIT_NOFILE hard: %ld  soft(cur): %ld\n",
  712.         rlim.rlim_max, rlim.rlim_cur);
  713.    
  714.     long argmax = sysconf(_SC_ARG_MAX),
  715.       childmax = sysconf(_SC_CHILD_MAX),
  716.       clockticks = sysconf(_SC_CLK_TCK),
  717.       openmax = sysconf(_SC_OPEN_MAX),
  718.       pagesize = sysconf(_SC_PAGESIZE),
  719.       physpages = sysconf(_SC_PHYS_PAGES),
  720.       avphyspgs = sysconf(_SC_AVPHYS_PAGES),
  721.       proconline = sysconf(_SC_NPROCESSORS_ONLN);
  722.  
  723.     printf ("argmax = %ld\n\
  724. childmax = %ld\n\
  725. clockticks = %ld\n\
  726. openmax = %ld\n\
  727. pagesize = %ld\n\
  728. physpages = %ld\n\
  729. avphyspgs = %ld\n\
  730. proconline = %ld\n",
  731.         sysconf(_SC_ARG_MAX),sysconf(_SC_CHILD_MAX),
  732.         sysconf(_SC_CLK_TCK),sysconf(_SC_OPEN_MAX),
  733.         sysconf(_SC_PAGESIZE),sysconf(_SC_PHYS_PAGES),
  734.         sysconf(_SC_AVPHYS_PAGES),sysconf(_SC_NPROCESSORS_ONLN));
  735.  
  736.     exit(0);
  737.   }
  738.  
  739.   if (av[1] && av[2])
  740.     if ((ncli = atoi(av[2])) < 1)
  741.       ncli = 4;
  742.  
  743.   if (load >= 0) {
  744.     int err = 0;
  745.     if (!use_data) {
  746.       err = 1;
  747.       printf ("incompatible -l and connections only\n");
  748.     } else if (method == USE_MASTER) {
  749.       err = 1;
  750.       printf ("incompatible -l and -m\n");
  751.     }
  752.     if (err)
  753.       exit (1);
  754.   }
  755.  
  756.  
  757.   char host[256] = "",
  758.     buf[1000];
  759.  
  760.   port = 12345;
  761.   if (av[1]) {
  762.     char *p = strchr(av[1],':');
  763.     if (p) {
  764.       if ((port = atoi(p+1)) <= 0)
  765.     port = 12345;
  766.       strncat(host,av[1],p-av[1]);
  767.       printf ("host = [%s] port = %d\n",host,port);
  768.     } else if ((port = atoi(av[1])) <= 0)
  769.       port = 12345;
  770.   }
  771.  
  772.  
  773.   struct sockaddr_in myaddr,  inaddr;
  774.  
  775.   if (!make_ipaddr(host,port,&myaddr))
  776.     fatal("make ipaddr");
  777.   if (!*host)
  778.     strcpy(host,"127.0.0.1");
  779.  
  780.   strcpy(buf,"dataflow ");
  781.   printf ("Test %s accept connections (-v%d) %s\n%d sec with %d clients (%s:%d)\n",
  782.       use_select? "Select": use_epoll? "Epoll":"Synch", verbose,
  783.       use_data? strcat(strcat (buf,mthdtitle[method-1])," processing")
  784.       :"only connections",
  785.       alarmt,
  786.       ncli,
  787.       host, port);
  788.   if (load >= 0)
  789.     printf ("use %d additional low rate connections\n",
  790.         load = load? load: 1000-ncli);
  791.   else
  792.     load = 0;
  793.  
  794.   //  exit (0);
  795.  
  796.   pthread_attr_init(&attr);
  797.   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
  798.   pthread_attr_setstacksize(&attr,1024*128);
  799.  
  800.   sem_init(&sem,0,0);
  801.  
  802.   masterpid = gettid();
  803.   segid = shmget(IPC_PRIVATE, 4096, IPC_CREAT|0777);
  804.   if (segid == -1)
  805.     fatal("shmget");
  806.   atexit(finfun);
  807.   com = (struct common *)shmat(segid,NULL,0);
  808.   if (com == (void *)-1)
  809.     fatal("shmat");
  810.   com->exif = 0;
  811.   sem_init (&com->sem_load,1,0);
  812.   sem_init (&com->sem_cli,1,0);
  813.  
  814.   pipe(pdata);
  815.  
  816.   int asock, status;
  817.   socklen_t alen = sizeof(myaddr);
  818.  
  819.   if ((asock  = make_listen(&myaddr)) < 0)
  820.     fatal("make listen");
  821.  
  822.   getsockname (asock,(struct sockaddr *)&inaddr,&alen);
  823.   printf ("sock addr: %s:%d\n",
  824.       inet_ntoa(inaddr.sin_addr),
  825.       ntohs(inaddr.sin_port));
  826.  
  827.  
  828.   signal(SIGINT,alah);
  829.   signal(SIGALRM,alah);
  830.   siginterrupt(SIGALRM,1);
  831.  
  832.   int rem = ncli % cli_fork, clients = ncli/cli_fork;
  833.   cli = malloc((cli_fork + load? 1:0) * sizeof(*cli));
  834.   for (i = 0; i < cli_fork; i++)
  835.     cli[i] = runtcli(clients + (rem? --rem,1:0), host,port);
  836.   if (load) {
  837.     cli[cli_fork++] = loader(host,port);
  838.     sem_post(&com->sem_load);
  839.   } else {
  840.     for (i = 0; i < cli_fork; i++)
  841.       sem_post(&com->sem_cli);
  842.   }
  843.  
  844.   alarm(alarmt);
  845.   start  = mtime();
  846.   int sock, loop, nfds;
  847.  
  848.   if (use_select) {
  849.     printf ("Select test\n");
  850.  
  851.     fd_set rfds, curfds;
  852.     struct cli *pcli, *acli[sizeof(rfds)*8];
  853.  
  854.     maxsocknum = max(asock,pdata[0]);
  855.  
  856.     FD_ZERO(&rfds);
  857.     FD_SET(asock,&rfds);
  858.     FD_SET(pdata[0],&rfds);
  859.  
  860.  
  861.     for (loop = 0;;loop++) {
  862.       memcpy(&curfds,&rfds,sizeof(rfds));
  863.       nfds = select(maxsocknum+1,&curfds,NULL,NULL,NULL);
  864.       maxnfds = max(maxnfds,nfds);
  865.       if (checkInterruptExit(nfds,"select"))
  866.     continue; // errno == EINTR
  867.       if (FD_ISSET(asock,&curfds)) {
  868.     // accept
  869.     nfds--; FD_CLR(asock,&curfds);
  870.     do {
  871.       sock = accept(asock,(struct sockaddr *)&inaddr, &alen);
  872.     } while (checkInterruptExit(sock,"select accept")); // errno == EINTR
  873.     maxsocknum = max(maxsocknum,sock);
  874.     nconn++;
  875.     priconn(sock,loop,"Accept");
  876.     if (use_data) {
  877.       switch (method) {
  878.       case USE_FORK:
  879.         if (fork())
  880.           close(sock);
  881.         else { // child
  882.           close(pdata[0]);
  883.           int tot = echo(sock);
  884.           if (write(pdata[1],&tot,sizeof(tot)) != sizeof(tot))
  885.         fatal("write pipe");
  886.           exit(0);
  887.         }
  888.         break;
  889.       case USE_THREAD:
  890.         {
  891.           pthread_t th;
  892.          
  893.           if (pthread_create (&th, &attr, thecho, (void *)((long)sock)))
  894.         fatal("serv create thread");
  895.          
  896.         }
  897.         break;
  898.       case USE_THPOOL:
  899.         {
  900.           pthread_mutex_lock(&thpoolock);
  901.           if (!nwaitedths && (!poolsize || nthreads < poolsize)) {
  902.         pthread_t th;
  903.         if (pthread_create (&th, &attr, tpecho, NULL))
  904.           fatal("pool thread");
  905.         nthreads++;
  906.           }
  907.           struct qelem *e = get_qelem();
  908.           e->fd = sock;
  909.           to_queue(e);
  910.           sem_post(&sem);
  911.           pthread_mutex_unlock(&thpoolock);
  912.         }
  913.         break;
  914.  
  915.       default:
  916.         // store sock for future processing
  917.         pcli = acli[sock] = get_cli();
  918.         pcli->tot = 0;
  919.         pcli->buf[0] = 0;
  920.         maxsocknum = max(maxsocknum,sock);
  921.         FD_SET(sock,&rfds);
  922.       }
  923.     } else {
  924.       close(sock); // -c flag
  925.     }
  926.       }
  927.       if (use_data) {
  928.     if (FD_ISSET(pdata[0],&curfds)) {
  929.       nfds--; FD_CLR(pdata[0],&curfds);
  930.       // subtotals in pdata pipe for -p method (default)
  931.       int tot;
  932.       if (read(pdata[0],&tot,sizeof(tot)) != sizeof(tot))
  933.         fatal("serv unexpected pdata (select)");
  934.       waitpid(-1,&status,WNOHANG);
  935.       servconn++;
  936.       total += tot;
  937.     }
  938.     for (sock = 0; nfds && sock <= maxsocknum; sock++) {
  939.       if (FD_ISSET(sock,&curfds)) {
  940.         int l, lw;
  941.  
  942.         nfds--;
  943.         pcli = acli[sock];
  944.         switch (method) {
  945.         case USE_MASTER:
  946.           total += echo(sock);
  947.           close(sock);
  948.           FD_CLR(sock,&rfds);
  949.           free_cli(pcli);
  950.           servconn++;
  951.           break;
  952.         case USE_ASY:
  953.           do {
  954.         l = read(sock,pcli->buf+1,998);
  955.           } while (checkInterruptExit(l,"read USE_ASY"));
  956.           do {
  957.         lw = write(sock,pcli->buf+1,l);
  958.           } while ( checkInterruptExit(lw,"write USE_ASY"));
  959.           if (lw != l)
  960.         fatal ("write USE_ASY");
  961.  
  962.           total += l;
  963.           if (pcli->buf[l] == '\n' && pcli->buf[l-1] == '\n') { // end
  964.         servconn++;
  965.         close(sock);
  966.         FD_CLR(sock,&rfds);
  967.         free_cli(pcli);
  968.           } else
  969.         pcli->buf[0] = pcli->buf[l];
  970.           break;
  971.  
  972.         default:
  973.           printf ("Sorry method %s not implemented for epoll yet\n",
  974.               mthdtitle[method-1]);
  975.           alah (SIGALRM);
  976.           exit(0);
  977.         }
  978.       }
  979.     }
  980.       }
  981.     }
  982.   } else if (use_epoll) {
  983.     printf ("Epoll test\n");
  984.     int evp = epoll_create1(EPOLL_CLOEXEC);
  985.     struct epoll_event eva[1000], ev;
  986.     struct cli  asd, pdd, *pcli;
  987.  
  988.     asd.fd = asock;
  989.     pdd.fd = pdata[0];
  990.     ev.events = EPOLLIN;
  991.     ev.data.ptr = &asd;
  992.     if (epoll_ctl(evp,EPOLL_CTL_ADD,asock,&ev))
  993.       fatal("epoll_ctl_add asock");
  994.     ev.data.ptr = &pdd;
  995.     if (epoll_ctl(evp,EPOLL_CTL_ADD,pdata[0],&ev))
  996.       fatal("epoll_ctl_add pdata");
  997.  
  998.     for (loop = 0;;loop++) {
  999.       nfds = epoll_wait (evp,eva,1000,-1);
  1000.       maxnfds = max(maxnfds,nfds);
  1001.       if (checkInterruptExit(nfds,"epoll"))
  1002.     continue; // errno == EINTR
  1003.  
  1004.       for (j = 0; j < nfds; j++) {
  1005.     pcli = eva[j].data.ptr;
  1006.     if ((sock = pcli->fd) < 0)
  1007.       continue; // дубликат в eva уже закрытого сокета
  1008.     if (eva[j].events == EPOLLHUP) {
  1009.       if (pcli == &asd)
  1010.         fatal("asock closed");
  1011.       if (pcli == &pdd)
  1012.         fatal("pdata closed");
  1013.       close(sock);
  1014. #if 0
  1015.       // pcli->fd автоматически удаляется из пула при close(pcli->fd)
  1016.       if (epoll_ctl(evp,EPOLL_CTL_DEL,pcli->fd,&ev))
  1017.         fatal("epoll_ctl_add pdata");
  1018. #endif
  1019.       free_cli(pcli);
  1020.     } else if (eva[j].events & EPOLLIN) {
  1021.       if (pcli == &asd) {
  1022.         sock = accept(asock,(struct sockaddr *)&inaddr, &alen);
  1023.         if (checkInterruptExit(sock,"epoll accept"))
  1024.           continue; // errno == EINTR
  1025.         maxsocknum = max(maxsocknum,sock);
  1026.         nconn++;
  1027.         priconn(sock,loop,"Accept");
  1028.         if (use_data) {
  1029.           switch (method) {
  1030.           case USE_FORK:
  1031.         if (fork())
  1032.           close(sock);
  1033.         else { // child
  1034.           close(pdata[0]);
  1035.           int tot = echo(sock);
  1036.           if (write(pdata[1],&tot,sizeof(tot)) != sizeof(tot))
  1037.             fatal("write pipe");
  1038.           exit(0);
  1039.         }
  1040.         break;
  1041.           case USE_THREAD:
  1042.         {
  1043.           pthread_t th;
  1044.  
  1045.           if (pthread_create (&th, &attr, thecho, (void *)((long)sock)))
  1046.             fatal("serv create thread");
  1047.  
  1048.         }
  1049.         break;
  1050.           case USE_THPOOL:
  1051.         {
  1052.           pthread_mutex_lock(&thpoolock);
  1053.           if (!nwaitedths  && (!poolsize || nthreads < poolsize)) {
  1054.             pthread_t th;
  1055.             if (pthread_create (&th, &attr, tpecho, NULL))
  1056.               fatal("pool thread");
  1057.             nthreads++;
  1058.           }
  1059.           struct qelem *e = get_qelem();
  1060.           e->fd = sock;
  1061.           to_queue(e);
  1062.           sem_post(&sem);
  1063.           pthread_mutex_unlock(&thpoolock);
  1064.         }
  1065.         break;
  1066.  
  1067.           default:
  1068.         // store sock for future processing
  1069.         pcli = get_cli();
  1070.         pcli->fd = sock;
  1071.         pcli->tot = 0;
  1072.         pcli->buf[0] = 0;
  1073.         ev.data.ptr = pcli;
  1074.         if (epoll_ctl(evp,EPOLL_CTL_ADD,sock,&ev))
  1075.           fatal("epoll_ctl_add pdata");
  1076.           }
  1077.         } else {
  1078.           close (sock);  // -c flag
  1079.         }
  1080.       } else if (pcli == &pdd) {
  1081.         // subtotals in pdata pipe for -p method (default)
  1082.         int tot;
  1083.         if (read(sock,&tot,sizeof(tot)) != sizeof(tot))
  1084.           fatal("serv unexpected pdata");
  1085.         waitpid(-1,&status,WNOHANG);
  1086.         servconn++;
  1087.         total += tot;
  1088.       } else {
  1089.         int l, lw;
  1090.         // sock data from client process them according to method
  1091.         if (!use_data)
  1092.           fatal("serv expected data");
  1093.         switch (method) {
  1094.         case USE_MASTER:
  1095.           total += echo(sock);
  1096.           close(sock);
  1097.           free_cli(pcli);
  1098.           servconn++;
  1099.           break;
  1100.         case USE_ASY:
  1101.           do {
  1102.         l = read(sock,pcli->buf+1,998);
  1103. #if 0
  1104.         printf ("read ASY: sock = %d l = %d nfds = %d j = %d loop = %d\n",
  1105.             sock,l,nfds,j,loop);
  1106. #endif
  1107.           } while (checkInterruptExit(l,"read USE_ASY"));
  1108.           do {
  1109.         lw = write(sock,pcli->buf+1,l);
  1110.           } while ( checkInterruptExit(lw,"write USE_ASY"));
  1111.           if (lw != l)
  1112.         fatal ("write USE_ASY");
  1113.  
  1114.           total += l;
  1115.           if (pcli->buf[l] == '\n' && pcli->buf[l-1] == '\n') { // end
  1116.         servconn++;
  1117.         close(sock);
  1118.         free_cli(pcli);
  1119.           } else
  1120.         pcli->buf[0] = pcli->buf[l];
  1121.           break;
  1122.  
  1123.         default:
  1124.           printf ("Sorry method %s not implemented for epoll yet\n",
  1125.               mthdtitle[method-1]);
  1126.           alah (SIGALRM);
  1127.           exit(0);
  1128.         }
  1129.       }
  1130.     } else {
  1131.       printf ("unexpected event: %d nfds = %d eva=%d fd=%d [%s]",
  1132.           loop,nfds,j,pcli->fd,eva_names(eva[j].events,buf));
  1133.       fatal("unexpected event");
  1134.     }
  1135.       }
  1136.     }
  1137.   } else {
  1138.     for (loop=0;;loop++) {
  1139.       do {
  1140.     sock = accept(asock,(struct sockaddr *)&inaddr, &alen);
  1141.       //    getchar();
  1142.       } while (checkInterruptExit(sock,"accept")); // errno == EINTR
  1143.       maxsocknum = max(maxsocknum,sock);
  1144.       nconn++;
  1145.       priconn(sock,loop,"Accept");
  1146.  
  1147.       if (use_data) {
  1148.     if (method == USE_MASTER) {
  1149.       total += echo(sock);
  1150.       close(sock);
  1151.       servconn++;
  1152.     } else if (method == USE_FORK) {
  1153.       if (fork() == 0) {     
  1154.         close(pdata[0]);
  1155.         int tot = echo(sock);
  1156.         if (write(pdata[1],&tot,sizeof(tot)) != sizeof(tot))
  1157.           fatal("write pipe");
  1158.         exit(0);
  1159.       } else {  // parent and USE_FORK
  1160.         close(sock);
  1161.         waitpid(-1,&status,WNOHANG);
  1162.         int subtot[1000], l, j, nb;
  1163.         nb = 0;
  1164.         if (ioctl(pdata[0],FIONREAD,&nb))
  1165.           fatal("ioctl");
  1166.         if (nb) {
  1167.           l = read(pdata[0],subtot,sizeof(subtot));
  1168.           l /= sizeof(subtot[0]);
  1169. #if 0
  1170.           if (l > maxl)
  1171.         maxl = l;
  1172. #endif
  1173.           servconn += l;
  1174.           for (j = 0; j < l; j++) {
  1175.         total += subtot[j];
  1176.           }
  1177.         }
  1178.       }
  1179.     } else if (method == USE_THREAD) {
  1180.       pthread_t th;
  1181.  
  1182.       if (pthread_create (&th, &attr, thecho, (void *)((long)sock)))
  1183.         fatal("serv create thread");
  1184.  
  1185.     } else if (method == USE_THPOOL) {
  1186.       pthread_mutex_lock(&thpoolock);
  1187.       if (!nwaitedths && (!poolsize || nthreads < poolsize)) {
  1188.         pthread_t th;
  1189.         if (pthread_create (&th, &attr, tpecho, NULL))
  1190.           fatal("pool thread");
  1191.         nthreads++;
  1192.       }
  1193.       struct qelem *e = get_qelem();
  1194.       e->fd = sock;
  1195.       to_queue(e);
  1196.       sem_post(&sem);
  1197.       pthread_mutex_unlock(&thpoolock);
  1198.     } else {
  1199.       printf ("Sorry method %s not implemented yet\n",
  1200.           mthdtitle[method-1]);
  1201.       alah (SIGALRM);
  1202.       exit(0);
  1203.     }
  1204.       } else {
  1205.     close (sock);
  1206.       }
  1207.     }
  1208.   }
  1209.  
  1210.  
  1211. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement