Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- diff --git a/src/contrib/libpdel/util/pevent.c b/src/contrib/libpdel/util/pevent.c
- index 6cb2710..6fd382d 100644
- --- a/src/contrib/libpdel/util/pevent.c
- +++ b/src/contrib/libpdel/util/pevent.c
- @@ -39,6 +39,7 @@
- */
- #include <sys/types.h>
- +#include <sys/event.h>
- #include <sys/param.h>
- #include <sys/queue.h>
- #include <sys/time.h>
- @@ -56,6 +57,7 @@
- #include <sched.h>
- #include <pthread.h>
- +
- #include "structs/structs.h"
- #include "structs/type/array.h"
- #include "util/typed_mem.h"
- @@ -106,6 +108,9 @@ struct pevent_ctx {
- u_char notified; /* data in the pipe */
- u_char has_attr; /* 'attr' is valid */
- u_int refs; /* references to this context */
- +
- + int kq; /* kernel event queue fd */
- + struct kevent *klist; /* kqueue(2) array */
- };
- /* Event object */
- @@ -166,6 +171,7 @@ struct pevent {
- TAILQ_REMOVE(&(ctx)->events, (ev), next); \
- TAILQ_INSERT_HEAD(&(ctx)->events, (ev), next); \
- } \
- + DBG(PEVENT, "ev %p (occured)", (ev)); \
- } while (0)
- /* Internal functions */
- @@ -538,13 +544,15 @@ static void *
- pevent_ctx_main(void *arg)
- {
- struct pevent_ctx *const ctx = arg;
- + struct kevent *kevents = NULL;
- struct timeval now;
- - struct pollfd *fd;
- struct pevent *ev;
- struct pevent *next_ev;
- int poll_idx;
- int timeout;
- - int r;
- + struct timespec ktimeout;
- + struct timespec *pktimeout;
- + int nev;
- /* Push cleanup hook */
- pthread_cleanup_push(pevent_ctx_main_cleanup, ctx);
- @@ -560,6 +568,13 @@ pevent_ctx_main(void *arg)
- gettimeofday(&now, NULL);
- DBG(PEVENT, "ctx %p thread starting", ctx);
- + /* XXX */
- + ctx->kq = kqueue();
- + if (ctx->kq == -1) {
- + DBG(PEVENT, "ctx %p can't create kqueue", ctx);
- + goto done;
- + }
- +
- loop:
- /* Are there any events left? */
- if (TAILQ_EMPTY(&ctx->events)) {
- @@ -567,18 +582,25 @@ loop:
- goto done;
- }
- - /* Make sure ctx->fds array is long enough */
- + /* Make sure ctx->klist and kevents arrays are long enough */
- if (ctx->fds_alloc < 1 + ctx->nrwevents) {
- const u_int new_alloc = roundup(1 + ctx->nrwevents, 16);
- void *mem;
- - if ((mem = REALLOC(ctx->mtype, ctx->fds,
- - new_alloc * sizeof(*ctx->fds))) == NULL)
- - alogf(LOG_ERR, "%s: %m", "realloc");
- + if ((mem = REALLOC(ctx->mtype, ctx->klist,
- + new_alloc * sizeof(*ctx->klist))) == NULL)
- + alogf(LOG_ERR, "%s: %m", "realloc ctx->klist");
- else {
- - ctx->fds = mem;
- + ctx->klist = mem;
- ctx->fds_alloc = new_alloc;
- }
- +
- + if ((mem = REALLOC(ctx->mtype, kevents,
- + new_alloc * sizeof(*kevents))) == NULL)
- + alogf(LOG_ERR, "%s: %m", "realloc kevents");
- + else {
- + kevents = mem;
- + }
- }
- /* If we were intentionally woken up, read the wakeup byte */
- @@ -588,16 +610,15 @@ loop:
- ctx->notified = 0;
- }
- - /* Add event for the notify pipe */
- + /* XXX Add event for notify pipe into kqueue */
- poll_idx = 0;
- if (ctx->fds_alloc > 0) {
- - fd = &ctx->fds[poll_idx++];
- - memset(fd, 0, sizeof(*fd));
- - fd->fd = ctx->pipe[0];
- - fd->events = POLLRDNORM;
- + DBG(PEVENT, " => adding notify pipe fd=%i", ctx->pipe[0]);
- + EV_SET(&ctx->klist[poll_idx++], ctx->pipe[0], EVFILT_READ,
- + EV_ADD | EV_ONESHOT, 0, 0, 0);
- }
- - /* Fill in rest of poll() array */
- + /* XXX Fill in rest of kqueue array */
- timeout = INFTIM;
- TAILQ_FOREACH(ev, &ctx->events, next) {
- switch (ev->type) {
- @@ -609,30 +630,26 @@ loop:
- break;
- }
- ev->poll_idx = poll_idx++;
- - fd = &ctx->fds[ev->poll_idx];
- - memset(fd, 0, sizeof(*fd));
- - fd->fd = ev->u.fd;
- - fd->events = (ev->type == PEVENT_READ) ?
- - POLLRDNORM : POLLWRNORM;
- + DBG(PEVENT, " => adding fd=%i to ev=%p, index=%i",
- + ev->u.fd, ev, ev->poll_idx);
- + EV_SET(&ctx->klist[ev->poll_idx], ev->u.fd, (ev->type ==
- + PEVENT_READ) ? EVFILT_READ : EVFILT_WRITE, EV_ADD |
- + EV_ONESHOT, 0, 0, 0);
- break;
- }
- case PEVENT_TIME:
- {
- struct timeval remain;
- - int millis;
- /* Compute milliseconds until event */
- - if (timercmp(&ev->when, &now, <=))
- - millis = 0;
- - else {
- + if (timercmp(&ev->when, &now, <=)) {
- + pktimeout = NULL;
- + } else {
- timersub(&ev->when, &now, &remain);
- - millis = remain.tv_sec * 1000;
- - millis += remain.tv_usec / 1000;
- + TIMEVAL_TO_TIMESPEC(&remain, &ktimeout);
- + pktimeout = &ktimeout;
- }
- - /* Remember the minimum delay */
- - if (timeout == INFTIM || millis < timeout)
- - timeout = millis;
- break;
- }
- default:
- @@ -652,7 +669,7 @@ loop:
- break;
- }
- if ((ev->flags & PEVENT_OCCURRED) != 0)
- - timeout = 0; /* don't delay */
- + pktimeout = NULL;
- }
- #if PDEL_DEBUG
- @@ -665,41 +682,59 @@ loop:
- /* Wait for something to happen */
- MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
- DBG(PEVENT, "ctx %p thread sleeping", ctx);
- - r = poll(ctx->fds, poll_idx, timeout);
- +
- + if (ktimeout.tv_sec == 0 && ktimeout.tv_nsec == 0)
- + pktimeout = NULL;
- +
- + DBG(PEVENT, "Total %i events will be submitted to kqueue()", poll_idx);
- + nev = kevent(ctx->kq, ctx->klist, poll_idx, kevents, poll_idx, pktimeout);
- DBG(PEVENT, "ctx %p thread woke up", ctx);
- assert(ctx->magic == PEVENT_CTX_MAGIC);
- MUTEX_LOCK(&ctx->mutex, ctx->mutex_count);
- - /* Check for errors */
- - if (r == -1 && errno != EINTR) {
- - alogf(LOG_CRIT, "%s: %m", "poll");
- + /* XXX Check for errors */
- + if (nev < 0) {
- + alogf(LOG_CRIT, "%s: %m", "kqueue");
- assert(0);
- }
- /* Update current time */
- gettimeofday(&now, NULL);
- - /* Mark poll() events that have occurred */
- + DBG(PEVENT, " => itterating over %i kqueue() events", nev);
- + for (int i = 0; i < nev; i++) {
- + for (ev = TAILQ_FIRST((&ctx->events)); ev != NULL; ev = next_ev) {
- + DBG(PEVENT, " => comparing %lu with %i",
- + kevents[i].ident, ev->u.fd);
- + next_ev = TAILQ_NEXT(ev, next);
- + assert(ev->magic == PEVENT_MAGIC);
- + if (kevents[i].ident == ev->u.fd) {
- + DBG(PEVENT, " => ctx %p ev %p found in kqueue()", ctx, ev);
- + switch (ev->type) {
- + case PEVENT_READ:
- + case PEVENT_WRITE:
- + if (ev->poll_idx == -1)
- + break;
- + if ((ev->type == PEVENT_READ || ev->type ==
- + PEVENT_WRITE) && kevents[i].data > 0)
- + PEVENT_SET_OCCURRED(ctx, ev);
- + break;
- + case PEVENT_TIME:
- + if (timercmp(&ev->when, &now, <=))
- + PEVENT_SET_OCCURRED(ctx, ev);
- + break;
- + default:
- + break;
- + }
- + }
- + }
- + }
- +
- for (ev = TAILQ_FIRST((&ctx->events)); ev != NULL; ev = next_ev) {
- next_ev = TAILQ_NEXT(ev, next);
- assert(ev->magic == PEVENT_MAGIC);
- - switch (ev->type) {
- - case PEVENT_READ:
- - case PEVENT_WRITE:
- - if (ev->poll_idx == -1)
- - break;
- - fd = &ctx->fds[ev->poll_idx];
- - if ((fd->revents & ((ev->type == PEVENT_READ) ?
- - READABLE_EVENTS : WRITABLE_EVENTS)) != 0)
- - PEVENT_SET_OCCURRED(ctx, ev);
- - break;
- - case PEVENT_TIME:
- - if (timercmp(&ev->when, &now, <=))
- - PEVENT_SET_OCCURRED(ctx, ev);
- - break;
- - default:
- - break;
- - }
- + if (ev->type == PEVENT_TIME && timercmp(&ev->when, &now, <=))
- + PEVENT_SET_OCCURRED(ctx, ev);
- }
- /* Service all events that are marked as having occurred */
- @@ -988,6 +1023,7 @@ pevent_ctx_unref(struct pevent_ctx *ctx)
- assert(ctx->thread == 0);
- (void)close(ctx->pipe[0]);
- (void)close(ctx->pipe[1]);
- + (void)close(ctx->kq);
- MUTEX_UNLOCK(&ctx->mutex, ctx->mutex_count);
- pthread_mutex_destroy(&ctx->mutex);
- if (ctx->has_attr)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement