Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#include <stddef.h>
#include <stdint.h>
#include <stdbool.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>

#include <uv.h>
#include <netinet/in.h>
#include <ck_ring.h>
#include <parson.h>
- #define RUNTIME 100000
- static uv_loop_t* loop;
- static size_t nthreads;
- static uv_thread_t* threads;
- static ck_ring_t* rings;
- static ck_ring_buffer_t** ring_buffers;
/*
 * Evaluate a libuv call; if it returns non-zero, print the failing expression
 * with libuv's description of the error and terminate the process.
 * The enclosing scope must declare `int libuv_rc`, which receives the result.
 */
#define UV_FATAL(expr) \
    do { \
        libuv_rc = (expr); \
        if (libuv_rc != 0) { \
            fprintf(stderr, "libuv error: %s => %s\n", #expr, uv_strerror(libuv_rc)); \
            exit(1); \
        } \
    } while (0)
/*
 * Evaluate a libuv call; if it returns non-zero, print the failing expression
 * with libuv's description of the error and return from the enclosing
 * function.  Only usable in functions returning void, and the enclosing scope
 * must declare `int libuv_rc`, which receives the result.
 */
#define UV_NONFATAL(expr) \
    do { \
        libuv_rc = (expr); \
        if (libuv_rc != 0) { \
            fprintf(stderr, "libuv error: %s => %s\n", #expr, uv_strerror(libuv_rc)); \
            return; \
        } \
    } while (0)
- typedef struct {
- uv_write_t req;
- uv_stream_t* client;
- uv_buf_t buf;
- } write_req_t;
- typedef write_req_t ringbuffer_elem_t;
/* Write the command-line synopsis for the program named `exe` to stdout. */
static void usage(char *exe)
{
    fprintf(stdout, "Usage: %s PORT\n", exe);
}
- static void alloc_buffer(uv_handle_t* client, size_t buflen, uv_buf_t* buf) {
- buf->base = malloc(buflen);
- buf->len = buflen;
- }
- static void on_write(uv_write_t *req, int status) {
- printf("on write called\n");
- if(status < 0) {
- fprintf(stderr, "write error %s\n", uv_strerror(status));
- }
- write_req_t* writereq = (write_req_t*)req;
- uv_close((uv_handle_t*)writereq->client, NULL);
- printf("closed client connection\n");
- free(writereq->buf.base);
- free(writereq);
- }
- static void on_read(uv_stream_t* tcp, ssize_t nread, const uv_buf_t* buf) {
- if(nread == UV_EOF) {
- puts("Was supposed to close here");
- } else if (nread > 0) {
- printf("read %s", buf->base);
- if(buf->base[nread-1] != '\n') {
- printf("\n");
- }
- JSON_Value* val = json_parse_string(buf->base);
- if(val==NULL) {
- puts("JSON parse error\n");
- return;
- }
- JSON_Object* objval = json_value_get_object(val);
- if(objval==NULL) {
- puts("JSON parse error\n");
- return;
- }
- int threadnum = json_object_get_number(objval, "id");
- char* json_msg = strdup(json_object_get_string(objval, "msg"));
- size_t buflen = strlen(json_msg);
- ringbuffer_elem_t* elem = (ringbuffer_elem_t*)malloc(sizeof(ringbuffer_elem_t));
- /* leave elem->req uninitialized for now */
- elem->client = tcp;
- elem->buf = uv_buf_init(json_msg, buflen);;
- ck_ring_enqueue_spsc(&rings[threadnum], ring_buffers[threadnum], (void*)elem);
- json_value_free(val);
- }
- if(nread == 0) {
- free(buf->base);
- }
- }
- static void on_connect(uv_stream_t *server, int status) {
- if(status < 0) {
- fprintf(stderr, "New connection error %s\n", uv_strerror(status));
- return;
- }
- int libuv_rc;
- uv_tcp_t *client = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
- UV_NONFATAL(uv_tcp_init(loop, client));
- UV_NONFATAL(uv_accept(server, (uv_stream_t*) client));
- puts("Yay new connection");
- UV_NONFATAL(uv_read_start((uv_stream_t*)client, alloc_buffer, on_read));
- }
- static void on_timeout(uv_timer_t* handle) {
- puts("Timing out!");
- uv_stop(loop);
- }
- static void setup_tcp_server(uint16_t port) {
- int libuv_rc;
- uv_tcp_t* server = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
- UV_FATAL(uv_tcp_init(loop, server));
- struct sockaddr_in addr;
- uv_ip4_addr("0.0.0.0", port, &addr);
- UV_FATAL(uv_tcp_bind(server, (const struct sockaddr*)&addr, 0));
- UV_FATAL(uv_listen((uv_stream_t*)server, 128, on_connect));
- }
- static void setup_timer(uint64_t timeout) {
- int libuv_rc;
- uv_timer_t* timer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
- UV_FATAL(uv_timer_init(loop, timer));
- UV_FATAL(uv_timer_start(timer, on_timeout, timeout, 0));
- }
- void thread_cb(void* arg) {
- int thread_id = (int)arg;
- ck_ring_t* ring = &rings[thread_id];
- ck_ring_buffer_t* ringbuffer = ring_buffers[thread_id];
- printf("Starting thread %d\n", thread_id);
- for(;;) {
- void* msg;
- if(ck_ring_dequeue_spsc(ring, ringbuffer, &msg)) {
- ringbuffer_elem_t* elem = (ringbuffer_elem_t*)msg;
- if(!strcmp(elem->buf.base, "exit")) {
- printf("Thread %d exiting\n", thread_id);
- break;
- } else {
- printf("Thread %d got msg %s\n", thread_id, elem->buf.base);
- printf("Echoing\n");
- write_req_t* writereq = (write_req_t*)elem;
- uv_write(&writereq->req, elem->client, &writereq->buf, 1, on_write);
- }
- }
- }
- }
- static void start_threads(size_t n) {
- nthreads = n;
- threads = (uv_thread_t*)malloc(n*sizeof(uv_thread_t));
- for(size_t i=0; i<n; i++) {
- uv_thread_create(&threads[i], thread_cb, (void*)i);
- }
- }
- static void stop_threads() {
- ringbuffer_elem_t* elem = (ringbuffer_elem_t*)malloc(sizeof(ringbuffer_elem_t));
- elem->client = NULL;
- elem->buf = uv_buf_init(strdup("exit"), 4);
- for(size_t i=0; i<nthreads; i++) {
- ck_ring_enqueue_spsc(&rings[i], ring_buffers[i], elem);
- uv_thread_join(&threads[i]);
- }
- }
- static void setup_rings(size_t num_rings, size_t buflen) {
- rings = (ck_ring_t*)malloc(num_rings*sizeof(ck_ring_t));
- ring_buffers = (ck_ring_buffer_t**)malloc(num_rings*sizeof(ck_ring_buffer_t*));
- ck_ring_buffer_t* megabuf = (ck_ring_buffer_t*)malloc(num_rings*buflen*sizeof(void*));
- for(size_t i=0; i<num_rings; i++) {
- ck_ring_init(&rings[i], buflen);
- ring_buffers[i] = megabuf+i*buflen;
- }
- }
- int main(int argc, char** argv) {
- if(argc < 2) {
- usage(argv[0]);
- return 1;
- }
- int libuv_rc;
- loop = malloc(sizeof(uv_loop_t));
- uint16_t port = atoi(argv[1]);
- UV_FATAL(uv_loop_init(loop));
- setup_tcp_server(port);
- setup_timer(RUNTIME);
- setup_rings(10, 1024);
- start_threads(10);
- puts("Starting event loop...");
- uv_run(loop, UV_RUN_DEFAULT);
- puts("Quitting...");
- uv_loop_close(loop);
- free(loop);
- stop_threads();
- return 0;
- }
Add Comment
Please, Sign In to add comment