Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "list.h"
#include "threadpool.h"
/* Values stored in the `state` field of struct future. */
enum {
    NOT_EXECUTING = 0,  /* queued, no thread has picked it up yet */
    EXECUTING = 1,      /* some thread is running the task */
    FINISHED = 2        /* the task returned and `result` is set */
};

/* Larger of two values. Each argument may be evaluated twice. */
#define MAX(x, y) (((x) > (y)) ? (x) : (y))
/* Forward declarations of the file-local worker helpers defined further down. */
static struct worker *worker_new(struct thread_pool *pool);
static void spin_worker(void *arg);
static struct worker *worker_for_id(struct thread_pool *pool, int id);
- /* global variables*/
- static __thread struct worker* thread_worker;
- /* structs */
- struct thread_pool
- {
- struct list /*<worker>*/ workers;
- struct list /*<future>*/ global_queue;
- pthread_mutex_t global_queue_lock;
- int num_threads;
- bool shutting_down;
- sem_t work_available;
- };
- struct future
- {
- fork_join_task_t task;
- void* data;
- int state; // 0 - Not Executed, 1 - Executing, 2 Finished
- void* result;
- int creator_id;
- int worker_id;
- struct list_elem elem;
- struct thread_pool * pool;
- int depth;
- sem_t sem;
- pthread_mutex_t future_lock;
- };
- struct worker
- {
- struct list /*<future>*/ queue;
- pthread_mutex_t queue_lock;
- struct thread_pool *pool;
- int worker_id;
- pthread_t thread;
- bool is_working;
- struct list_elem elem;
- int current_depth;
- struct future *current_task;
- };
- /* implementation */
- struct thread_pool *
- thread_pool_new(int nthreads)
- {
- struct thread_pool * pool = (struct thread_pool *) malloc(sizeof(struct thread_pool));
- thread_worker = NULL;
- pool->num_threads = nthreads;
- list_init(&pool->global_queue);
- list_init(&pool->workers);
- pthread_mutex_init(&pool->global_queue_lock, NULL);
- sem_init(&pool->work_available, 0, 0);
- pool->shutting_down = false;
- struct worker * worker;
- int i;
- for(i = 0; i < nthreads; i++)
- {
- worker = worker_new(pool);
- list_push_back(&pool->workers, &worker->elem);
- }
- return pool;
- }
- /*
- * Shutdown this thread pool in an orderly fashion.
- * Tasks that have been submitted but not executed may or
- * may not be executed.
- *
- * Deallocate the thread pool object before returning.
- */
- void
- thread_pool_shutdown_and_destroy(struct thread_pool *pool)
- {
- }
- struct future *
- thread_pool_submit(struct thread_pool *pool, fork_join_task_t task, void * data)
- {
- struct future * future = (struct future *) malloc(sizeof(struct future));
- future->task = task;
- future->data = data;
- future->state = NOT_EXECUTING;
- future->result = NULL;
- future->worker_id = -1;
- future->creator_id = -1;
- future->pool = pool;
- sem_init(&future->sem, 0, 0);
- pthread_mutex_init(&future->future_lock, NULL);
- if(thread_worker == NULL)
- {
- pthread_mutex_lock(&pool->global_queue_lock);
- list_push_back(&pool->global_queue, &future->elem);
- pthread_mutex_unlock(&pool->global_queue_lock);
- }
- else
- {
- pthread_mutex_lock(&thread_worker->queue_lock);
- future->creator_id = thread_worker->worker_id;
- list_push_back(&thread_worker->queue, &future->elem);
- pthread_mutex_unlock(&thread_worker->queue_lock);
- }
- sem_post(&pool->work_available);
- return future;
- }
- /* simple get, no leap frogging */
- void *
- future_get(struct future *future)
- {
- if(future->state == NOT_EXECUTING)
- { /* execute it */
- // which queue is future in
- // pthread_mutex_lock(&future->future_lock);
- if(future->creator_id == -1)
- {
- pthread_mutex_lock(&future->pool->global_queue_lock);
- list_remove(&future->elem);
- pthread_mutex_unlock(&future->pool->global_queue_lock);
- }
- else
- {
- struct worker * worker = worker_for_id(future->pool, future->creator_id);
- pthread_mutex_lock(&worker->queue_lock);
- list_remove(&future->elem);
- pthread_mutex_unlock(&worker->queue_lock);
- }
- // pthread_mutex_unlock(&future->future_lock);
- // is current thread main or worker
- if(thread_worker == NULL)
- {
- // pthread_mutex_lock(&future->future_lock);
- future->state = EXECUTING;
- future->worker_id = -2;
- future->result = future->task(future->pool, future->data);
- future->state = FINISHED;
- sem_post(&future->sem);
- // pthread_mutex_unlock(&future->future_lock);
- return future->result;
- }
- else
- {
- // pthread_mutex_lock(&future->future_lock);
- future->state = EXECUTING;
- future->worker_id = thread_worker->worker_id;
- thread_worker->is_working = true;
- future->result = future->task(future->pool, future->data);
- future->state = FINISHED;
- thread_worker->is_working = false;
- sem_post(&future->sem);
- // pthread_mutex_unlock(&future->future_lock);
- return future->result;
- }
- }
- if(future->state == EXECUTING)
- {
- /*
- pthread_mutex_lock(&future->pool->global_queue_lock);
- if(!list_empty(&future->pool->global_queue) && thread_worker != NULL)
- {
- struct list_elem *e = list_pop_front(&future->pool->global_queue);
- pthread_mutex_unlock(&future->pool->global_queue_lock);
- struct future * curr_future = list_entry(e, struct future, elem);
- pthread_mutex_lock(&curr_future->future_lock);
- curr_future->state = EXECUTING;
- curr_future->worker_id = thread_worker->worker_id;
- thread_worker->is_working = true;
- curr_future->result = curr_future->task(curr_future->pool, curr_future->data);
- curr_future->state = FINISHED;
- thread_worker->is_working = false;
- sem_post(&curr_future->sem);
- pthread_mutex_unlock(&curr_future->future_lock);
- }
- pthread_mutex_unlock(&future->pool->global_queue_lock);
- */
- sem_wait(&future->sem);
- return future->result;
- }
- if(future->state == FINISHED)
- {
- return future->result;
- }
- return future->result;
- }
/* Disabled earlier draft of future_get that attempted leapfrogging; kept for reference only.
- void *
- future_get(struct future *future)
- {
- //future_get is being called by the main thread
- if (thread_worker == NULL)
- {
- if (future->state == NOT_EXECUTING)
- {
- //pthread_mutex_lock(&future->pool->global_queue_lock);
- //dequeue and execute
- list_remove(&future->elem);
- //future->worker_id = thread_worker->worker_id;
- //future->depth = MAX(future->depth, thread_worker->current_depth + 1);
- future->state = EXECUTING;
- future->result = future->task(future->pool, future->data);
- //pthread_mutex_unlock(&future->pool->global_queue_lock);
- }
- return future->result;
- }
- //if worker that creates the future is the same worker that calls this function
- if (future->state == NOT_EXECUTING) {
- struct worker * creator = worker_for_id(future->pool, future->creator_id);
- pthread_mutex_lock(&creator->queue_lock);
- if (future->state == NOT_EXECUTING) {
- //dequeue and execute
- list_remove(&future->elem);
- future->worker_id = thread_worker->worker_id;
- future->depth = MAX(future->depth, thread_worker->current_depth + 1);
- future->state = EXECUTING;
- future->result = future->task(thread_worker->pool, future->data);
- pthread_mutex_unlock(&creator->queue_lock);
- }
- }
- else if (future->state == EXECUTING) {
- while (future->state == EXECUTING) {
- struct worker *executor = worker_for_id(future->pool, future->worker_id);
- //iterate through the worker's task queue until we find a task we can leapfrog to
- struct list_elem *e;
- for (e = list_begin (&executor->queue); e != list_end (&executor->queue); e = list_next (e))
- {
- struct future * task_to_leapfrog = list_entry (e, struct future, elem);
- if (task_to_leapfrog->depth > MAX(thread_worker->current_task->depth, future->depth))
- {
- list_remove(&task_to_leapfrog->elem);
- task_to_leapfrog->state = EXECUTING;
- future->worker_id = thread_worker->worker_id;
- future->result = future->task(thread_worker->pool, future->data);
- break;
- }
- }
- }
- }
- return future->result;
- }
- */
- void
- future_free(struct future *future)
- {
- free(future);
- }
- /////// Worker Functions ///////
- static struct worker *
- worker_new(struct thread_pool *pool)
- {
- static int id = 0;
- struct worker * worker = (struct worker *) malloc(sizeof(struct worker));
- list_init(&worker->queue);
- pthread_mutex_init(&worker->queue_lock, NULL);
- worker->pool = pool;
- worker->worker_id = id++;
- worker->is_working = false;
- pthread_create(&worker->thread, NULL, (void *) &spin_worker, (void *)worker);
- return worker;
- }
- static void
- spin_worker(void *arg)
- {
- struct worker *worker = (struct worker *)arg;
- for (;;)
- {
- sem_wait(&worker->pool->work_available);
- struct list_elem *e;
- e = NULL;
- // find a future to work on
- pthread_mutex_lock(&worker->queue_lock);
- if(!list_empty(&worker->queue))
- {
- e = list_pop_back(&worker->queue);
- }
- pthread_mutex_unlock(&worker->queue_lock);
- pthread_mutex_lock(&worker->pool->global_queue_lock);
- if(e == NULL && !list_empty(&worker->pool->global_queue))
- {
- e = list_pop_front(&worker->pool->global_queue);
- }
- pthread_mutex_unlock(&worker->pool->global_queue_lock);
- if(e == NULL)
- {
- struct list_elem *e2;
- e2 = list_begin(&worker->pool->workers);
- for (; e2 != list_end(&worker->pool->workers); e2 = list_next(e2))
- {
- struct worker * curr_worker = list_entry(e2, struct worker, elem);
- pthread_mutex_lock(&curr_worker->queue_lock);
- if(!list_empty(&curr_worker->queue))
- {
- e = list_pop_front(&curr_worker->queue);
- pthread_mutex_unlock(&curr_worker->queue_lock);
- break;
- }
- pthread_mutex_unlock(&curr_worker->queue_lock);
- }
- }
- // evaluate the future
- if(e != NULL)
- {
- struct future * future = list_entry(e, struct future, elem);
- // pthread_mutex_lock(&future->future_lock);
- future->state = EXECUTING;
- future->worker_id = worker->worker_id;
- worker->is_working = true;
- future->result = future->task(worker->pool, future->data);
- future->state = FINISHED;
- worker->is_working = false;
- sem_post(&future->sem);
- // pthread_mutex_unlock(&future->future_lock);
- }
- }
- }
- static struct worker*
- worker_for_id(struct thread_pool * pool, int id)
- {
- struct list_elem *e;
- for (e = list_begin (&pool->workers); e != list_end (&pool->workers); e = list_next (e))
- {
- struct worker * worker = list_entry (e, struct worker, elem);
- if (worker->worker_id == id) {
- return worker;
- }
- }
- return NULL;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement