Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // avp 2012 connect/accept/ process connection measure tests
- // as first step: gcc this-file.c -pthreads ; ./a.out -h
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <sys/epoll.h>
- #include <sys/select.h>
- #include <unistd.h>
- #include <signal.h>
- #include <errno.h>
- #include <sys/wait.h>
- #include <sys/socket.h>
- #include <netinet/in.h>
- #include <netdb.h>
- #include <arpa/inet.h>
- #include <sys/ioctl.h>
- #include <sys/time.h>
- #include <sys/resource.h>
- #include <pthread.h>
- #include <semaphore.h>
- #include <sys/ipc.h>
- #include <sys/shm.h>
- #include <sys/syscall.h>
- static pid_t
- gettid()
- {
- return syscall(SYS_gettid);
- }
- static long long
- mtime()
- {
- struct timeval t;
- gettimeofday(&t, NULL);
- long long mt = (long long)t.tv_sec * 1000 + t.tv_usec / 1000;
- return mt;
- }
- #define fatal(msg) ({fprintf(stderr,"FATAL %ld: %s [%m]\n",(long)getpid(),msg);\
- if (getpid() == masterpid) kill_clients(); exit(-1);})
- #define thfatal(msg) ({fprintf(stderr,"THREAD FATAL %ld: %s [%m]\n",(long)gettid(),msg);\
- pthread_exit(NULL);})
- #define max(x,y) ((x) > (y)? (x):(y))
- #define USE_MASTER 1
- #define USE_FORK 2
- #define USE_THREAD 3
- #define USE_THPOOL 4
- #define USE_ASY 5
- char *mthdtitle[] = {"MASTER","FORK","THREAD","THPOOL","ASYNC"};
- #define CONNECT_ERRLIM 100
- pid_t masterpid, loadpid = 0;
- struct common { // shared memory
- int exif;
- sem_t sem_load, sem_cli;
- } *com = NULL;
- int segid = 0; // shared memory id
- int nconn = 0, // число принятых соединений
- servconn = 0, // число обработанных (все данные приняты отосланы)
- // maxl = 0, // MAX полученных одновременно subtotals (-p method)
- nthreads = 0, // количество запущенных потоков
- maxthreads = 0, // MAX потоков одновременно process data
- nwaitedths = 0, // (для пула -T) количество потоков, ожидающих connect
- poolsize = 0, // 0 - unlimited
- exif = 0, // флаг alarm для exit
- pdata[2], // pipe for master <- process (-p) для получения subtotals
- verbose = 1,
- alarmt = 10,
- load = -1, // нет нагрузки, после обр. аргументов такой load станет 0
- lline = -1,
- port = 12345,
- ncli = 4, // число client потоков
- use_data = 1, // после слединения обмен данными
- use_epoll = 0,
- use_select = 0,
- cli_fork = 1, // число процессов, запускающих client потоки
- maxnfds = 0,
- maxsocknum = 0,
- method = USE_FORK; // способ обработки данных
- long total = 0; // всего получено байт от клиентов
- // ==== Потоки и очередь соединений для обработки в пуле потоков ====
- // локи для
- pthread_mutex_t statlock = PTHREAD_MUTEX_INITIALIZER, // total, maxthreads etc
- thpoolock = PTHREAD_MUTEX_INITIALIZER; // пула потоков и очереди соединений
- sem_t sem; // семафор очереди соединений
- pthread_attr_t attr;
- // элемент очереди соединений
- struct qelem {
- struct qelem *next; // для списка свободных
- int fd;
- };
- // очередь соединений
- struct qelem *qhead = NULL, *qtail = NULL,
- *qfree = NULL; // список свободных элементов очереди соединений
- // получить элемент очереди соединений
- struct qelem *
- get_qelem()
- {
- struct qelem *e = qfree;
- if (e) {
- qfree = e->next;
- return e;
- }
- return (struct qelem *)malloc(sizeof(*qfree));
- }
- // вернуть в список свободных
- void
- free_qelem (struct qelem *e)
- {
- e->next = qfree;
- qfree = e;
- }
- // поместить соединение в очередь
- void
- to_queue (struct qelem *e)
- {
- e->next = NULL;
- if (qtail)
- qtail = qtail->next = e;
- else
- qhead = qtail = e;
- }
- // взять соединение из очереди
- struct qelem *
- from_queue ()
- {
- struct qelem *e = qhead;
- if (e) {
- if (qhead == qtail)
- qhead = qtail = NULL;
- else
- qhead = e->next;
- }
- return e;
- }
- // ==== Обрабатываемые в epoll соединения ====
- struct cli {
- struct cli *next; // для списка свободных
- int fd, tot;
- char buf[1000];
- };
- // список свободных структур (минимизируем malloc)
- struct cli *clilist = NULL;
- // возвращает в список свободных
- void
- free_cli(struct cli *e)
- {
- e->fd = -2;
- e->next = clilist;
- clilist = e;
- }
- // получить элемент для epoll
- struct cli *
- get_cli()
- {
- struct cli *e = clilist;
- if (e) {
- clilist = e->next;
- return e;
- }
- return (struct cli *)malloc(sizeof(*e));
- }
- // измерение main в msec
- extern long long mtime();
- long long start=0, finish;
- void
- Usage()
- {
- puts("Usage: conns [-h] [-c|m|p|t|T|y] [-e|s] [-C -l -v -aTimeSec] [[host:]port] [Nclients]");
- puts("-h help");
- puts("-v[N] verbose 0 1(default) 2 3 4");
- puts("-C[N] use N forks for clients (default 1)");
- puts("-e use epoll");
- puts("-s use select");
- puts("-a[N] run test N sec (default 10)");
- puts("-T[N] process data using threads pool (N poolsize default unlimeted)");
- puts("-t process data using threads (new for each connection)");
- puts("-p process data using processes (new for each connection(default)");
- puts("-m process data in main synchronously");
- puts("-y process data in main asynchronously (epoll switch on)");
- puts("-c connections only (don't send data, ignore tTpmy flags)");
- puts("-l[N] use low rate load N connections (default 1000-Nclients)");
- exit(1);
- }
- pid_t *cli = NULL; // массив процессов-клиентов
- void
- kill_clients()
- {
- int j;
- if (cli)
- for (j = 0; j < cli_fork; j++)
- kill(cli[j],SIGHUP);
- }
- void
- finfun ()
- {
- if (getpid() == masterpid) {
- if (com) {
- sem_destroy(&com->sem_load);
- sem_destroy(&com->sem_cli);
- }
- shmctl(segid,IPC_RMID,NULL);
- }
- }
- // alarm handlers
- void
- alah(int s)
- {
- char buf[1000];
- if (exif)
- exit(0);
- exif = 1;
- if (com)
- com->exif = 1;
- if (getpid() == masterpid)
- kill_clients();
- finish = mtime();
- write (1,buf,
- sprintf (buf,"ALARM nconn = %d bytes = %ld sevrconn = %d time=%lld\n",
- nconn,total,servconn,finish-start)
- );
- if (method == USE_THREAD)
- write (1,buf,
- sprintf (buf,"nthreads = %d maxthreads = %d\n",
- nthreads,maxthreads));
- else if (method == USE_THPOOL)
- write (1,buf,
- sprintf (buf,"nthreads = %d maxthreads = %d nwaitedths = %d\n",
- nthreads,maxthreads,nwaitedths));
- write (1,buf,
- sprintf (buf,"maxnfds = %d maxsocknum = %d\n",
- maxnfds,maxsocknum)
- );
- alarm(2);
- }
- // в клиенте повтор connect()
- int connagain = 0;
- // handler for clients
- void
- hangh(int s)
- {
- char buf[1000];
- write (1, buf,
- sprintf (buf,"SIGHUP %s %ld connagain = %d nconn = %d servconn = %d\n",
- loadpid? "Loader":"",
- (long)getpid(),connagain,nconn,servconn));
- if (loadpid)
- write (1, buf,
- sprintf (buf,"Loader %d connections %d lines\n",
- load, lline)
- );
- exit(0);
- }
- // if returns 1 then continue to restart syscall, 0 - OK goto work
- int
- checkInterruptExit(int rc, char *msg)
- {
- if (com->exif || (rc == -1 && errno != EINTR)) {
- int nt = 0, status;
- if (!com->exif)
- perror(msg);
- if (masterpid == gettid()) {
- printf ("main exit exif = %d\n",com->exif);
- kill_clients();
- while(wait(&status) > 0)
- nt++;
- printf ("End [%s,%s] %d connections (%ld bytes %d servconn)\nwait = %d time = %lld msec\n",
- use_select? "Select": use_epoll? "Epoll":"Synch",
- use_data? mthdtitle[method-1]:"CONNECTIONS",
- nconn,total,servconn,nt,mtime()-start);
- exit (0);
- }
- }
- return errno == EINTR? 1: 0;
- }
- void
- priconn (int sock, int loop, char *msg)
- {
- if (verbose) {
- int n = 3000;
- if (verbose > 3)
- n = 1;
- else if (verbose > 2)
- n = 1000;
- if (loop % n == 0)
- printf ("%s: %d loop = %d\n",msg,sock,loop);
- }
- }
- // returns socket
- int
- make_connect (struct sockaddr_in *addr)
- {
- int sock = socket(AF_INET,SOCK_STREAM,0),
- alen = sizeof(*addr), on = 1;
- struct linger ling;
- ling.l_onoff = 1;
- ling.l_linger = 0;
- if (sock != -1 &&
- setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,
- (char *)&on, sizeof (on)) != -1 &&
- setsockopt(sock,SOL_SOCKET,SO_LINGER,
- (char *)&ling,sizeof(ling)) != -1 &&
- connect(sock,(struct sockaddr *)addr,alen) == 0)
- return sock; // OK
- // ERR
- return sock == -1? -1: close(sock),-1;
- }
- // returns socket
- int
- make_listen (struct sockaddr_in *addr)
- {
- int asock = socket(AF_INET,SOCK_STREAM,0), on = 1;
- if (asock != -1 &&
- setsockopt(asock,SOL_SOCKET,SO_REUSEADDR,
- (char *)&on, sizeof (on)) != -1 &&
- bind(asock,(struct sockaddr *)addr,sizeof(*addr)) == 0 &&
- listen(asock,1024) == 0)
- return asock;
- return asock == -1? :close(asock),-1;
- }
- // returns 1 OK, 0 ERR
- int
- make_ipaddr (char *host, int port, struct sockaddr_in *a)
- {
- struct hostent *phe;
- a->sin_family = AF_INET;
- a->sin_port = htons(port);
- a->sin_addr.s_addr = INADDR_ANY;
- if (host && host[0]) {
- if (phe = gethostbyname(host))
- memcpy (&a->sin_addr, phe->h_addr, phe->h_length);
- else
- if ((a->sin_addr.s_addr = inet_addr(host)) == INADDR_NONE)
- return 0;
- }
- return 1;
- }
- // обмен данными со стороны клиента
- int
- wecho (int sock, int loop)
- {
- char buf[1000];
- int tot = 0, j, l, m = rand()%7+10;
- for (j = 0; j < m; j++)
- write (sock,buf,sprintf(buf,"proc %ld sock %d loop %d line %d\n",
- (long)gettid(),sock,loop,j));
- buf[0] = '\n';
- if (write (sock,buf,1) != 1)
- return -1;
- buf[0] = 0;
- while ((l = read(sock,buf+1,sizeof(buf)-1)) > 0) {
- tot += l;
- if (buf[l] == '\n' && buf[l-1] == '\n')
- break;
- buf[0] = buf[l];
- }
- return l > 0? tot: -1;
- }
- // поток данных клиента
- void *
- thcli (void *a)
- {
- struct sockaddr_in *paddr = (struct sockaddr_in *)a;
- int sock, loop, tot = 0;
- for (loop = 0;; usleep(1),loop++) {
- if ((sock = make_connect(paddr)) == -1)
- thfatal("make connect");
- pthread_mutex_lock(&statlock);
- nconn++;
- pthread_mutex_unlock(&statlock);
- priconn (sock,loop,"Connect");
- if (use_data)
- if ((tot = wecho(sock,loop)) < 1)
- thfatal("wecho");
- close(sock);
- pthread_mutex_lock(&statlock);
- servconn++;
- total += tot;
- pthread_mutex_unlock(&statlock);
- }
- printf ("Never %ld thread_exit i=%d\n",(long)gettid(),loop);
- }
- // запуск потоков данных клиента
- pid_t
- runtcli (int n, char *host, int port)
- {
- pid_t p = fork();
- if (p)
- return p;
- struct sockaddr_in sadr;
- int i;
- if (!make_ipaddr(host,port,&sadr))
- fatal("Cli make addr");
- signal(SIGHUP,hangh); // exit there
- siginterrupt(SIGHUP,1);
- sem_wait(&com->sem_cli);
- for (i = 0; i < n-1; i++) {
- pthread_t th;
- if (pthread_create (&th, &attr, thcli, (void *)&sadr))
- fatal("Cli create thread");
- }
- printf ("client %ld run %d threads\n",(long)getpid(),n);
- thcli((void *)&sadr);
- // Not reached
- }
- pid_t
- loader (char *host, int port)
- {
- pid_t p = fork();
- if (p)
- return p;
- loadpid = getpid();
- struct sockaddr_in sadr;
- if (!make_ipaddr(host,port,&sadr))
- fatal("Loader make addr");
- struct hostent *phe;
- int sock[load], i, l;
- char buf[1000];
- signal(SIGHUP,hangh);
- siginterrupt(SIGHUP,1);
- sem_wait(&com->sem_load);
- printf ("Loader start\n");
- for (i = 0; i < load; i++) {
- if ((sock[i] = make_connect(&sadr)) == -1)
- fatal ("load connect");
- nconn++;
- if (write (sock[i],buf,
- l = sprintf(buf,"Loader start %d\n",i)) != l)
- fatal(buf);
- }
- printf ("Loader make %d connections\n",i);
- for (i = 0; i < cli_fork; i++)
- sem_post(&com->sem_cli);
- for (lline = 0;;lline++, usleep(100000)) { // 10 раз в секунду
- for (i = 0; i < load; i++) {
- if (read(sock[i],buf,sizeof(buf)) <= 0)
- fatal("load read");
- if (write (sock[i],buf,
- l = sprintf(buf,"Loader socket %d line %d\n",i,lline)) != l)
- fatal(buf);
- }
- }
- // Not reached
- }
- // server echo lines (end \n\n)
- static int
- echo (int sock)
- {
- char buf[1000];
- int l, nfragments = 0, tot = 0;
- buf[0] = 0;
- do {
- while ((l = read(sock,buf+1,sizeof(buf)-1)) > 0) {
- tot += l;
- if (write(sock,buf+1,l) != l) {
- l = -1;
- break;
- }
- if (buf[l] == '\n' && buf[l-1] == '\n')
- break;
- buf[0] = buf[l];
- nfragments++;
- }
- } while (checkInterruptExit(l,"echo"));
- if (nfragments && verbose > 2)
- printf ("fragments = %d l=%d tot = %d\n",nfragments,l,tot);
- return tot;
- }
- void *
- thecho (void *a)
- {
- pthread_mutex_lock(&statlock);
- nthreads++;
- if (nthreads > maxthreads)
- maxthreads = nthreads;
- pthread_mutex_unlock(&statlock);
- int sock = (long)a,
- tot = echo(sock);
- close(sock);
- pthread_mutex_lock(&statlock);
- nthreads--;
- total += tot;
- servconn++;
- pthread_mutex_unlock(&statlock);
- }
- void *
- tpecho (void *a)
- {
- struct qelem *e = NULL;
- int tot, nworks;
- for (;;) {
- pthread_mutex_lock(&thpoolock);
- nwaitedths++;
- if (e)
- free_qelem(e);
- pthread_mutex_unlock(&thpoolock);
- sem_wait(&sem);
- pthread_mutex_lock(&thpoolock);
- nwaitedths--;
- if (!(e = from_queue()))
- fatal("empty queue");
- nworks = nthreads-nwaitedths;
- pthread_mutex_unlock(&thpoolock);
- tot = echo(e->fd);
- close(e->fd);
- pthread_mutex_lock(&statlock);
- total += tot;
- servconn++;
- if (nworks > maxthreads)
- maxthreads = nworks;
- // printf ("tpecho\n");
- pthread_mutex_unlock(&statlock);
- }
- }
- char *
- eva_names (int ev, char *buf)
- {
- static int eflag[] = {EPOLLIN,EPOLLOUT,EPOLLRDHUP,EPOLLPRI,
- EPOLLERR,EPOLLHUP,EPOLLET,EPOLLONESHOT};
- static char *ename[] = {"EPOLLIN","EPOLLOUT","EPOLLRDHUP","EPOLLPRI",
- "EPOLLERR","EPOLLHUP","EPOLLET","EPOLLONESHOT"};
- int i, n;
- *buf = 0;
- for (i = n = 0; i < sizeof(eflag)/sizeof(eflag[0]); i++)
- if (ev & eflag[i])
- n += sprintf (buf+n,"%s ",ename[i]);
- if (n)
- buf[n-1] = 0;
- return buf;
- }
- int
- main (int ac, char *av[])
- {
- int i, j;
- while (av[1] && av[1][0] == '-') {
- switch (av[1][1]) {
- case 'h':
- Usage();
- case 'c':
- use_data = 0; // connections only
- break;
- case 'C': // use many forks for client threads
- if (!av[1][2])
- break;
- if ((cli_fork = atoi(av[1]+2)) > 0)
- break;
- printf ("Bad cli_fork value [%s]\n",av[1]+2);
- Usage();
- case 'm':
- method = USE_MASTER; // method connection synchronously in main
- break;
- case 'p':
- method = USE_FORK;
- break;
- case 't':
- method = USE_THREAD;
- break;
- case 'T':
- method = USE_THPOOL;
- if ((poolsize = atoi(av[1]+2)) >= 0)
- break;
- printf ("Bad poolsize value [%s]\n",av[1]+2);
- Usage();
- case 'y':
- method = USE_ASY;
- use_epoll = 1;
- break;
- case 'e':
- use_epoll = 1;
- break;
- case 's':
- use_select = 1;
- break;
- case 'v':
- verbose = atoi(av[1]+2);
- break;
- case 'a':
- if ((alarmt = atoi(av[1]+2)) > 0)
- break;
- printf ("Bad alarm value [%s]\n",av[1]+2);
- Usage();
- case 'l':
- if ((load = atoi(av[1]+2)) >= 0)
- break;
- printf ("Bad lowrate-load value [%s]\n",av[1]+2);
- Usage();
- default:
- printf ("Unknown: [%s]\n",av[1]);
- Usage();
- }
- av++;
- }
- if (verbose < 0) {
- struct rlimit rlim;
- getrlimit(RLIMIT_NOFILE,&rlim);
- printf ("RLIMIT_NOFILE hard: %ld soft(cur): %ld\n",
- rlim.rlim_max, rlim.rlim_cur);
- rlim.rlim_max += 1000; rlim.rlim_cur += 1000;
- printf ("setrlimit = %d hard: %ld soft(cur): %ld\n",
- getrlimit(RLIMIT_NOFILE,&rlim), rlim.rlim_max, rlim.rlim_cur);
- getrlimit(RLIMIT_NOFILE,&rlim);
- printf ("new RLIMIT_NOFILE hard: %ld soft(cur): %ld\n",
- rlim.rlim_max, rlim.rlim_cur);
- long argmax = sysconf(_SC_ARG_MAX),
- childmax = sysconf(_SC_CHILD_MAX),
- clockticks = sysconf(_SC_CLK_TCK),
- openmax = sysconf(_SC_OPEN_MAX),
- pagesize = sysconf(_SC_PAGESIZE),
- physpages = sysconf(_SC_PHYS_PAGES),
- avphyspgs = sysconf(_SC_AVPHYS_PAGES),
- proconline = sysconf(_SC_NPROCESSORS_ONLN);
- printf ("argmax = %ld\n\
- childmax = %ld\n\
- clockticks = %ld\n\
- openmax = %ld\n\
- pagesize = %ld\n\
- physpages = %ld\n\
- avphyspgs = %ld\n\
- proconline = %ld\n",
- sysconf(_SC_ARG_MAX),sysconf(_SC_CHILD_MAX),
- sysconf(_SC_CLK_TCK),sysconf(_SC_OPEN_MAX),
- sysconf(_SC_PAGESIZE),sysconf(_SC_PHYS_PAGES),
- sysconf(_SC_AVPHYS_PAGES),sysconf(_SC_NPROCESSORS_ONLN));
- exit(0);
- }
- if (av[1] && av[2])
- if ((ncli = atoi(av[2])) < 1)
- ncli = 4;
- if (load >= 0) {
- int err = 0;
- if (!use_data) {
- err = 1;
- printf ("incompatible -l and connections only\n");
- } else if (method == USE_MASTER) {
- err = 1;
- printf ("incompatible -l and -m\n");
- }
- if (err)
- exit (1);
- }
- char host[256] = "",
- buf[1000];
- port = 12345;
- if (av[1]) {
- char *p = strchr(av[1],':');
- if (p) {
- if ((port = atoi(p+1)) <= 0)
- port = 12345;
- strncat(host,av[1],p-av[1]);
- printf ("host = [%s] port = %d\n",host,port);
- } else if ((port = atoi(av[1])) <= 0)
- port = 12345;
- }
- struct sockaddr_in myaddr, inaddr;
- if (!make_ipaddr(host,port,&myaddr))
- fatal("make ipaddr");
- if (!*host)
- strcpy(host,"127.0.0.1");
- strcpy(buf,"dataflow ");
- printf ("Test %s accept connections (-v%d) %s\n%d sec with %d clients (%s:%d)\n",
- use_select? "Select": use_epoll? "Epoll":"Synch", verbose,
- use_data? strcat(strcat (buf,mthdtitle[method-1])," processing")
- :"only connections",
- alarmt,
- ncli,
- host, port);
- if (load >= 0)
- printf ("use %d additional low rate connections\n",
- load = load? load: 1000-ncli);
- else
- load = 0;
- // exit (0);
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
- pthread_attr_setstacksize(&attr,1024*128);
- sem_init(&sem,0,0);
- masterpid = gettid();
- segid = shmget(IPC_PRIVATE, 4096, IPC_CREAT|0777);
- if (segid == -1)
- fatal("shmget");
- atexit(finfun);
- com = (struct common *)shmat(segid,NULL,0);
- if (com == (void *)-1)
- fatal("shmat");
- com->exif = 0;
- sem_init (&com->sem_load,1,0);
- sem_init (&com->sem_cli,1,0);
- pipe(pdata);
- int asock, status;
- socklen_t alen = sizeof(myaddr);
- if ((asock = make_listen(&myaddr)) < 0)
- fatal("make listen");
- getsockname (asock,(struct sockaddr *)&inaddr,&alen);
- printf ("sock addr: %s:%d\n",
- inet_ntoa(inaddr.sin_addr),
- ntohs(inaddr.sin_port));
- signal(SIGINT,alah);
- signal(SIGALRM,alah);
- siginterrupt(SIGALRM,1);
- int rem = ncli % cli_fork, clients = ncli/cli_fork;
- cli = malloc((cli_fork + load? 1:0) * sizeof(*cli));
- for (i = 0; i < cli_fork; i++)
- cli[i] = runtcli(clients + (rem? --rem,1:0), host,port);
- if (load) {
- cli[cli_fork++] = loader(host,port);
- sem_post(&com->sem_load);
- } else {
- for (i = 0; i < cli_fork; i++)
- sem_post(&com->sem_cli);
- }
- alarm(alarmt);
- start = mtime();
- int sock, loop, nfds;
- if (use_select) {
- printf ("Select test\n");
- fd_set rfds, curfds;
- struct cli *pcli, *acli[sizeof(rfds)*8];
- maxsocknum = max(asock,pdata[0]);
- FD_ZERO(&rfds);
- FD_SET(asock,&rfds);
- FD_SET(pdata[0],&rfds);
- for (loop = 0;;loop++) {
- memcpy(&curfds,&rfds,sizeof(rfds));
- nfds = select(maxsocknum+1,&curfds,NULL,NULL,NULL);
- maxnfds = max(maxnfds,nfds);
- if (checkInterruptExit(nfds,"select"))
- continue; // errno == EINTR
- if (FD_ISSET(asock,&curfds)) {
- // accept
- nfds--; FD_CLR(asock,&curfds);
- do {
- sock = accept(asock,(struct sockaddr *)&inaddr, &alen);
- } while (checkInterruptExit(sock,"select accept")); // errno == EINTR
- maxsocknum = max(maxsocknum,sock);
- nconn++;
- priconn(sock,loop,"Accept");
- if (use_data) {
- switch (method) {
- case USE_FORK:
- if (fork())
- close(sock);
- else { // child
- close(pdata[0]);
- int tot = echo(sock);
- if (write(pdata[1],&tot,sizeof(tot)) != sizeof(tot))
- fatal("write pipe");
- exit(0);
- }
- break;
- case USE_THREAD:
- {
- pthread_t th;
- if (pthread_create (&th, &attr, thecho, (void *)((long)sock)))
- fatal("serv create thread");
- }
- break;
- case USE_THPOOL:
- {
- pthread_mutex_lock(&thpoolock);
- if (!nwaitedths && (!poolsize || nthreads < poolsize)) {
- pthread_t th;
- if (pthread_create (&th, &attr, tpecho, NULL))
- fatal("pool thread");
- nthreads++;
- }
- struct qelem *e = get_qelem();
- e->fd = sock;
- to_queue(e);
- sem_post(&sem);
- pthread_mutex_unlock(&thpoolock);
- }
- break;
- default:
- // store sock for future processing
- pcli = acli[sock] = get_cli();
- pcli->tot = 0;
- pcli->buf[0] = 0;
- maxsocknum = max(maxsocknum,sock);
- FD_SET(sock,&rfds);
- }
- } else {
- close(sock); // -c flag
- }
- }
- if (use_data) {
- if (FD_ISSET(pdata[0],&curfds)) {
- nfds--; FD_CLR(pdata[0],&curfds);
- // subtotals in pdata pipe for -p method (default)
- int tot;
- if (read(pdata[0],&tot,sizeof(tot)) != sizeof(tot))
- fatal("serv unexpected pdata (select)");
- waitpid(-1,&status,WNOHANG);
- servconn++;
- total += tot;
- }
- for (sock = 0; nfds && sock <= maxsocknum; sock++) {
- if (FD_ISSET(sock,&curfds)) {
- int l, lw;
- nfds--;
- pcli = acli[sock];
- switch (method) {
- case USE_MASTER:
- total += echo(sock);
- close(sock);
- FD_CLR(sock,&rfds);
- free_cli(pcli);
- servconn++;
- break;
- case USE_ASY:
- do {
- l = read(sock,pcli->buf+1,998);
- } while (checkInterruptExit(l,"read USE_ASY"));
- do {
- lw = write(sock,pcli->buf+1,l);
- } while ( checkInterruptExit(lw,"write USE_ASY"));
- if (lw != l)
- fatal ("write USE_ASY");
- total += l;
- if (pcli->buf[l] == '\n' && pcli->buf[l-1] == '\n') { // end
- servconn++;
- close(sock);
- FD_CLR(sock,&rfds);
- free_cli(pcli);
- } else
- pcli->buf[0] = pcli->buf[l];
- break;
- default:
- printf ("Sorry method %s not implemented for epoll yet\n",
- mthdtitle[method-1]);
- alah (SIGALRM);
- exit(0);
- }
- }
- }
- }
- }
- } else if (use_epoll) {
- printf ("Epoll test\n");
- int evp = epoll_create1(EPOLL_CLOEXEC);
- struct epoll_event eva[1000], ev;
- struct cli asd, pdd, *pcli;
- asd.fd = asock;
- pdd.fd = pdata[0];
- ev.events = EPOLLIN;
- ev.data.ptr = &asd;
- if (epoll_ctl(evp,EPOLL_CTL_ADD,asock,&ev))
- fatal("epoll_ctl_add asock");
- ev.data.ptr = &pdd;
- if (epoll_ctl(evp,EPOLL_CTL_ADD,pdata[0],&ev))
- fatal("epoll_ctl_add pdata");
- for (loop = 0;;loop++) {
- nfds = epoll_wait (evp,eva,1000,-1);
- maxnfds = max(maxnfds,nfds);
- if (checkInterruptExit(nfds,"epoll"))
- continue; // errno == EINTR
- for (j = 0; j < nfds; j++) {
- pcli = eva[j].data.ptr;
- if ((sock = pcli->fd) < 0)
- continue; // дубликат в eva уже закрытого сокета
- if (eva[j].events == EPOLLHUP) {
- if (pcli == &asd)
- fatal("asock closed");
- if (pcli == &pdd)
- fatal("pdata closed");
- close(sock);
- #if 0
- // pcli->fd автоматически удаляется из пула при close(pcli->fd)
- if (epoll_ctl(evp,EPOLL_CTL_DEL,pcli->fd,&ev))
- fatal("epoll_ctl_add pdata");
- #endif
- free_cli(pcli);
- } else if (eva[j].events & EPOLLIN) {
- if (pcli == &asd) {
- sock = accept(asock,(struct sockaddr *)&inaddr, &alen);
- if (checkInterruptExit(sock,"epoll accept"))
- continue; // errno == EINTR
- maxsocknum = max(maxsocknum,sock);
- nconn++;
- priconn(sock,loop,"Accept");
- if (use_data) {
- switch (method) {
- case USE_FORK:
- if (fork())
- close(sock);
- else { // child
- close(pdata[0]);
- int tot = echo(sock);
- if (write(pdata[1],&tot,sizeof(tot)) != sizeof(tot))
- fatal("write pipe");
- exit(0);
- }
- break;
- case USE_THREAD:
- {
- pthread_t th;
- if (pthread_create (&th, &attr, thecho, (void *)((long)sock)))
- fatal("serv create thread");
- }
- break;
- case USE_THPOOL:
- {
- pthread_mutex_lock(&thpoolock);
- if (!nwaitedths && (!poolsize || nthreads < poolsize)) {
- pthread_t th;
- if (pthread_create (&th, &attr, tpecho, NULL))
- fatal("pool thread");
- nthreads++;
- }
- struct qelem *e = get_qelem();
- e->fd = sock;
- to_queue(e);
- sem_post(&sem);
- pthread_mutex_unlock(&thpoolock);
- }
- break;
- default:
- // store sock for future processing
- pcli = get_cli();
- pcli->fd = sock;
- pcli->tot = 0;
- pcli->buf[0] = 0;
- ev.data.ptr = pcli;
- if (epoll_ctl(evp,EPOLL_CTL_ADD,sock,&ev))
- fatal("epoll_ctl_add pdata");
- }
- } else {
- close (sock); // -c flag
- }
- } else if (pcli == &pdd) {
- // subtotals in pdata pipe for -p method (default)
- int tot;
- if (read(sock,&tot,sizeof(tot)) != sizeof(tot))
- fatal("serv unexpected pdata");
- waitpid(-1,&status,WNOHANG);
- servconn++;
- total += tot;
- } else {
- int l, lw;
- // sock data from client process them according to method
- if (!use_data)
- fatal("serv expected data");
- switch (method) {
- case USE_MASTER:
- total += echo(sock);
- close(sock);
- free_cli(pcli);
- servconn++;
- break;
- case USE_ASY:
- do {
- l = read(sock,pcli->buf+1,998);
- #if 0
- printf ("read ASY: sock = %d l = %d nfds = %d j = %d loop = %d\n",
- sock,l,nfds,j,loop);
- #endif
- } while (checkInterruptExit(l,"read USE_ASY"));
- do {
- lw = write(sock,pcli->buf+1,l);
- } while ( checkInterruptExit(lw,"write USE_ASY"));
- if (lw != l)
- fatal ("write USE_ASY");
- total += l;
- if (pcli->buf[l] == '\n' && pcli->buf[l-1] == '\n') { // end
- servconn++;
- close(sock);
- free_cli(pcli);
- } else
- pcli->buf[0] = pcli->buf[l];
- break;
- default:
- printf ("Sorry method %s not implemented for epoll yet\n",
- mthdtitle[method-1]);
- alah (SIGALRM);
- exit(0);
- }
- }
- } else {
- printf ("unexpected event: %d nfds = %d eva=%d fd=%d [%s]",
- loop,nfds,j,pcli->fd,eva_names(eva[j].events,buf));
- fatal("unexpected event");
- }
- }
- }
- } else {
- for (loop=0;;loop++) {
- do {
- sock = accept(asock,(struct sockaddr *)&inaddr, &alen);
- // getchar();
- } while (checkInterruptExit(sock,"accept")); // errno == EINTR
- maxsocknum = max(maxsocknum,sock);
- nconn++;
- priconn(sock,loop,"Accept");
- if (use_data) {
- if (method == USE_MASTER) {
- total += echo(sock);
- close(sock);
- servconn++;
- } else if (method == USE_FORK) {
- if (fork() == 0) {
- close(pdata[0]);
- int tot = echo(sock);
- if (write(pdata[1],&tot,sizeof(tot)) != sizeof(tot))
- fatal("write pipe");
- exit(0);
- } else { // parent and USE_FORK
- close(sock);
- waitpid(-1,&status,WNOHANG);
- int subtot[1000], l, j, nb;
- nb = 0;
- if (ioctl(pdata[0],FIONREAD,&nb))
- fatal("ioctl");
- if (nb) {
- l = read(pdata[0],subtot,sizeof(subtot));
- l /= sizeof(subtot[0]);
- #if 0
- if (l > maxl)
- maxl = l;
- #endif
- servconn += l;
- for (j = 0; j < l; j++) {
- total += subtot[j];
- }
- }
- }
- } else if (method == USE_THREAD) {
- pthread_t th;
- if (pthread_create (&th, &attr, thecho, (void *)((long)sock)))
- fatal("serv create thread");
- } else if (method == USE_THPOOL) {
- pthread_mutex_lock(&thpoolock);
- if (!nwaitedths && (!poolsize || nthreads < poolsize)) {
- pthread_t th;
- if (pthread_create (&th, &attr, tpecho, NULL))
- fatal("pool thread");
- nthreads++;
- }
- struct qelem *e = get_qelem();
- e->fd = sock;
- to_queue(e);
- sem_post(&sem);
- pthread_mutex_unlock(&thpoolock);
- } else {
- printf ("Sorry method %s not implemented yet\n",
- mthdtitle[method-1]);
- alah (SIGALRM);
- exit(0);
- }
- } else {
- close (sock);
- }
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement