Advertisement
Guest User

nonblocking multithreaded

a guest
Aug 20th, 2019
148
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.67 KB | None | 0 0
  #define _GNU_SOURCE
  #include <errno.h>
  #include <stdatomic.h>
  #include <stdio.h>
  #include <stdlib.h>
  #include <string.h>

  #include <sys/types.h>
  #include <netinet/in.h>
  #include <netinet/tcp.h>
  #include <pthread.h>
  #include <sys/epoll.h>
  #include <sys/socket.h>
  #include <sys/sysinfo.h>
  #include <unistd.h>
  14.  
  /* Maximum number of ready events fetched by one epoll_wait() call. */
  #define NUM_EVENTS 64
  /* Size in bytes of each worker's receive buffer. */
  #define READ_BUFFER (64 * 1024)
  /*
   * Canned reply sent for every request. Content-Length (11) must match the
   * length of the body, "Hello World".
   */
  #define RESPONSE "HTTP/1.1 200 Ok\r\nContent-Length: 11\r\n\r\nHello World"
  18.  
  /*
   * Locate the first "\r\n\r\n" sequence (end of an HTTP header block).
   *
   * buffer: bytes to search; need not be NUL-terminated or aligned.
   * len:    number of valid bytes in buffer.
   *
   * Returns the offset of the first byte of the sequence, or -1 when it is
   * absent, when len is too small (including negative), or buffer is NULL.
   */
  int find_clrf(const char *buffer, int len)
  {
      static const char terminator[4] = { '\r', '\n', '\r', '\n' };
      const int terminator_len = (int)sizeof terminator;

      if (buffer == NULL || len < terminator_len)
          return -1;

      /*
       * Compare bytes with memcmp instead of loading through an int pointer:
       * that load would be misaligned at most offsets and breaks aliasing rules.
       * "len - pos" cannot overflow because 0 <= pos <= len here.
       */
      for (int pos = 0; len - pos >= terminator_len; pos++) {
          if (memcmp(buffer + pos, terminator, sizeof terminator) == 0)
              return pos;
      }
      return -1;
  }
  26.  
  27. int efd, server;
  28. int locks[1024] = { 0 };
  29.  
  30. void* worker(void* arg) {
  31. char buffer[READ_BUFFER];
  32. struct epoll_event events[NUM_EVENTS];
  33. while (1) {
  34. int n = epoll_wait(efd, events, NUM_EVENTS, -1);
  35. for (int i = 0; i < n; i++) {
  36. int* fd_lock = &locks[events[i].data.fd];
  37. int client, expect,
  38. fd = events[i].data.fd,
  39. error = events[i].events & (EPOLLHUP | EPOLLERR);
  40. if (error || atomic_exchange_explicit(fd_lock, 1, memory_order_acquire) != 0) {
  41. if (error) close(fd);
  42. continue;
  43. }
  44.  
  45. do {
  46. expect = 2;
  47. atomic_store_explicit(fd_lock, expect, memory_order_relaxed);
  48. if (fd == server) {
  49. while ((client = accept4(server, NULL, NULL, SOCK_NONBLOCK)) > 0) {
  50. int opt = 1;
  51. events[i].data.fd = client;
  52. events[i].events = EPOLLIN | EPOLLET;
  53. if (setsockopt(client, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)) < 0) perror("no delay");
  54. if (epoll_ctl(efd, EPOLL_CTL_ADD, client, &events[i]) < 0) perror("epoll add client");
  55. }
  56. } else {
  57. int bytes, offset = 0;
  58. while ((bytes = read(fd, buffer + offset, READ_BUFFER - offset)) > 0) {
  59. int clrf_offset = 0;
  60. while ((clrf_offset = find_clrf(buffer + clrf_offset + offset, bytes)) != -1)
  61. if (send(fd, RESPONSE, sizeof(RESPONSE) - 1, MSG_NOSIGNAL) == -1)
  62. break;
  63. offset += bytes;
  64. }
  65. if (bytes == 0 || (bytes == -1 && errno != EAGAIN))
  66. close(fd);
  67. }
  68. } while (!atomic_compare_exchange_weak_explicit(fd_lock, &expect, 0, memory_order_release, memory_order_relaxed));
  69. }
  70. }
  71. }
  72.  
  73. int main() {
  74. int opt = 1;
  75. server = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
  76. if (server < 0) perror("socket create");
  77. if (setsockopt(server, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) perror("reuse addr");
  78.  
  79. struct sockaddr_in addr;
  80. addr.sin_family = AF_INET;
  81. addr.sin_port = htons(12345);
  82. addr.sin_addr.s_addr = htonl(INADDR_ANY);
  83. if (bind(server, (struct sockaddr*)&addr, sizeof(addr)) < 0) perror("bind");
  84. if (listen(server, SOMAXCONN) < 0) perror("listen");
  85.  
  86. efd = epoll_create1(0);
  87. if (efd < 0) perror("epoll");
  88. struct epoll_event event;
  89. event.events = EPOLLIN | EPOLLET;
  90. event.data.fd = server;
  91. if (epoll_ctl(efd, EPOLL_CTL_ADD, server, &event) < 0) perror("epoll add server");
  92.  
  93. pthread_t thread;
  94. for (int i = get_nprocs() - 1; i > 0; i--)
  95. if (pthread_create(&thread, NULL, worker, NULL) != 0)
  96. perror("create thread");
  97. worker(NULL);
  98. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement