Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <mysql/mysql.h>
#include <event.h>
/*
 * Compile-time configuration for the benchmark.
 *
 * The pointers themselves are const as well as the characters they point
 * to, so the settings cannot be retargeted at run time by accident.
 *
 * NOTE(review): credentials are hard-coded here; move them to a config
 * file or the environment before using this outside of a local test.
 */
static const char *const CONFIG_HOST = "localhost";
static const char *const CONFIG_USER = "root";
static const char *const CONFIG_PASSWORD = "***";
static const char *const CONFIG_DATABASE = "***";
/* Number of connections opened in parallel by main(). */
static const int CONFIG_NUM_CONNECTIONS = 10;
/*
 * States of the per-connection state machine driven by
 * connection_state_run().
 *
 * Each phase (connect, query, fetch row, close) has a *_START state that
 * issues the non-blocking call, a *_WAITING state that continues it once
 * libevent reports activity, and a final state that consumes the outcome.
 */
enum mysql_connection_state {
    /* Establishing the connection. */
    CONNECT_START,
    CONNECT_WAITING,
    CONNECT_DONE,
    /* Sending one query taken from the global queue. */
    QUERY_START,
    QUERY_WAITING,
    QUERY_RESULT_READY,
    /* Fetching the rows of the current result set, one at a time. */
    FETCH_ROW_START,
    FETCH_ROW_WAITING,
    FETCH_ROW_RESULT_READY,
    /* Closing the connection once the queue is empty. */
    CLOSE_START,
    CLOSE_WAITING,
    CLOSE_DONE
};
/*
 * Everything needed to drive one non-blocking connection: the client
 * handle, the output slots of the *_start()/*_cont() calls, and the
 * libevent event used to wait between steps.
 */
struct mysql_connection {
    enum mysql_connection_state state;          /* current state machine state */
    MYSQL mysql;                                /* client handle, set up by mysql_init() in main() */
    MYSQL *ret;                                 /* output of mysql_real_connect_start/cont; NULL is treated as failure */
    MYSQL_RES *result;                          /* result set obtained from mysql_use_result() */
    MYSQL_ROW row;                              /* output of mysql_fetch_row_start/cont; NULL ends the fetch loop */
    struct event mysql_event;                   /* event registered by add_event() while waiting */
    struct query_entry *current_query_entry;    /* query taken from the global queue in QUERY_START */
    int err;                                    /* output of mysql_real_query_start/cont; non-zero is treated as an error */
    int index;                                  /* position in the connections array; printed in error messages */
};
/* One pending query in the singly linked global queue. */
struct query_entry {
    struct query_entry *next;   /* next queued entry, or NULL for the last one */
    char *query;                /* heap-allocated copy of the SQL text made by add_query() */
    int index;                  /* sequence number assigned from g_query_counter */
};
/* Head of the queue of pending queries; connections pop from the front. */
static struct query_entry *g_query_list;
/* Address of the pointer through which add_query() links the next entry
 * (initially the head pointer itself). */
static struct query_entry **g_tail_ptr = &g_query_list;
/* Connections that have been started and have not yet reached CLOSE_DONE;
 * the event loop is broken when this drops back to zero. */
static int g_active_connection_num = 0;
/* Source of query_entry.index values; incremented once per add_query(). */
static int g_query_counter = 0;
/* Forward declaration: add_event() registers this as the libevent callback
 * before its definition appears. */
static void connection_state_loop(int fd, short event, void *data);
- static void fatal(struct mysql_connection *conn, const char *msg) {
- fprintf(stderr, "%s: %s\n", msg, (conn ? mysql_error(&conn->mysql) : ""));
- exit(1);
- }
- static inline int add_event(int status, struct mysql_connection *conn) {
- short wait_event= 0;
- if (status & MYSQL_WAIT_READ) {
- wait_event|= EV_READ;
- }
- if (status & MYSQL_WAIT_WRITE) {
- wait_event|= EV_WRITE;
- }
- int fd = -1;
- if (wait_event) {
- fd = mysql_get_socket(&conn->mysql);
- }
- struct timeval tv, *ptv = NULL;
- if (status & MYSQL_WAIT_TIMEOUT) {
- tv.tv_sec = mysql_get_timeout_value(&conn->mysql);
- tv.tv_usec = 0;
- ptv = &tv;
- }
- event_set(&conn->mysql_event, fd, wait_event, connection_state_loop, conn);
- event_add(&conn->mysql_event, ptv);
- return 0;
- }
- static int mysql_status(short event) {
- int status= 0;
- if (event & EV_READ) {
- status |= MYSQL_WAIT_READ;
- }
- if (event & EV_WRITE) {
- status |= MYSQL_WAIT_WRITE;
- }
- if (event & EV_TIMEOUT) {
- status |= MYSQL_WAIT_TIMEOUT;
- }
- return status;
- }
- static int connection_decide_next_state(int need_wait, struct mysql_connection *connection, int state_wait, int state_go_on) {
- if (need_wait) {
- connection->state = state_wait;
- add_event(need_wait, connection);
- return 0;
- } else {
- connection->state = state_go_on;
- return 1;
- }
- }
- static bool connection_state_run(struct mysql_connection *connection, short event) {
- bool running = true;
- switch (connection->state) {
- int status;
- case CONNECT_START:
- status = mysql_real_connect_start(&connection->ret, &connection->mysql, CONFIG_HOST, CONFIG_USER, CONFIG_PASSWORD, CONFIG_DATABASE, 0, NULL, 0);
- running = connection_decide_next_state(status, connection, CONNECT_WAITING, CONNECT_DONE);
- break;
- case CONNECT_WAITING:
- status = mysql_real_connect_cont(&connection->ret, &connection->mysql, mysql_status(event));
- running = connection_decide_next_state(status, connection, CONNECT_WAITING, CONNECT_DONE);
- break;
- case CONNECT_DONE:
- if (!connection->ret) {
- fatal(connection, "Failed to mysql_real_connect()");
- }
- connection->state = QUERY_START;
- break;
- case QUERY_START:
- connection->current_query_entry = g_query_list;
- if (!connection->current_query_entry) {
- connection->state = CLOSE_START;
- break;
- }
- g_query_list = g_query_list->next;
- status= mysql_real_query_start(&connection->err, &connection->mysql, connection->current_query_entry->query, strlen(connection->current_query_entry->query));
- running = connection_decide_next_state(status, connection, QUERY_WAITING, QUERY_RESULT_READY);
- break;
- case QUERY_WAITING:
- status = mysql_real_query_cont(&connection->err, &connection->mysql, mysql_status(event));
- running = connection_decide_next_state(status, connection, QUERY_WAITING, QUERY_RESULT_READY);
- break;
- case QUERY_RESULT_READY:
- if (connection->err) {
- printf("%d | Error: %s\n", connection->index, mysql_error(&connection->mysql));
- connection->state = CONNECT_DONE;
- break;
- }
- free(connection->current_query_entry->query);
- free(connection->current_query_entry);
- connection->result = mysql_use_result(&connection->mysql);
- connection->state = (connection->result) ? FETCH_ROW_START : QUERY_START;
- break;
- case FETCH_ROW_START:
- status = mysql_fetch_row_start(&connection->row, connection->result);
- running = connection_decide_next_state(status, connection, FETCH_ROW_WAITING, FETCH_ROW_RESULT_READY);
- break;
- case FETCH_ROW_WAITING:
- status= mysql_fetch_row_cont(&connection->row, connection->result, mysql_status(event));
- running = connection_decide_next_state(status, connection, FETCH_ROW_WAITING, FETCH_ROW_RESULT_READY);
- break;
- case FETCH_ROW_RESULT_READY:
- if (!connection->row) {
- if (mysql_errno(&connection->mysql)) {
- printf("%d | Error: %s\n", connection->index, mysql_error(&connection->mysql));
- } else {
- // EOF. No more rows
- }
- mysql_free_result(connection->result);
- connection->state = QUERY_START;
- break;
- }
- /*
- // print fields in the row
- printf("%d %d", connection->index, connection->current_query_entry->index);
- for (int i= 0; i < mysql_num_fields(connection->result); i++) {
- printf("%s%s", (i ? "\t" : ""), (connection->row[i] ? connection->row[i] : "(null)"));
- }
- printf("\n");
- */
- connection->state = FETCH_ROW_START;
- break;
- case CLOSE_START:
- status = mysql_close_start(&connection->mysql);
- running = connection_decide_next_state(status, connection, CLOSE_WAITING, CLOSE_DONE);
- break;
- case CLOSE_WAITING:
- status= mysql_close_cont(&connection->mysql, mysql_status(event));
- running = connection_decide_next_state(status, connection, CLOSE_WAITING, CLOSE_DONE);
- break;
- case CLOSE_DONE:
- g_active_connection_num--;
- if (g_active_connection_num == 0) {
- event_loopbreak();
- }
- running = false;
- break;
- default:
- abort();
- break;
- }
- return running;
- }
/*
 * libevent callback (also called directly to start a connection): keeps
 * stepping the connection's state machine until a step reports that it
 * must stop.
 *
 * fd     Unused.
 * event  Flags handed unchanged to every connection_state_run() call.
 * data   The struct mysql_connection to drive.
 */
static void connection_state_loop(int fd, short event, void *data) {
    struct mysql_connection *connection = (struct mysql_connection *)data;

    (void)fd;
    for (;;) {
        if (!connection_state_run(connection, event)) {
            break;
        }
    }
}
- void add_query(const char *query) {
- struct query_entry *entry = (struct query_entry *)malloc(sizeof(*entry));
- if (!entry) {
- fatal(NULL, "Out of memory");
- }
- entry->query = (char *)malloc(strlen(query) + 1);
- if (!entry->query) {
- fatal(NULL, "Out of memory");
- }
- strcpy(entry->query, query);
- entry->next = NULL;
- entry->index = g_query_counter++;
- *g_tail_ptr = entry;
- g_tail_ptr = &entry->next;
- }
- int main(int argc, char *argv[]) {
- struct mysql_connection *connections = (struct mysql_connection *)calloc(CONFIG_NUM_CONNECTIONS, sizeof(*connections));
- if (!connections) {
- fatal(NULL, "Out of memory");
- }
- struct event_base *libevent_base = event_init();
- for (int i = 0; i < CONFIG_NUM_CONNECTIONS; i++) {
- mysql_init(&connections[i].mysql);
- mysql_options(&connections[i].mysql, MYSQL_OPT_NONBLOCK, NULL);
- //mysql_options(&connections[i].mysql, MYSQL_READ_DEFAULT_GROUP, "async_queries");
- connections[i].state = CONNECT_START;
- connections[i].index = i;
- g_active_connection_num++;
- connection_state_loop(-1, -1, &connections[i]);
- }
- for (int i = 0; i < 10000; i++) {
- add_query("SELECT * FROM bans;");
- }
- event_dispatch();
- free(connections);
- event_base_free(libevent_base);
- return 0;
- }
Add Comment
Please, Sign In to add comment