Advertisement
Guest User

Untitled

a guest
Oct 22nd, 2014
265
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C 11.00 KB | None | 0 0
  1. #include <semaphore.h>
  2. #include <pthread.h>
  3. #include <stdlib.h>
  4. #include <stdio.h>
  5. #include <assert.h>
  6. #include "list.h"
  7. #include "threadpool.h"
  8. #define NOT_EXECUTING    0
  9. #define EXECUTING        1
  10. #define FINISHED         2
  11. #define MAX(x, y) (((x) > (y)) ? (x) : (y))
  12.  
  13.  
  14. /* function declarations */
  15. static struct worker * worker_new(struct thread_pool *);
  16. static void spin_worker(void *arg);
  17. static struct worker * worker_for_id(struct thread_pool * pool, int id);
  18.  
  19. /* global variables*/
  20. static __thread struct worker* thread_worker;
  21.  
  22. /* structs */
/* A fork/join thread pool: a fixed set of worker threads, a global queue
 * for tasks submitted from non-worker (external) threads, and a counting
 * semaphore that tracks how many submitted tasks are awaiting execution. */
struct thread_pool
{
    struct list /*<worker>*/ workers;       // all workers; populated once in thread_pool_new
    struct list /*<future>*/ global_queue;  // futures submitted by external threads
    pthread_mutex_t global_queue_lock;      // protects global_queue
    int num_threads;                        // number of workers created for this pool
    bool shutting_down;                     // set at shutdown; NOTE(review): never read by spin_worker in this version — confirm intended

    sem_t work_available;                   // posted once per submitted future (see thread_pool_submit)
};
  33.  
/* A pending fork/join task. Created by thread_pool_submit; its result is
 * retrieved (and the caller may help execute it) via future_get. */
struct future
{
    fork_join_task_t task;          // the callable to run
    void* data;                     // opaque argument passed to task
    int state;  // 0 - Not Executed, 1 - Executing, 2 Finished (NOT_EXECUTING/EXECUTING/FINISHED)
    void* result;                   // task's return value once state == FINISHED
    int creator_id;                 // worker_id of submitting worker, or -1 if submitted externally
    int worker_id;                  // worker_id of the executor; -1 until claimed, -2 if run by the main thread
    struct list_elem elem;          // intrusive link: lives in exactly one queue while NOT_EXECUTING
    struct thread_pool * pool;      // owning pool (back-pointer for lock access)
    int depth;                      // nesting depth, used by the (disabled) leapfrogging logic
    sem_t sem;                      // posted exactly once, when the task finishes; future_get waits on it
    pthread_mutex_t future_lock;    // NOTE(review): initialized but all uses are commented out — confirm intended
};
  48.  
/* Per-thread worker state. Each worker owns a private task queue; idle
 * workers fall back to the global queue and then steal from peers. */
struct worker
{
    struct list /*<future>*/ queue;     // this worker's own task queue
    pthread_mutex_t queue_lock;         // protects queue
    struct thread_pool *pool;           // owning pool
    int worker_id;                      // unique id, assigned sequentially in worker_new
    pthread_t thread;                   // the underlying OS thread running spin_worker
    bool is_working;                    // true while executing a task (set without locking)
    struct list_elem elem;              // intrusive link into pool->workers
    int current_depth;                  // depth of the task in progress (leapfrogging bookkeeping)
    struct future *current_task;        // task in progress; NOTE(review): never assigned in this version
};
  61.  
  62. /* implementation */
  63. struct thread_pool *
  64. thread_pool_new(int nthreads)
  65. {
  66.     struct thread_pool * pool = (struct thread_pool *) malloc(sizeof(struct thread_pool));
  67.    
  68.     thread_worker = NULL;
  69.     pool->num_threads = nthreads;
  70.     list_init(&pool->global_queue);
  71.     list_init(&pool->workers);
  72.     pthread_mutex_init(&pool->global_queue_lock, NULL);
  73.     sem_init(&pool->work_available, 0, 0);
  74.     pool->shutting_down = false;
  75.  
  76.     struct worker * worker;    
  77.     int i;
  78.     for(i = 0; i < nthreads; i++)
  79.     {
  80.         worker = worker_new(pool);
  81.         list_push_back(&pool->workers, &worker->elem);
  82.     }
  83.  
  84.     return pool;
  85. }
  86.  
  87. /*
  88.  * Shutdown this thread pool in an orderly fashion.  
  89.  * Tasks that have been submitted but not executed may or
  90.  * may not be executed.
  91.  *
  92.  * Deallocate the thread pool object before returning.
  93.  */
  94. void
  95. thread_pool_shutdown_and_destroy(struct thread_pool *pool)
  96. {
  97.  
  98. }
  99.  
  100. struct future *
  101. thread_pool_submit(struct thread_pool *pool, fork_join_task_t task, void * data)
  102. {
  103.     struct future * future = (struct future *) malloc(sizeof(struct future));
  104.     future->task = task;
  105.     future->data = data;
  106.     future->state = NOT_EXECUTING;
  107.     future->result = NULL;
  108.     future->worker_id = -1;
  109.     future->creator_id = -1;
  110.     future->pool = pool;
  111.     sem_init(&future->sem, 0, 0);
  112.     pthread_mutex_init(&future->future_lock, NULL);
  113.  
  114.     if(thread_worker == NULL)
  115.     {
  116.         pthread_mutex_lock(&pool->global_queue_lock);
  117.         list_push_back(&pool->global_queue, &future->elem);
  118.         pthread_mutex_unlock(&pool->global_queue_lock);
  119.     }
  120.     else
  121.     {
  122.         pthread_mutex_lock(&thread_worker->queue_lock);
  123.         future->creator_id = thread_worker->worker_id;     
  124.         list_push_back(&thread_worker->queue, &future->elem);
  125.         pthread_mutex_unlock(&thread_worker->queue_lock);
  126.     }
  127.  
  128.     sem_post(&pool->work_available);
  129.  
  130.     return future;
  131. }
  132.  
/* simple get, no leap frogging */
/*
 * Return the task's result, blocking until it is available.
 *
 * If the future has not started, the calling thread removes it from its
 * queue and runs it inline; if it is running elsewhere, the caller blocks
 * on the future's semaphore until the executor posts completion.
 *
 * NOTE(review): future->state is read here without holding future_lock
 * (all locking of future_lock is commented out). Between this check and
 * list_remove below, a worker in spin_worker may pop the same future and
 * begin executing it — a TOCTOU race; confirm whether the disabled
 * locking was meant to close it.
 */
void *
future_get(struct future *future)
{
    if(future->state == NOT_EXECUTING)
    { /* execute it */

        // which queue is future in: creator_id == -1 means the global
        // queue; otherwise the creating worker's private queue.
        // pthread_mutex_lock(&future->future_lock);
        if(future->creator_id == -1)
        {
            pthread_mutex_lock(&future->pool->global_queue_lock);
            list_remove(&future->elem);
            pthread_mutex_unlock(&future->pool->global_queue_lock);
        }
        else
        {
            struct worker * worker = worker_for_id(future->pool, future->creator_id);
            pthread_mutex_lock(&worker->queue_lock);
            list_remove(&future->elem);
            pthread_mutex_unlock(&worker->queue_lock);         
        }
        // pthread_mutex_unlock(&future->future_lock);
       
        // is current thread main (external) or a worker: thread_worker is
        // the thread-local worker record, NULL for external threads.
        if(thread_worker == NULL)
        {
            // external caller runs the task inline; worker_id -2 marks
            // "executed by the main thread"
            // pthread_mutex_lock(&future->future_lock);
            future->state = EXECUTING;
            future->worker_id = -2;
            future->result = future->task(future->pool, future->data);     
            future->state = FINISHED;
            sem_post(&future->sem);    
            // pthread_mutex_unlock(&future->future_lock);
            return future->result;
        }
        else
        {
            // worker caller runs the task inline on its own thread
            // pthread_mutex_lock(&future->future_lock);
            future->state = EXECUTING;
            future->worker_id = thread_worker->worker_id;
            thread_worker->is_working = true;
            future->result = future->task(future->pool, future->data);
            future->state = FINISHED;
            thread_worker->is_working = false;     
            sem_post(&future->sem);    
            // pthread_mutex_unlock(&future->future_lock);
            return future->result;         
        }
    }

    if(future->state == EXECUTING)
    {
        /* Disabled draft: have a waiting worker help drain the global
         * queue instead of blocking (note it double-unlocks
         * global_queue_lock on the non-empty path — fix before enabling).
         */
        /*
        pthread_mutex_lock(&future->pool->global_queue_lock);
        if(!list_empty(&future->pool->global_queue) && thread_worker != NULL)
        {
            struct list_elem *e = list_pop_front(&future->pool->global_queue);
            pthread_mutex_unlock(&future->pool->global_queue_lock);
            struct future * curr_future = list_entry(e, struct future, elem);
            pthread_mutex_lock(&curr_future->future_lock);
            curr_future->state = EXECUTING;
            curr_future->worker_id = thread_worker->worker_id;
            thread_worker->is_working = true;
            curr_future->result = curr_future->task(curr_future->pool, curr_future->data);
            curr_future->state = FINISHED;
            thread_worker->is_working = false;     
            sem_post(&curr_future->sem);       
            pthread_mutex_unlock(&curr_future->future_lock);
        }
        pthread_mutex_unlock(&future->pool->global_queue_lock);
        */

        // block until the executing thread posts completion
        sem_wait(&future->sem);
        return future->result;
    }

    if(future->state == FINISHED)
    {
        return future->result;
    }

    return future->result;
}
  217.  
  218. /*
  219. void *
  220. future_get(struct future *future)
  221. {
  222.         //future_get is being called by the main thread
  223.         if (thread_worker == NULL)
  224.         {
  225.                 if (future->state == NOT_EXECUTING)
  226.                 {
  227.                         //pthread_mutex_lock(&future->pool->global_queue_lock);
  228.                         //dequeue and execute
  229.                         list_remove(&future->elem);
  230.                         //future->worker_id = thread_worker->worker_id;
  231.                         //future->depth = MAX(future->depth, thread_worker->current_depth + 1);
  232.                         future->state = EXECUTING;
  233.                         future->result = future->task(future->pool, future->data);
  234.                         //pthread_mutex_unlock(&future->pool->global_queue_lock);
  235.                 }
  236.                 return future->result;
  237.         }
  238.  
  239.         //if worker that creates the future is the same worker that calls this function
  240.         if (future->state == NOT_EXECUTING) {
  241.                 struct worker * creator = worker_for_id(future->pool, future->creator_id);
  242.                 pthread_mutex_lock(&creator->queue_lock);
  243.                 if (future->state == NOT_EXECUTING) {
  244.                         //dequeue and execute
  245.                         list_remove(&future->elem);
  246.                         future->worker_id = thread_worker->worker_id;
  247.                         future->depth = MAX(future->depth, thread_worker->current_depth + 1);
  248.                         future->state = EXECUTING;
  249.                         future->result = future->task(thread_worker->pool, future->data);
  250.                         pthread_mutex_unlock(&creator->queue_lock);
  251.                 }
  252.         }
  253.         else if (future->state == EXECUTING) {
  254.                         while (future->state == EXECUTING) {
  255.                                 struct worker *executor = worker_for_id(future->pool, future->worker_id);
  256.                                 //iterate through the worker's task queue until we find a task we can leapfrog to
  257.                                 struct list_elem *e;
  258.                                 for (e = list_begin (&executor->queue); e != list_end (&executor->queue); e = list_next (e))
  259.                     {
  260.                                 struct future * task_to_leapfrog = list_entry (e, struct future, elem);
  261.                                 if (task_to_leapfrog->depth > MAX(thread_worker->current_task->depth, future->depth))
  262.                                 {
  263.                                         list_remove(&task_to_leapfrog->elem);
  264.                                         task_to_leapfrog->state = EXECUTING;
  265.                                         future->worker_id = thread_worker->worker_id;
  266.                                         future->result = future->task(thread_worker->pool, future->data);
  267.                                         break;
  268.                                 }
  269.                         }
  270.                         }
  271.         }
  272.         return future->result;
  273. }
  274. */
  275.  
  276.  
  277. void
  278. future_free(struct future *future)
  279. {
  280.     free(future);
  281. }
  282.  
  283.  
  284. /////// Worker Functions ///////
  285.  
  286. static struct worker *
  287. worker_new(struct thread_pool *pool)
  288. {
  289.     static int id = 0;
  290.  
  291.     struct worker * worker = (struct worker *) malloc(sizeof(struct worker));
  292.     list_init(&worker->queue);
  293.     pthread_mutex_init(&worker->queue_lock, NULL);
  294.     worker->pool = pool;
  295.     worker->worker_id = id++;
  296.     worker->is_working = false;
  297.     pthread_create(&worker->thread, NULL, (void *) &spin_worker, (void *)worker);  
  298.  
  299.     return worker;
  300. }
  301.  
/*
 * Worker service loop (thread start routine; never returns).
 *
 * Each iteration consumes one work_available token — one token is posted
 * per submitted task — then looks for a task in priority order:
 *   1. its own queue, newest first (LIFO, for locality);
 *   2. the global queue, oldest first (FIFO);
 *   3. steal the oldest task from another worker's queue.
 *
 * NOTE(review): this thread-local `thread_worker` variable is never
 * assigned here, so tasks submitted from within a worker still land in
 * the global queue via thread_pool_submit — confirm intended.
 * NOTE(review): the loop has no exit; pool->shutting_down is never
 * checked, so shutdown must cancel the thread externally.
 */
static void
spin_worker(void *arg)
{
    struct worker *worker = (struct worker *)arg;

    for (;;)
    {
        /* block until at least one submitted task is outstanding */
        sem_wait(&worker->pool->work_available);
        struct list_elem *e;
        e = NULL;

        // find a future to work on
        pthread_mutex_lock(&worker->queue_lock);
        if(!list_empty(&worker->queue))
        {
            e = list_pop_back(&worker->queue);  /* own queue: LIFO */
        }
        pthread_mutex_unlock(&worker->queue_lock);
        pthread_mutex_lock(&worker->pool->global_queue_lock);
        if(e == NULL && !list_empty(&worker->pool->global_queue))
        {
            e = list_pop_front(&worker->pool->global_queue);  /* global: FIFO */
        }
        pthread_mutex_unlock(&worker->pool->global_queue_lock);
        if(e == NULL)
        {
            /* steal: scan peers (including self, already known empty)
             * and take the oldest task from the first non-empty queue */
            struct list_elem *e2;
            e2 = list_begin(&worker->pool->workers);
            for (; e2 != list_end(&worker->pool->workers); e2 = list_next(e2))
            {
                struct worker * curr_worker = list_entry(e2, struct worker, elem);
                pthread_mutex_lock(&curr_worker->queue_lock);
                if(!list_empty(&curr_worker->queue))
                {
                    e = list_pop_front(&curr_worker->queue);
                    pthread_mutex_unlock(&curr_worker->queue_lock);
                    break;
                }
                pthread_mutex_unlock(&curr_worker->queue_lock);
            }
        }

        // evaluate the future
        // NOTE(review): e can be NULL here if future_get raced us and ran
        // the task inline; the consumed semaphore token is then simply lost.
        if(e != NULL)
        {
            struct future * future = list_entry(e, struct future, elem);
            // pthread_mutex_lock(&future->future_lock);
            future->state = EXECUTING;
            future->worker_id = worker->worker_id;
            worker->is_working = true;
            future->result = future->task(worker->pool, future->data);
            future->state = FINISHED;
            worker->is_working = false;
            sem_post(&future->sem);     /* wake any future_get waiter */
            // pthread_mutex_unlock(&future->future_lock);
        }

    }
}
  361.  
  362. static struct worker*
  363. worker_for_id(struct thread_pool * pool, int id)
  364. {
  365.     struct list_elem *e;
  366.  
  367.     for (e = list_begin (&pool->workers); e != list_end (&pool->workers); e = list_next (e))
  368.     {
  369.         struct worker * worker = list_entry (e, struct worker, elem);
  370.         if (worker->worker_id == id) {
  371.             return worker;
  372.         }
  373.     }
  374.     return NULL;
  375. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement