Advertisement
Guest User

Untitled

a guest
Jan 21st, 2019
485
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C 12.71 KB | None | 0 0
  1. #include <errno.h>
  2. #include <unistd.h>
  3. #include <signal.h>
  4. #include <poll.h>
  5. #include <arpa/inet.h>
  6. #include <netinet/in.h>
  7. #include <sys/socket.h>
  8. #include <sys/fcntl.h>
  9. #include <pthread.h>
  10. #include <stdio.h>
  11. #include <stdlib.h>
  12. #include <stdint.h>
  13. #include <string.h>
  14. #include <time.h>
  15.  
  16. #define BMP_DEF(name,size) uintptr_t name[size/sizeof(uintptr_t)]
  17. #define BMP_GET(bmp,bit) ((bmp[bit/sizeof(uintptr_t)] >> (bit%sizeof(uintptr_t))) & 1)
  18. #define BMP_ON(bmp,bit) (bmp[bit/sizeof(uintptr_t)] |= 1<<(bit%sizeof(uintptr_t)))
  19. #define BMP_OFF(bmp,bit) (bmp[bit/sizeof(uintptr_t)] &= ~(1<<(bit%sizeof(uintptr_t))))
  20.  
  21. #define MIN(a,b) (((a)<(b))?(a):(b))
  22.  
  23. //#define PRINTD
  24. #ifndef PRINTD
  25. #define printf_d
  26. #define puts_d
  27. #else
  28. #define printf_d printf_d
  29. #define puts_d puts
  30. #endif
  31.  
  32. #define PROXY_PORT 7580
  33. #define USERS_PER_THREAD 32
  34. #define MAX_USERS 128
  35. #define MAX_WORKERS (MAX_USERS/USERS_PER_THREAD)
  36.  
  37. typedef struct {
  38.     int s;
  39.     struct sockaddr_in addr;
  40.    
  41.     char sendbuf[1024];
  42.     size_t sendbufsize; //maximum is 1024
  43. } conn_t;
  44.  
  45. typedef enum {
  46.     CONNECT = 1,
  47.     BIND
  48. } proxymode_t;
  49.  
  50. typedef enum {
  51.     REQUEST = 0,
  52.     ESTABLISH,
  53.     RESPONSE,
  54.     DATAPIPE,
  55.     DISCONNECT
  56. } proxystage_t;
  57.  
  58. const char* stages[] = {
  59.     "REQUEST",
  60.     "ESTABLISH",
  61.     "RESPONSE",
  62.     "DATAPIPE",
  63.     "DISCONNECT"
  64. };
  65.  
  66. typedef enum {
  67.     REQUEST_GRANTED = 90,
  68.     REQUEST_FAILED,
  69.     REQUEST_FAILED_IDENTD,
  70.     REQUEST_ACCESS_DENIED
  71. } proxystatus_t;
  72.  
  73. const char* statuses[] = {
  74.     "REQUEST_GRANTED",
  75.     "REQUEST_FAILED"
  76. };
  77.  
  78. typedef struct {
  79.     conn_t cl;
  80.     conn_t sv;
  81.     conn_t hld;
  82.     proxymode_t mode;
  83.     proxystage_t stage;
  84.     proxystatus_t status;
  85.     time_t last;
  86. } proxy_t;
  87.  
  88. typedef struct {
  89.     pthread_t thread;
  90.     int needtoterminate;
  91.     proxy_t user[MAX_USERS];
  92.     BMP_DEF(userslots,MAX_USERS);
  93. } proxy_worker_t;
  94.  
  95. typedef struct {
  96.     conn_t sk;
  97.     proxy_worker_t worker[MAX_WORKERS];
  98.     BMP_DEF(workerslots,32);
  99.     pthread_mutex_t lock;
  100. } proxy_server_t;
  101.  
  102. proxy_server_t server;
  103.  
  104. static void proxy_make_nonblocking(conn_t* conn)
  105. {
  106.     int flags;
  107.     flags = fcntl(conn->s,F_GETFL,0);
  108.     flags |= O_NONBLOCK;
  109.     fcntl(conn->s,F_SETFL,flags);
  110. }
  111.  
  112. void proxy_thread_loop(proxy_worker_t* wk);
  113.  
  114. void init_proxy_server(int port)
  115. {
  116.     memset(&server,'\0',sizeof(server));
  117.    
  118.     server.sk.s = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
  119.     server.sk.addr.sin_family = AF_INET;
  120.     server.sk.addr.sin_addr.s_addr = INADDR_ANY;
  121.     server.sk.addr.sin_port = htons(port);
  122.    
  123.     bind(server.sk.s,(const struct sockaddr*)&server.sk.addr,
  124.         sizeof(struct sockaddr_in));
  125.     listen(server.sk.s,SOMAXCONN);
  126.    
  127.     //proxy_make_nonblocking(&proxy.sk);
  128. }
  129.  
  130. //static proxy_t* proxy_get_pipe(int idx)
  131. //{
  132. //  return &proxy.user[(idx-1)/2];
  133. //}
  134.  
  135. int proxy_worker_count_users(proxy_worker_t* wk)
  136. {
  137.     int i,num;
  138.    
  139.     num = 0;
  140.     for(i = 0; i < USERS_PER_THREAD; i++)
  141.     {
  142.         if(!BMP_GET(wk->userslots,i)) continue;
  143.         num++;
  144.     }
  145.    
  146.     return num;
  147. }
  148.  
  149. void proxy_update_stat()
  150. {
  151.     int workers,users;
  152.     int i;
  153.    
  154.     workers = 0;
  155.     users = 0;
  156.     for(i = 0; i < MAX_WORKERS; i++)
  157.     {
  158.         if(!BMP_GET(server.workerslots,i)) continue;
  159.         workers++;
  160.         users += proxy_worker_count_users(&server.worker[i]);
  161.     }
  162.    
  163.     printf("Workers: %d; Users: %d\n",workers,users);
  164. }
  165.  
  166. void proxy_worker_accept_client(proxy_worker_t* wk,conn_t* conn)
  167. {
  168.     int i;
  169.     proxy_t* user;
  170.    
  171.     for(i = 0; i < MAX_USERS; i++)
  172.     {
  173.         if(BMP_GET(wk->userslots,i)) continue;
  174.         user = &wk->user[i];
  175.         break;
  176.     }
  177.     if(i == MAX_USERS)
  178.     {
  179.         printf_d("no free proxies for %s!\n",
  180.             inet_ntoa(conn->addr.sin_addr));
  181.         close(conn->s);
  182.         return;
  183.     }
  184.    
  185.    
  186.     conn->sendbufsize = 0;
  187.     memset(user,'\0',sizeof(proxy_t));
  188.     memcpy(&user->cl,conn,sizeof(conn_t));
  189.    
  190.     user->stage = REQUEST;
  191.     user->last = time(NULL);
  192.     proxy_make_nonblocking(&user->cl);
  193.     BMP_ON(wk->userslots,i); //Allocate user
  194. }
  195.  
  196. void proxy_worker_client_disconnect(proxy_worker_t* wk,proxy_t* user)
  197. {  
  198.     close(user->cl.s);
  199.     close(user->sv.s);
  200.     close(user->hld.s);
  201.     BMP_OFF(wk->userslots,((char*)user-(char*)wk->user)/sizeof(proxy_t));
  202. }
  203.  
  204. void* proxy_worker_thread_loop(void* arg)
  205. {
  206.     proxy_worker_t* wk;
  207.    
  208.     wk = (proxy_worker_t*)arg;
  209.     while(!wk->needtoterminate)
  210.     {
  211.         proxy_thread_loop(wk);
  212.     }
  213.     pthread_exit(0);
  214.     return NULL;
  215. }
  216.  
  217. void proxy_accept_client(conn_t* conn)
  218. {
  219.     int i;
  220.     proxy_worker_t* wk;
  221.    
  222.     pthread_mutex_lock(&server.lock);
  223.     for(i = 0; i < MAX_WORKERS; i++)
  224.     {
  225.         if(!BMP_GET(server.workerslots,i)) break;
  226.         if(proxy_worker_count_users(&server.worker[i]) < USERS_PER_THREAD)
  227.             break;
  228.     }
  229.     if(i == MAX_WORKERS)
  230.     {
  231.         printf_d("No free workers!\n");
  232.         pthread_mutex_unlock(&server.lock);
  233.         return;
  234.     }
  235.    
  236.     wk = &server.worker[i];
  237.     proxy_worker_accept_client(wk,conn);
  238.    
  239.     if(!BMP_GET(server.workerslots,i))
  240.     {
  241.         //We need to create worker
  242.         memset(wk,'\0',sizeof(proxy_worker_t));
  243.         pthread_create(&wk->thread,NULL,proxy_worker_thread_loop,wk);
  244.         printf_d("created worker %d thread %lu\n",i,wk->thread);
  245.     }
  246.    
  247.     BMP_ON(server.workerslots,i);
  248.     pthread_mutex_unlock(&server.lock);
  249.    
  250.     proxy_update_stat();
  251. }
  252.  
  253. void proxy_client_disconnect(proxy_worker_t* wk,proxy_t* user)
  254. {
  255.     int i;
  256.    
  257.     pthread_mutex_lock(&server.lock);
  258.    
  259.     //Get current worker
  260.     i = ((char*)wk-(char*)server.worker)/sizeof(proxy_worker_t);
  261.     proxy_worker_client_disconnect(wk,user);
  262.     if(proxy_worker_count_users(wk) == 0)
  263.     {
  264.         //Destroy worker
  265.         wk->needtoterminate = 1;
  266.         pthread_join(wk->thread,NULL); //Wait for termination
  267.         printf_d("destroyed worker %d thread %lu\n",i,wk->thread);
  268.         memset(wk,'\0',sizeof(proxy_worker_t));
  269.         BMP_OFF(server.workerslots,i);
  270.     }
  271.    
  272.     pthread_mutex_unlock(&server.lock);
  273.    
  274.     proxy_update_stat();
  275. }
  276.  
  277. // Thread worker will handle only 32 proxies or 64 sockets
  278.  
  279. void proxy_thread_loop(proxy_worker_t* wk)
  280. {
  281.     struct pollfd polls[USERS_PER_THREAD*2];
  282.     int rel[USERS_PER_THREAD*2];
  283.     time_t curtime;
  284.     int p_num;
  285.     int i;
  286.    
  287.     memset(&polls,'\0',sizeof(polls));
  288.    
  289.     p_num = 0;
  290.    
  291.     pthread_mutex_lock(&server.lock);
  292.     for(i = 0; i < USERS_PER_THREAD; i++)
  293.     {
  294.         proxy_t* user;
  295.    
  296.         if(!BMP_GET(wk->userslots,i)) continue;
  297.         user = &wk->user[i];
  298.        
  299.         rel[(p_num)/2] = i;
  300.         printf_d("user %d stage %s\n",i,stages[user->stage]);
  301.         switch(user->stage)
  302.         {
  303.         case REQUEST:
  304.             polls[p_num].fd = user->cl.s;
  305.             polls[p_num].events = POLLIN;
  306.             p_num++;
  307.             polls[p_num].fd = -1;
  308.             p_num++; //padding for proxy pipe
  309.             break;
  310.         case ESTABLISH: break;
  311.         case RESPONSE:
  312.             polls[p_num].fd = user->cl.s;
  313.             polls[p_num].events = POLLOUT;
  314.             p_num++;
  315.             polls[p_num].fd = -1;
  316.             p_num++; //padding for proxy pipe
  317.             break;
  318.         case DATAPIPE:
  319.             polls[p_num].fd = user->cl.s;
  320.             if(user->cl.sendbufsize != 0)
  321.                 polls[p_num].events = POLLOUT;
  322.             else polls[p_num].events = POLLIN;
  323.             p_num++;
  324.             polls[p_num].fd = user->sv.s;
  325.             if(user->sv.sendbufsize != 0)
  326.                 polls[p_num].events = POLLOUT;
  327.             else polls[p_num].events = POLLIN;
  328.             p_num++;
  329.             break;
  330.         case DISCONNECT:
  331.             printf_d("user %p disconnect\n",user);
  332.             proxy_client_disconnect(wk,user);
  333.             continue;
  334.         }
  335.     }
  336.     pthread_mutex_unlock(&server.lock);
  337.    
  338.     //We can't give access to thread terminate himself
  339.    
  340.     printf_d("p_num %d\n",p_num);
  341.     if(poll(polls,p_num,1000))
  342.     {
  343.         for(i = 0; i < p_num; i++)
  344.         {
  345.             if(polls[i].fd == -1) continue;
  346.            
  347.             proxy_t* user;
  348.             const char* ch;
  349.             int rem;
  350.        
  351.             user = &wk->user[rel[(i)/2]];
  352.             rem = (i)%2;
  353.            
  354.             if(polls[i].revents == 0) continue;
  355.             else if(polls[i].revents & (POLLHUP|POLLERR))
  356.             {
  357.                 user->stage = DISCONNECT;
  358.                 printf_d("%p disconnect due POLLHUP|POLLERR\n",user);
  359.                 continue;
  360.             }
  361.            
  362.             if(rem == 0
  363.                 && user->stage != REQUEST
  364.                 && user->stage != RESPONSE
  365.                 && user->stage != DATAPIPE)
  366.             {
  367.                 continue;  
  368.             }
  369.             if(rem != 0
  370.                 /*&& user->stage != ESTABLISH*/
  371.                 && user->stage != DATAPIPE)
  372.             {
  373.                 continue;
  374.             }
  375.            
  376.             ch = "UNKNOWN";
  377.             if(polls[i].revents & POLLIN)
  378.                 ch = "POLLIN";
  379.             else if(polls[i].revents & POLLOUT)
  380.                 ch = "POLLOUT";
  381.            
  382.             printf_d("POLL user %d %d stage %s (%s %d %d %d)\n",i,rel[(i-1)/2],
  383.                 stages[user->stage],ch,polls[i].events,polls[i].revents,polls[i].fd);
  384.            
  385.             if(rem == 0) //client socket
  386.             {
  387.                 if(user->stage == REQUEST && (polls[i].revents & POLLIN))
  388.                 {
  389.                     char szBuf[32];
  390.                    
  391.                     //read request
  392.                     recv(user->cl.s,szBuf,32,0);
  393.                     user->sv.s = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
  394.                     user->sv.addr.sin_family = AF_INET;
  395.                     user->sv.addr.sin_addr.s_addr = *(uint32_t*)(szBuf+4);
  396.                     user->sv.addr.sin_port = *(uint16_t*)(szBuf+2);
  397.                     user->mode = *(uint8_t*)(szBuf+1);
  398.                     printf_d("mode %d -> %s:%d\n",user->mode,inet_ntoa(user->sv.addr.sin_addr),
  399.                         ntohs(user->sv.addr.sin_port));
  400.                     //prepare anything for connection
  401.                     if(user->mode == CONNECT)
  402.                     {
  403.                         proxy_make_nonblocking(&user->sv);
  404.                         connect(user->sv.s,(const struct sockaddr*)&user->sv.addr,
  405.                             sizeof(struct sockaddr_in));
  406.                     }
  407.                     else
  408.                     {
  409.                         //bind socket
  410.                         bind(user->sv.s,(const struct sockaddr*)&user->sv.addr,
  411.                             sizeof(struct sockaddr_in));
  412.                         listen(user->sv.s,1);
  413.                         proxy_make_nonblocking(&user->sv);
  414.                     }
  415.                     printf_d("proceed request. switching user to ESTABLISH\n");
  416.                     user->stage = ESTABLISH;
  417.                     //return;
  418.                     user->last = time(NULL);
  419.                 }
  420.                 else if(user->stage == RESPONSE && (polls[i].revents & POLLOUT))
  421.                 {
  422.                     char szBuf[32];        
  423.                     *(uint8_t*)(szBuf) = 0;
  424.                     *(uint8_t*)(szBuf+1) = user->status;
  425.                     *(uint16_t*)(szBuf+2) = user->sv.addr.sin_port;
  426.                     *(struct in_addr*)(szBuf+4) = user->sv.addr.sin_addr;
  427.                    
  428.                     printf_d("response send %d\n",send(user->cl.s,szBuf,8,0));
  429.                     if(user->status == REQUEST_GRANTED)
  430.                         user->stage = DATAPIPE;
  431.                     else user->stage = DISCONNECT;
  432.                     printf_d("RESPONSE finished. switched to %s stage\n",stages[user->stage]);
  433.                     user->last = time(NULL);
  434.                 }
  435.                 else if(user->stage == DATAPIPE && polls[i].revents & (POLLIN|POLLOUT))
  436.                 {
  437.                     int sent;
  438.                     if(polls[i].revents & POLLIN)
  439.                     {
  440.                         user->sv.sendbufsize = recv(user->cl.s,
  441.                             user->sv.sendbuf,1024,0);
  442.                         if(user->sv.sendbufsize < 1)
  443.                         {
  444.                             user->stage = DISCONNECT;
  445.                             continue;
  446.                         }
  447.                         printf_d("client datapipe read %d\n",user->sv.sendbufsize);
  448.                     }
  449.                     else if(polls[i].revents & POLLOUT)
  450.                     {
  451.                         sent = send(user->cl.s,user->cl.sendbuf,user->cl.sendbufsize,0);
  452.                         user->cl.sendbufsize = 0;
  453.                         if(sent < 1)
  454.                         {
  455.                             user->stage = DISCONNECT;
  456.                             continue;
  457.                         }
  458.                         printf_d("client datapipe send %d\n",sent);
  459.                     }
  460.                     user->last = time(NULL);
  461.                 }
  462.             }
  463.             else //server socket
  464.             {
  465.                 if(user->stage == DATAPIPE && polls[i].revents & (POLLIN|POLLOUT))
  466.                 {
  467.                     int sent;
  468.                     if(polls[i].revents & POLLIN)
  469.                     {
  470.                         user->cl.sendbufsize = recv(user->sv.s,
  471.                             user->cl.sendbuf,1024,0);
  472.                         if(user->cl.sendbufsize < 1)
  473.                         {
  474.                             user->stage = DISCONNECT;
  475.                             continue;
  476.                         }
  477.                         printf_d("server datapipe read %d\n",user->cl.sendbufsize);
  478.                     }
  479.                 else if(polls[i].revents & POLLOUT)
  480.                     {
  481.                         sent = send(user->sv.s,user->sv.sendbuf,user->sv.sendbufsize,0);
  482.                         user->sv.sendbufsize = 0;
  483.                         if(sent < 1)
  484.                         {
  485.                             user->stage = DISCONNECT;
  486.                             continue;
  487.                         }
  488.                         printf_d("server datapipe send %d\n",sent);
  489.                     }
  490.                     user->last = time(NULL);
  491.                 }
  492.             }
  493.         }
  494.     }
  495.    
  496.     for(i = 0; i < USERS_PER_THREAD; i++)
  497.     {
  498.         proxy_t* user;
  499.    
  500.         if(!BMP_GET(wk->userslots,i)) continue;
  501.         user = &wk->user[i];
  502.         if(user->stage != ESTABLISH) continue;
  503.    
  504.         if(user->mode == CONNECT)
  505.         {
  506.             connect(user->sv.s,(const struct sockaddr*)&user->sv.addr,
  507.                 sizeof(struct sockaddr_in));
  508.             printf_d("%d\n",errno);
  509.             if(errno == EISCONN)
  510.             {
  511.                 user->status = REQUEST_GRANTED;
  512.                 user->stage = RESPONSE;
  513.                 printf_d("connected\n");
  514.             }
  515.             else if(errno == EINPROGRESS || errno == EALREADY)
  516.             {
  517.                 printf_d("connecting..\n");
  518.             }
  519.             else
  520.             {
  521.                 user->status = REQUEST_FAILED;
  522.                 user->stage = RESPONSE;
  523.                 printf_d("failed to connect\n");
  524.             }
  525.         }
  526.     }
  527.     curtime = time(NULL);
  528.     for(i = 0; i < USERS_PER_THREAD; i++)
  529.     {
  530.         proxy_t* user;
  531.        
  532.         if(!BMP_GET(wk->userslots,i)) continue;
  533.         user = &wk->user[i];
  534.        
  535.         if(curtime - user->last > 60)
  536.         {
  537.             if(user->stage == ESTABLISH)
  538.             {
  539.                 user->status = REQUEST_FAILED;
  540.                 user->stage = RESPONSE;
  541.             }
  542.             else user->stage = DISCONNECT;
  543.         }
  544.     }
  545. }
  546.  
  547. void proxy_main_loop()
  548. {
  549.     socklen_t fromlen;
  550.     conn_t conn;
  551.    
  552.     conn.sendbufsize = 0;
  553.     while(1)
  554.     {
  555.         fromlen = sizeof(struct sockaddr_in);
  556.         conn.s = accept(server.sk.s,(struct sockaddr*)&conn.addr,&fromlen);
  557.         printf_d("accept connection %s\n",inet_ntoa(conn.addr.sin_addr));
  558.         proxy_accept_client(&conn);
  559.     }
  560. }
  561.  
  562. int main()
  563. {
  564.     signal(SIGPIPE,SIG_IGN);
  565.    
  566.     init_proxy_server(PROXY_PORT);
  567.     proxy_main_loop();
  568.     return 0;
  569. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement