Guest User

Untitled

a guest
Jul 21st, 2018
76
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.93 KB | None | 0 0
  1. #include <stddef.h>
  2. #include <stdint.h>
  3. #include <stdbool.h>
  4.  
  5. #include <stdlib.h>
  6. #include <stdio.h>
  7. #include <uv.h>
  8.  
  9. #include <netinet/in.h>
  10.  
  11. #include <ck_ring.h>
  12.  
  13. #include <parson.h>
  14.  
  15. #define RUNTIME 100000
  16.  
  17. static uv_loop_t* loop;
  18.  
  19. static size_t nthreads;
  20. static uv_thread_t* threads;
  21.  
  22. static ck_ring_t* rings;
  23. static ck_ring_buffer_t** ring_buffers;
  24.  
  25. #define UV_FATAL(expr) do { \
  26. if((libuv_rc = (expr))) { \
  27. fprintf(stderr, "libuv error: %s => %s\n", #expr, uv_strerror(libuv_rc)); \
  28. exit(1); \
  29. }\
  30. } while(0)
  31.  
  32. #define UV_NONFATAL(expr) do { \
  33. if((libuv_rc = (expr))) { \
  34. fprintf(stderr, "libuv error: %s => %s\n", #expr, uv_strerror(libuv_rc)); \
  35. return; \
  36. }\
  37. } while(0)
  38.  
  39. typedef struct {
  40. uv_write_t req;
  41. uv_stream_t* client;
  42. uv_buf_t buf;
  43. } write_req_t;
  44.  
  45. typedef write_req_t ringbuffer_elem_t;
  46.  
  47. static void usage(char* exe) {
  48. printf("Usage: %s PORT\n", exe);
  49. }
  50.  
  51. static void alloc_buffer(uv_handle_t* client, size_t buflen, uv_buf_t* buf) {
  52. buf->base = malloc(buflen);
  53. buf->len = buflen;
  54. }
  55.  
  56. static void on_write(uv_write_t *req, int status) {
  57. printf("on write called\n");
  58. if(status < 0) {
  59. fprintf(stderr, "write error %s\n", uv_strerror(status));
  60. }
  61.  
  62. write_req_t* writereq = (write_req_t*)req;
  63.  
  64. uv_close((uv_handle_t*)writereq->client, NULL);
  65. printf("closed client connection\n");
  66.  
  67. free(writereq->buf.base);
  68. free(writereq);
  69. }
  70.  
  71. static void on_read(uv_stream_t* tcp, ssize_t nread, const uv_buf_t* buf) {
  72. if(nread == UV_EOF) {
  73. puts("Was supposed to close here");
  74. } else if (nread > 0) {
  75. printf("read %s", buf->base);
  76. if(buf->base[nread-1] != '\n') {
  77. printf("\n");
  78. }
  79.  
  80. JSON_Value* val = json_parse_string(buf->base);
  81. if(val==NULL) {
  82. puts("JSON parse error\n");
  83. return;
  84. }
  85.  
  86. JSON_Object* objval = json_value_get_object(val);
  87. if(objval==NULL) {
  88. puts("JSON parse error\n");
  89. return;
  90. }
  91.  
  92. int threadnum = json_object_get_number(objval, "id");
  93. char* json_msg = strdup(json_object_get_string(objval, "msg"));
  94. size_t buflen = strlen(json_msg);
  95.  
  96. ringbuffer_elem_t* elem = (ringbuffer_elem_t*)malloc(sizeof(ringbuffer_elem_t));
  97. /* leave elem->req uninitialized for now */
  98. elem->client = tcp;
  99. elem->buf = uv_buf_init(json_msg, buflen);;
  100.  
  101. ck_ring_enqueue_spsc(&rings[threadnum], ring_buffers[threadnum], (void*)elem);
  102.  
  103. json_value_free(val);
  104. }
  105.  
  106. if(nread == 0) {
  107. free(buf->base);
  108. }
  109. }
  110.  
  111. static void on_connect(uv_stream_t *server, int status) {
  112. if(status < 0) {
  113. fprintf(stderr, "New connection error %s\n", uv_strerror(status));
  114. return;
  115. }
  116.  
  117. int libuv_rc;
  118.  
  119. uv_tcp_t *client = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
  120. UV_NONFATAL(uv_tcp_init(loop, client));
  121.  
  122. UV_NONFATAL(uv_accept(server, (uv_stream_t*) client));
  123.  
  124. puts("Yay new connection");
  125. UV_NONFATAL(uv_read_start((uv_stream_t*)client, alloc_buffer, on_read));
  126. }
  127.  
  128. static void on_timeout(uv_timer_t* handle) {
  129. puts("Timing out!");
  130. uv_stop(loop);
  131. }
  132.  
  133. static void setup_tcp_server(uint16_t port) {
  134. int libuv_rc;
  135.  
  136. uv_tcp_t* server = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
  137. UV_FATAL(uv_tcp_init(loop, server));
  138.  
  139. struct sockaddr_in addr;
  140. uv_ip4_addr("0.0.0.0", port, &addr);
  141.  
  142. UV_FATAL(uv_tcp_bind(server, (const struct sockaddr*)&addr, 0));
  143. UV_FATAL(uv_listen((uv_stream_t*)server, 128, on_connect));
  144. }
  145.  
  146. static void setup_timer(uint64_t timeout) {
  147. int libuv_rc;
  148.  
  149. uv_timer_t* timer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
  150. UV_FATAL(uv_timer_init(loop, timer));
  151.  
  152. UV_FATAL(uv_timer_start(timer, on_timeout, timeout, 0));
  153. }
  154.  
  155. void thread_cb(void* arg) {
  156. int thread_id = (int)arg;
  157.  
  158. ck_ring_t* ring = &rings[thread_id];
  159. ck_ring_buffer_t* ringbuffer = ring_buffers[thread_id];
  160.  
  161. printf("Starting thread %d\n", thread_id);
  162.  
  163. for(;;) {
  164. void* msg;
  165.  
  166. if(ck_ring_dequeue_spsc(ring, ringbuffer, &msg)) {
  167. ringbuffer_elem_t* elem = (ringbuffer_elem_t*)msg;
  168.  
  169. if(!strcmp(elem->buf.base, "exit")) {
  170. printf("Thread %d exiting\n", thread_id);
  171. break;
  172. } else {
  173. printf("Thread %d got msg %s\n", thread_id, elem->buf.base);
  174. printf("Echoing\n");
  175.  
  176. write_req_t* writereq = (write_req_t*)elem;
  177.  
  178. uv_write(&writereq->req, elem->client, &writereq->buf, 1, on_write);
  179. }
  180. }
  181. }
  182. }
  183.  
  184. static void start_threads(size_t n) {
  185. nthreads = n;
  186. threads = (uv_thread_t*)malloc(n*sizeof(uv_thread_t));
  187.  
  188. for(size_t i=0; i<n; i++) {
  189. uv_thread_create(&threads[i], thread_cb, (void*)i);
  190. }
  191. }
  192.  
  193. static void stop_threads() {
  194. ringbuffer_elem_t* elem = (ringbuffer_elem_t*)malloc(sizeof(ringbuffer_elem_t));
  195. elem->client = NULL;
  196. elem->buf = uv_buf_init(strdup("exit"), 4);
  197.  
  198. for(size_t i=0; i<nthreads; i++) {
  199. ck_ring_enqueue_spsc(&rings[i], ring_buffers[i], elem);
  200. uv_thread_join(&threads[i]);
  201. }
  202. }
  203.  
  204. static void setup_rings(size_t num_rings, size_t buflen) {
  205. rings = (ck_ring_t*)malloc(num_rings*sizeof(ck_ring_t));
  206. ring_buffers = (ck_ring_buffer_t**)malloc(num_rings*sizeof(ck_ring_buffer_t*));
  207. ck_ring_buffer_t* megabuf = (ck_ring_buffer_t*)malloc(num_rings*buflen*sizeof(void*));
  208. for(size_t i=0; i<num_rings; i++) {
  209. ck_ring_init(&rings[i], buflen);
  210. ring_buffers[i] = megabuf+i*buflen;
  211. }
  212. }
  213.  
  214. int main(int argc, char** argv) {
  215. if(argc < 2) {
  216. usage(argv[0]);
  217. return 1;
  218. }
  219.  
  220. int libuv_rc;
  221.  
  222. loop = malloc(sizeof(uv_loop_t));
  223.  
  224. uint16_t port = atoi(argv[1]);
  225.  
  226. UV_FATAL(uv_loop_init(loop));
  227.  
  228. setup_tcp_server(port);
  229. setup_timer(RUNTIME);
  230.  
  231. setup_rings(10, 1024);
  232. start_threads(10);
  233.  
  234. puts("Starting event loop...");
  235.  
  236. uv_run(loop, UV_RUN_DEFAULT);
  237.  
  238. puts("Quitting...");
  239.  
  240. uv_loop_close(loop);
  241. free(loop);
  242.  
  243. stop_threads();
  244.  
  245. return 0;
  246. }
Add Comment
Please, Sign In to add comment