Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <errno.h>
- #include <unistd.h>
- #include <signal.h>
- #include <poll.h>
- #include <arpa/inet.h>
- #include <netinet/in.h>
- #include <sys/socket.h>
- #include <sys/fcntl.h>
- #include <pthread.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <stdint.h>
- #include <string.h>
- #include <time.h>
// Fixed-size bitmap helpers stored in an array of uintptr_t words.
//   BMP_DEF(name,size) declares a bitmap called `name` holding `size` bits
//                      (rounded up to whole words, never zero-length for size > 0).
//   BMP_GET(bmp,bit)   evaluates to 1 when `bit` is set, 0 otherwise.
//   BMP_ON(bmp,bit)    sets `bit`.
//   BMP_OFF(bmp,bit)   clears `bit`.
// Every argument is parenthesized so expressions such as `i + 1` are safe to
// pass, and the mask is built from a uintptr_t so every bit of a word is usable.
// `bit` must be non-negative and below the declared size.
#define BMP_WORD_BITS (8 * sizeof(uintptr_t)) // POSIX guarantees 8-bit bytes
#define BMP_DEF(name,size) \
    uintptr_t name[((size) + BMP_WORD_BITS - 1) / BMP_WORD_BITS]
#define BMP_GET(bmp,bit) \
    (((bmp)[(bit) / BMP_WORD_BITS] >> ((bit) % BMP_WORD_BITS)) & 1)
#define BMP_ON(bmp,bit) \
    ((bmp)[(bit) / BMP_WORD_BITS] |= (uintptr_t)1 << ((bit) % BMP_WORD_BITS))
#define BMP_OFF(bmp,bit) \
    ((bmp)[(bit) / BMP_WORD_BITS] &= ~((uintptr_t)1 << ((bit) % BMP_WORD_BITS)))
// Smaller of two values; note that each argument is evaluated twice.
#define MIN(a,b) (((a)<(b))?(a):(b))
// Debug tracing switch: uncomment the next line to route printf_d/puts_d to
// the real printf/puts.
//
// Without PRINTD both names expand to nothing, so `printf_d("x %d", v);`
// collapses to the comma expression `("x %d", v);` - nothing is printed but
// the arguments are still evaluated. Do not rely on that for side effects.
//#define PRINTD
#ifndef PRINTD
#define printf_d
#define puts_d
#else
#define printf_d printf // was self-referential, which left printf_d undeclared
#define puts_d puts
#endif
// TCP port handed to init_proxy_server() by main().
#define PROXY_PORT          7580
// Number of user slots one worker thread scans and polls.
#define USERS_PER_THREAD    32
// Size of each worker's user array and of its slot bitmap.
#define MAX_USERS           128
// Number of worker entries kept by the server.
#define MAX_WORKERS         (MAX_USERS/USERS_PER_THREAD)
// One socket endpoint together with the data queued for delivery on it.
typedef struct {
    int                s;              // socket descriptor
    struct sockaddr_in addr;           // IPv4 address/port belonging to the socket
    char               sendbuf[1024];  // bytes waiting to be written to s
    size_t             sendbufsize;    // valid bytes in sendbuf; maximum is 1024
} conn_t;
// Command byte of a client request (read from offset 1 of the request).
typedef enum {
    CONNECT = 1,  // proxy opens an outgoing connection to the requested address
    BIND    = 2   // proxy binds and listens on the requested address
} proxymode_t;
// Life-cycle of a proxied session, in the order a session normally moves
// through it.
typedef enum {
    REQUEST,     // waiting for the client's request
    ESTABLISH,   // target socket created, connection still being set up
    RESPONSE,    // reply to the client is pending
    DATAPIPE,    // relaying bytes between client and target
    DISCONNECT   // session is to be torn down
} proxystage_t;

// Printable stage names, indexed by proxystage_t (used by the debug traces).
const char* stages[] = {
    [REQUEST]    = "REQUEST",
    [ESTABLISH]  = "ESTABLISH",
    [RESPONSE]   = "RESPONSE",
    [DATAPIPE]   = "DATAPIPE",
    [DISCONNECT] = "DISCONNECT"
};
// Result code placed in byte 1 of the reply sent to the client.
// NOTE(review): the numeric values look like the SOCKS4 reply codes 90..93 -
// confirm against the protocol the clients actually speak.
typedef enum {
    REQUEST_GRANTED = 90,
    REQUEST_FAILED,
    REQUEST_FAILED_IDENTD,
    REQUEST_ACCESS_DENIED
} proxystatus_t;

// Printable status names. The enum starts at 90, so index this table with
// (status - REQUEST_GRANTED). Every enumerator has an entry; the table used
// to stop after REQUEST_FAILED, making the last two codes read out of bounds.
const char* statuses[] = {
    "REQUEST_GRANTED",
    "REQUEST_FAILED",
    "REQUEST_FAILED_IDENTD",
    "REQUEST_ACCESS_DENIED"
};
- typedef struct {
- conn_t cl;
- conn_t sv;
- conn_t hld;
- proxymode_t mode;
- proxystage_t stage;
- proxystatus_t status;
- time_t last;
- } proxy_t;
- typedef struct {
- pthread_t thread;
- int needtoterminate;
- proxy_t user[MAX_USERS];
- BMP_DEF(userslots,MAX_USERS);
- } proxy_worker_t;
- typedef struct {
- conn_t sk;
- proxy_worker_t worker[MAX_WORKERS];
- BMP_DEF(workerslots,32);
- pthread_mutex_t lock;
- } proxy_server_t;
- proxy_server_t server;
- static void proxy_make_nonblocking(conn_t* conn)
- {
- int flags;
- flags = fcntl(conn->s,F_GETFL,0);
- flags |= O_NONBLOCK;
- fcntl(conn->s,F_SETFL,flags);
- }
- void proxy_thread_loop(proxy_worker_t* wk);
- void init_proxy_server(int port)
- {
- memset(&server,'\0',sizeof(server));
- server.sk.s = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
- server.sk.addr.sin_family = AF_INET;
- server.sk.addr.sin_addr.s_addr = INADDR_ANY;
- server.sk.addr.sin_port = htons(port);
- bind(server.sk.s,(const struct sockaddr*)&server.sk.addr,
- sizeof(struct sockaddr_in));
- listen(server.sk.s,SOMAXCONN);
- //proxy_make_nonblocking(&proxy.sk);
- }
- //static proxy_t* proxy_get_pipe(int idx)
- //{
- // return &proxy.user[(idx-1)/2];
- //}
- int proxy_worker_count_users(proxy_worker_t* wk)
- {
- int i,num;
- num = 0;
- for(i = 0; i < USERS_PER_THREAD; i++)
- {
- if(!BMP_GET(wk->userslots,i)) continue;
- num++;
- }
- return num;
- }
- void proxy_update_stat()
- {
- int workers,users;
- int i;
- workers = 0;
- users = 0;
- for(i = 0; i < MAX_WORKERS; i++)
- {
- if(!BMP_GET(server.workerslots,i)) continue;
- workers++;
- users += proxy_worker_count_users(&server.worker[i]);
- }
- printf("Workers: %d; Users: %d\n",workers,users);
- }
- void proxy_worker_accept_client(proxy_worker_t* wk,conn_t* conn)
- {
- int i;
- proxy_t* user;
- for(i = 0; i < MAX_USERS; i++)
- {
- if(BMP_GET(wk->userslots,i)) continue;
- user = &wk->user[i];
- break;
- }
- if(i == MAX_USERS)
- {
- printf_d("no free proxies for %s!\n",
- inet_ntoa(conn->addr.sin_addr));
- close(conn->s);
- return;
- }
- conn->sendbufsize = 0;
- memset(user,'\0',sizeof(proxy_t));
- memcpy(&user->cl,conn,sizeof(conn_t));
- user->stage = REQUEST;
- user->last = time(NULL);
- proxy_make_nonblocking(&user->cl);
- BMP_ON(wk->userslots,i); //Allocate user
- }
- void proxy_worker_client_disconnect(proxy_worker_t* wk,proxy_t* user)
- {
- close(user->cl.s);
- close(user->sv.s);
- close(user->hld.s);
- BMP_OFF(wk->userslots,((char*)user-(char*)wk->user)/sizeof(proxy_t));
- }
- void* proxy_worker_thread_loop(void* arg)
- {
- proxy_worker_t* wk;
- wk = (proxy_worker_t*)arg;
- while(!wk->needtoterminate)
- {
- proxy_thread_loop(wk);
- }
- pthread_exit(0);
- return NULL;
- }
- void proxy_accept_client(conn_t* conn)
- {
- int i;
- proxy_worker_t* wk;
- pthread_mutex_lock(&server.lock);
- for(i = 0; i < MAX_WORKERS; i++)
- {
- if(!BMP_GET(server.workerslots,i)) break;
- if(proxy_worker_count_users(&server.worker[i]) < USERS_PER_THREAD)
- break;
- }
- if(i == MAX_WORKERS)
- {
- printf_d("No free workers!\n");
- pthread_mutex_unlock(&server.lock);
- return;
- }
- wk = &server.worker[i];
- proxy_worker_accept_client(wk,conn);
- if(!BMP_GET(server.workerslots,i))
- {
- //We need to create worker
- memset(wk,'\0',sizeof(proxy_worker_t));
- pthread_create(&wk->thread,NULL,proxy_worker_thread_loop,wk);
- printf_d("created worker %d thread %lu\n",i,wk->thread);
- }
- BMP_ON(server.workerslots,i);
- pthread_mutex_unlock(&server.lock);
- proxy_update_stat();
- }
- void proxy_client_disconnect(proxy_worker_t* wk,proxy_t* user)
- {
- int i;
- pthread_mutex_lock(&server.lock);
- //Get current worker
- i = ((char*)wk-(char*)server.worker)/sizeof(proxy_worker_t);
- proxy_worker_client_disconnect(wk,user);
- if(proxy_worker_count_users(wk) == 0)
- {
- //Destroy worker
- wk->needtoterminate = 1;
- pthread_join(wk->thread,NULL); //Wait for termination
- printf_d("destroyed worker %d thread %lu\n",i,wk->thread);
- memset(wk,'\0',sizeof(proxy_worker_t));
- BMP_OFF(server.workerslots,i);
- }
- pthread_mutex_unlock(&server.lock);
- proxy_update_stat();
- }
- // Thread worker will handle only 32 proxies or 64 sockets
- void proxy_thread_loop(proxy_worker_t* wk)
- {
- struct pollfd polls[USERS_PER_THREAD*2];
- int rel[USERS_PER_THREAD*2];
- time_t curtime;
- int p_num;
- int i;
- memset(&polls,'\0',sizeof(polls));
- p_num = 0;
- pthread_mutex_lock(&server.lock);
- for(i = 0; i < USERS_PER_THREAD; i++)
- {
- proxy_t* user;
- if(!BMP_GET(wk->userslots,i)) continue;
- user = &wk->user[i];
- rel[(p_num)/2] = i;
- printf_d("user %d stage %s\n",i,stages[user->stage]);
- switch(user->stage)
- {
- case REQUEST:
- polls[p_num].fd = user->cl.s;
- polls[p_num].events = POLLIN;
- p_num++;
- polls[p_num].fd = -1;
- p_num++; //padding for proxy pipe
- break;
- case ESTABLISH: break;
- case RESPONSE:
- polls[p_num].fd = user->cl.s;
- polls[p_num].events = POLLOUT;
- p_num++;
- polls[p_num].fd = -1;
- p_num++; //padding for proxy pipe
- break;
- case DATAPIPE:
- polls[p_num].fd = user->cl.s;
- if(user->cl.sendbufsize != 0)
- polls[p_num].events = POLLOUT;
- else polls[p_num].events = POLLIN;
- p_num++;
- polls[p_num].fd = user->sv.s;
- if(user->sv.sendbufsize != 0)
- polls[p_num].events = POLLOUT;
- else polls[p_num].events = POLLIN;
- p_num++;
- break;
- case DISCONNECT:
- printf_d("user %p disconnect\n",user);
- proxy_client_disconnect(wk,user);
- continue;
- }
- }
- pthread_mutex_unlock(&server.lock);
- //We can't give access to thread terminate himself
- printf_d("p_num %d\n",p_num);
- if(poll(polls,p_num,1000))
- {
- for(i = 0; i < p_num; i++)
- {
- if(polls[i].fd == -1) continue;
- proxy_t* user;
- const char* ch;
- int rem;
- user = &wk->user[rel[(i)/2]];
- rem = (i)%2;
- if(polls[i].revents == 0) continue;
- else if(polls[i].revents & (POLLHUP|POLLERR))
- {
- user->stage = DISCONNECT;
- printf_d("%p disconnect due POLLHUP|POLLERR\n",user);
- continue;
- }
- if(rem == 0
- && user->stage != REQUEST
- && user->stage != RESPONSE
- && user->stage != DATAPIPE)
- {
- continue;
- }
- if(rem != 0
- /*&& user->stage != ESTABLISH*/
- && user->stage != DATAPIPE)
- {
- continue;
- }
- ch = "UNKNOWN";
- if(polls[i].revents & POLLIN)
- ch = "POLLIN";
- else if(polls[i].revents & POLLOUT)
- ch = "POLLOUT";
- printf_d("POLL user %d %d stage %s (%s %d %d %d)\n",i,rel[(i-1)/2],
- stages[user->stage],ch,polls[i].events,polls[i].revents,polls[i].fd);
- if(rem == 0) //client socket
- {
- if(user->stage == REQUEST && (polls[i].revents & POLLIN))
- {
- char szBuf[32];
- //read request
- recv(user->cl.s,szBuf,32,0);
- user->sv.s = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
- user->sv.addr.sin_family = AF_INET;
- user->sv.addr.sin_addr.s_addr = *(uint32_t*)(szBuf+4);
- user->sv.addr.sin_port = *(uint16_t*)(szBuf+2);
- user->mode = *(uint8_t*)(szBuf+1);
- printf_d("mode %d -> %s:%d\n",user->mode,inet_ntoa(user->sv.addr.sin_addr),
- ntohs(user->sv.addr.sin_port));
- //prepare anything for connection
- if(user->mode == CONNECT)
- {
- proxy_make_nonblocking(&user->sv);
- connect(user->sv.s,(const struct sockaddr*)&user->sv.addr,
- sizeof(struct sockaddr_in));
- }
- else
- {
- //bind socket
- bind(user->sv.s,(const struct sockaddr*)&user->sv.addr,
- sizeof(struct sockaddr_in));
- listen(user->sv.s,1);
- proxy_make_nonblocking(&user->sv);
- }
- printf_d("proceed request. switching user to ESTABLISH\n");
- user->stage = ESTABLISH;
- //return;
- user->last = time(NULL);
- }
- else if(user->stage == RESPONSE && (polls[i].revents & POLLOUT))
- {
- char szBuf[32];
- *(uint8_t*)(szBuf) = 0;
- *(uint8_t*)(szBuf+1) = user->status;
- *(uint16_t*)(szBuf+2) = user->sv.addr.sin_port;
- *(struct in_addr*)(szBuf+4) = user->sv.addr.sin_addr;
- printf_d("response send %d\n",send(user->cl.s,szBuf,8,0));
- if(user->status == REQUEST_GRANTED)
- user->stage = DATAPIPE;
- else user->stage = DISCONNECT;
- printf_d("RESPONSE finished. switched to %s stage\n",stages[user->stage]);
- user->last = time(NULL);
- }
- else if(user->stage == DATAPIPE && polls[i].revents & (POLLIN|POLLOUT))
- {
- int sent;
- if(polls[i].revents & POLLIN)
- {
- user->sv.sendbufsize = recv(user->cl.s,
- user->sv.sendbuf,1024,0);
- if(user->sv.sendbufsize < 1)
- {
- user->stage = DISCONNECT;
- continue;
- }
- printf_d("client datapipe read %d\n",user->sv.sendbufsize);
- }
- else if(polls[i].revents & POLLOUT)
- {
- sent = send(user->cl.s,user->cl.sendbuf,user->cl.sendbufsize,0);
- user->cl.sendbufsize = 0;
- if(sent < 1)
- {
- user->stage = DISCONNECT;
- continue;
- }
- printf_d("client datapipe send %d\n",sent);
- }
- user->last = time(NULL);
- }
- }
- else //server socket
- {
- if(user->stage == DATAPIPE && polls[i].revents & (POLLIN|POLLOUT))
- {
- int sent;
- if(polls[i].revents & POLLIN)
- {
- user->cl.sendbufsize = recv(user->sv.s,
- user->cl.sendbuf,1024,0);
- if(user->cl.sendbufsize < 1)
- {
- user->stage = DISCONNECT;
- continue;
- }
- printf_d("server datapipe read %d\n",user->cl.sendbufsize);
- }
- else if(polls[i].revents & POLLOUT)
- {
- sent = send(user->sv.s,user->sv.sendbuf,user->sv.sendbufsize,0);
- user->sv.sendbufsize = 0;
- if(sent < 1)
- {
- user->stage = DISCONNECT;
- continue;
- }
- printf_d("server datapipe send %d\n",sent);
- }
- user->last = time(NULL);
- }
- }
- }
- }
- for(i = 0; i < USERS_PER_THREAD; i++)
- {
- proxy_t* user;
- if(!BMP_GET(wk->userslots,i)) continue;
- user = &wk->user[i];
- if(user->stage != ESTABLISH) continue;
- if(user->mode == CONNECT)
- {
- connect(user->sv.s,(const struct sockaddr*)&user->sv.addr,
- sizeof(struct sockaddr_in));
- printf_d("%d\n",errno);
- if(errno == EISCONN)
- {
- user->status = REQUEST_GRANTED;
- user->stage = RESPONSE;
- printf_d("connected\n");
- }
- else if(errno == EINPROGRESS || errno == EALREADY)
- {
- printf_d("connecting..\n");
- }
- else
- {
- user->status = REQUEST_FAILED;
- user->stage = RESPONSE;
- printf_d("failed to connect\n");
- }
- }
- }
- curtime = time(NULL);
- for(i = 0; i < USERS_PER_THREAD; i++)
- {
- proxy_t* user;
- if(!BMP_GET(wk->userslots,i)) continue;
- user = &wk->user[i];
- if(curtime - user->last > 60)
- {
- if(user->stage == ESTABLISH)
- {
- user->status = REQUEST_FAILED;
- user->stage = RESPONSE;
- }
- else user->stage = DISCONNECT;
- }
- }
- }
- void proxy_main_loop()
- {
- socklen_t fromlen;
- conn_t conn;
- conn.sendbufsize = 0;
- while(1)
- {
- fromlen = sizeof(struct sockaddr_in);
- conn.s = accept(server.sk.s,(struct sockaddr*)&conn.addr,&fromlen);
- printf_d("accept connection %s\n",inet_ntoa(conn.addr.sin_addr));
- proxy_accept_client(&conn);
- }
- }
- int main()
- {
- signal(SIGPIPE,SIG_IGN);
- init_proxy_server(PROXY_PORT);
- proxy_main_loop();
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement