Advertisement
Guest User

multithreaded c++ sockets

a guest
Nov 16th, 2024
42
0
157 days
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 9.63 KB | Source Code | 0 0
  #include <arpa/inet.h>
  //#include <fcntl.h>
  #include <netinet/in.h>
  #include <sys/epoll.h>
  #include <sys/socket.h>
  #include <time.h>
  #include <unistd.h>

  #include <cerrno>
  #include <cstdint>
  #include <cstdio>
  #include <cstdlib>
  #include <cstring>
  #include <iostream>
  #include <semaphore>
  #include <string>
  #include <thread>
  #include <vector>
  13.  
  14. //extern std::string getResponse(std::string request);
  /// Build the canned HTTP/1.1 200 response (an nginx-style welcome page).
  ///
  /// @param request  Raw request text. Currently ignored: the same page is
  ///                 returned for every request.
  /// @return Status line, headers (including a freshly formatted Date header)
  ///         and the fixed 615-byte HTML body announced by Content-Length.
  std::string getResponse(const std::string& request) {
    (void)request;  // kept for interface compatibility; not inspected yet

    // gmtime() hands back a pointer to shared static storage, which is unsafe
    // once more than one worker thread calls this function. gmtime_r() writes
    // into caller-owned storage instead.
    const time_t now = time(nullptr);
    struct tm utc;
    char date[50] = "";
    if (gmtime_r(&now, &utc) != nullptr) {
      // HTTP dates are always expressed in GMT, so spell the zone out rather
      // than relying on what %Z yields for this libc.
      strftime(date, sizeof(date), "%a, %d %b %Y %H:%M:%S GMT", &utc);
    }

    std::string response = "HTTP/1.1 200 OK\r\nServer: nginx/1.22.1\r\n";
    response.append("Date: ").append(date).append("\r\n");
    response.append(
        "Content-Type: text/html\r\n"
        "Content-Length: 615\r\n"
        "Last-Modified: Sat, 09 Nov 2024 22:21:58 GMT\r\n"
        "Connection: close\r\n"
        "ETag: \"672fe086-267\"\r\n"
        "Accept-Ranges: bytes\r\n"
        "\r\n"
        "<!DOCTYPE html>\n"
        "<html>\n"
        "<head>\n"
        "<title>Welcome to nginx!</title>\n"
        "<style>\n"
        "html { color-scheme: light dark; }\n"
        "body { width: 35em; margin: 0 auto;\n"
        "font-family: Tahoma, Verdana, Arial, sans-serif; }\n"
        "</style>\n"
        "</head>\n"
        "<body>\n"
        "<h1>Welcome to nginx!</h1>\n"
        "<p>If you see this page, the nginx web server is successfully installed and\n"
        "working. Further configuration is required.</p>\n"
        "\n"
        "<p>For online documentation and support please refer to\n"
        "<a href=\"http://nginx.org/\">nginx.org</a>.<br/>\n"
        "Commercial support is available at\n"
        "<a href=\"http://nginx.com/\">nginx.com</a>.</p>\n"
        "\n"
        "<p><em>Thank you for using nginx.</em></p>\n"
        "</body>\n"
        "</html>\n");
    return response;
  }
  28.  
  29. // Rewriting to give each child its own epollfd to epoll_wait(...) on!
  30. // This is supposed to be what Nginx does OwO
  31.  
  32. // Bugs:
  33. // * Child process sometimes (kind of rarely) somehow iterates the outer loop one time too many, so must be something wrong with the lock counting????? ...
  34. // as in we're not locking enough times or some kind of bad logic here?
  35. // *
  36.  
  // Log level: the code prints extra diagnostics at thresholds 1, 2 and 3;
  // 0 keeps the per-connection paths silent.
  constexpr uint_fast8_t verbosity = 0;
  // Total number of threads to spawn. Thread 0 accepts connections and the
  // remaining ones serve them, so this must be at least 2.
  constexpr uint_fast16_t nthreads = 2;
  // Capacity of each thread's epoll_wait() result buffer.
  #define MAX_EVENTS 10000
  40.  
  /// Report an unrecoverable error and terminate the process.
  ///
  /// The message goes to stderr, matching the perror() diagnostics used in the
  /// rest of this file, so it is not mixed into regular stdout output.
  ///
  /// @param message  Human-readable description of what went wrong.
  /// @note Never returns; exits with EXIT_FAILURE.
  [[noreturn]] void fatalError(const std::string& message) {
    fprintf(stderr, "Fatal error: %s\n", message.c_str());
    exit(EXIT_FAILURE);
  }
  45.  
  46. std::vector<std::thread> threads;
  47. struct epoll_event events[nthreads][MAX_EVENTS];
  48. int listen_sock, epollfd[nthreads];
  49. uint_fast16_t lastpid = 0;
  50. //struct sockaddr_in addr;
  51. struct sockaddr_in6 addr6;
  52. int addrlen = sizeof(addr6);
  53.  
  54. /*
  55. double microtime() {
  56.   return (double(std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count()) / double(1000000));
  57. }
  58.  
  59. // Unused because we need string formatting for this
  60. void log(std::string message) {
  61.   double timenow = microtime();
  62.   printf("[%f] %s", timenow, message.c_str());
  63. }
  64. */
  65.  
  66. void acceptNewConnections() {
  67.   // Accept and register new sockets while child processes are processing established sockets
  68.   struct epoll_event ev;
  69.   ev.events = EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLERR;
  70.   int conn_sock;
  71.   while (true) {
  72.     conn_sock = 0;
  73.     while (conn_sock != -1) {
  74.       if (verbosity >= 3)
  75.         printf("main: accept4()\n");
  76.       // Handle (accept) new connection sock on listen_sock
  77.       conn_sock = accept4(listen_sock, (struct sockaddr*)&addr6, (socklen_t*)&addrlen, SOCK_NONBLOCK); // SOCK_NONBLOCK last param
  78.       if (conn_sock >= 0) {
  79.         if (verbosity >= 2)
  80.           printf("Main: New connection: %d\n", conn_sock);
  81.         ev.data.fd = conn_sock;
  82.         // Calculate epollfd index based on last pid used
  83.         if (++lastpid == nthreads)
  84.           lastpid = 1;
  85.         // Add new connection to epoll
  86.         if (epoll_ctl(epollfd[lastpid], EPOLL_CTL_ADD, conn_sock, &ev) == -1) {
  87.           perror("[perror] main: epoll_ctl: conn_sock");
  88.           close(conn_sock);
  89.         }
  90.       }
  91.     }
  92.   }
  93. }
  94.  
  95. /* DEF ALL THE MAIN OPERATIONS !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! */
  96.  
  97. // new mainLoop just listens on listen_sock and delegates new connections to the different threads, each with their own
  98. // epoll device. listen_sock is now blockable
  99. void mainLoop() {
  100.   int nfds;
  101.   printf("main: epollfd: %d, listen_sock %d\n", epollfd[0], listen_sock);
  102.   while (true) {
  103.     if (verbosity >= 1)
  104.       printf("main: epoll_wait\n");
  105.     nfds = epoll_wait(epollfd[0], events[0], MAX_EVENTS, -1); // currently NO_BLOCK set on listen_sock
  106.     if (nfds == -1) {
  107.       perror("[perror] main: epoll_wait");
  108.       close(listen_sock);
  109.       close(epollfd[0]);
  110.       exit(EXIT_FAILURE);
  111.     }
  112.     //else if (nfds == 0 || (nfds == 1 && events[0][0].data.fd == listen_sock)) {
  113.     //  acceptNewConnections();
  114.       //printf("Main: we're about to continue; nfds: %d\n", nfds);
  115.     //  continue;
  116.     //}
  117.    
  118.     //unlockChildren();
  119.  
  120.     acceptNewConnections();
  121.  
  122.     // Wait for child processes to finish
  123.     //for(uint_fast16_t pid = 1; pid < nthreads; pid++)
  124.     //  //lockUp.lock("main", 0);
  125.     //  //locks[pid].lock();
  126.     //  //lockUp.lock("main", 0);
  127.     //  locks[0].lock();
  128.   }
  129. }
  130.  
  131. void doTask(unsigned int pid) {
  132.   struct epoll_event ev;
  133.   int conn_sock, nfds;
  134.   while (true) {
  135.     //printf("Child: new for loop for pid %d\n", pid);
  136.     nfds = epoll_wait(epollfd[pid], events[pid], MAX_EVENTS, -1);
  137.     for (uint_fast64_t n = 0; n < nfds; n++) {
  138.       struct epoll_event* event = &events[pid][n];
  139.       if (verbosity >= 2)
  140.         printf("child process: Pid %d: fd %d. nfds %d, events %d\n", pid, event->data.fd, nfds, event->events);
  141.       if (event->events & EPOLLHUP || event->events & EPOLLERR) {
  142.         if (verbosity >= 1)
  143.           printf("child process (%d): event[%d] (nfds %d), fd %d: EHOLLUP/EPOLLERR %d (epollfd %d, listen_sock %d)\n", pid, n, nfds, event->data.fd, event->events, epollfd, listen_sock);
  144.         //ev.events = 0;
  145.         //ev.data.fd = event->data.fd;
  146.         if (!(event->events & EPOLLERR) && epoll_ctl(epollfd[pid], EPOLL_CTL_DEL, event->data.fd, nullptr)) {
  147.           perror("[perror] child process: epoll_ctl EPOLL_CTL_DEL");
  148.           printf("Perror caused by pid %d uwu\n", pid);
  149.         }
  150.         //shutdown(event->data.fd, SHUT_WR);
  151.         close(event->data.fd);
  152.         if (verbosity >= 3)
  153.           printf("child process: EPOLL_CTL_DEL -> shutdown -> close on fd %d\n", event->data.fd);
  154.       }
  155.       else {
  156.         if (event->events & EPOLLIN) { // EPOLLIN == 1
  157.           recv(event->data.fd, nullptr, 1024, 0);
  158.           //printf("%d: EPOLLIN\n", event->data.fd);
  159.         }
  160.         if (event->events & EPOLLOUT) { // EPOLLOUT == 4
  161.           std::string response = getResponse("");
  162.           send(event->data.fd, response.c_str(), response.size(), MSG_NOSIGNAL);
  163.           //printf("%d: EPOLLOUT\n", event->data.fd);
  164.           //close(event->data.fd);
  165.           //shutdown(event->data.fd, SHUT_WR);
  166.           ev.events = EPOLLHUP | EPOLLERR;
  167.           ev.data.fd = event->data.fd;
  168.           epoll_ctl(epollfd[pid], EPOLL_CTL_MOD, event->data.fd, &ev);
  169.         }
  170.         if (event->events & EPOLLET) {
  171.           printf("%d: EPOLLET\n", event->data.fd);
  172.         }
  173.       }
  174.     }
  175.   }
  176. }
  177.  
  178. void initThreads(unsigned int nthreadsValue) {
  179.   //nthreads = nthreadsValue;
  180.   threads.resize(nthreads);
  181.   std::thread t;
  182.   for (unsigned int pid=0; pid<nthreads; pid++) {
  183.     if (pid == 0)
  184.       threads[pid] = std::thread(acceptNewConnections);
  185.     else
  186.       threads[pid] = std::thread(doTask, pid);
  187.     printf("Spawned thread %d\n", pid);
  188.   }
  189.   // Main thread remains here atm, all other threads goes to mainLoop and doTask
  190.   for (std::thread& t : threads) {
  191.     //t.detach();
  192.     t.join();
  193.   }
  194. }
  195.  
  196. void setupEpoll(std::string host, unsigned int port) {
  197.   struct epoll_event ev;
  198.   // Create listening socket for IPv4 and IPv6
  199.   listen_sock = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
  200.   if (listen_sock == -1) {
  201.     perror("socket");
  202.     exit(EXIT_FAILURE);
  203.   }
  204.  
  205.   const int enable = 1;
  206.   setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int));
  207.   setsockopt(listen_sock, SOL_SOCKET, SO_KEEPALIVE, &enable, sizeof(int));
  208.  
  209.   // Enable dual-stack (IPv4 and IPv6) operation
  210.   int disable_v6only = 0;
  211.   if (setsockopt(listen_sock, IPPROTO_IPV6, IPV6_V6ONLY, &disable_v6only, sizeof(disable_v6only)) == -1) {
  212.     perror("setsockopt IPV6_V6ONLY");
  213.     close(listen_sock);
  214.     exit(EXIT_FAILURE);
  215.   }
  216.  
  217.   memset(&addr6, 0, sizeof(addr6));
  218.   addr6.sin6_family = AF_INET6;
  219.   addr6.sin6_port = htons(port);
  220.   addr6.sin6_addr = in6addr_any;  // Allow connections to any IPv6 address
  221.  
  222.   // Bind the socket
  223.   if (bind(listen_sock, (struct sockaddr*)&addr6, sizeof(addr6)) == -1) {
  224.     perror("bind");
  225.     close(listen_sock);
  226.     exit(EXIT_FAILURE);
  227.   }
  228.  
  229.   // Start listening
  230.   if (listen(listen_sock, SOMAXCONN) == -1) {
  231.     perror("listen");
  232.     close(listen_sock);
  233.     exit(EXIT_FAILURE);
  234.   }
  235.  
  236.   for(uint16_t pid = 0; pid<nthreads; pid++) {
  237.     // Create epoll instance
  238.     epollfd[pid] = epoll_create1(0);
  239.     if (epollfd[pid] == -1) {
  240.       perror("epoll_create1");
  241.       close(listen_sock);
  242.       exit(EXIT_FAILURE);
  243.     }
  244.   }
  245.  
  246.   // Add listening socket to epollfd[0]
  247.   ev.events = EPOLLIN | EPOLLHUP | EPOLLERR;
  248.   ev.data.fd = listen_sock;
  249.   if (epoll_ctl(epollfd[0], EPOLL_CTL_ADD, listen_sock, &ev) == -1) {
  250.     perror("epoll_ctl: listen_sock");
  251.     close(listen_sock);
  252.     close(epollfd[0]);
  253.     exit(EXIT_FAILURE);
  254.   }
  255. }
  256.  
  257. // Will later be used by Cython instead of main
  258. void setup(std::string host, unsigned int port) {
  259.   if (nthreads < 2) {
  260.     printf("nthreads must be set to >= 2\n");
  261.     exit(1);
  262.   }
  263.   printf("Running program with %d threads\n", nthreads);
  264.   setupEpoll(host, port);
  265.   initThreads(nthreads);
  266. }
  267.  
  268. int main() {
  269.   setup("::", 8080);  // Use "::" to bind to all IPv6 addresses
  270.   return 0;
  271. }
  272.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement