Advertisement
linux

thpool.c

Jul 16th, 2018
159
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C 13.54 KB | None | 0 0
  1. /* ********************************
  2.  * Author:       Johan Hanssen Seferidis
  3.  * License:      MIT
  4.  * Description:  Library providing a threading pool where you can add
  5.  *               work. For usage, check the thpool.h file or README.md
  6.  *
  7.  *//** @file thpool.h *//*
  8.  *
  9.  ********************************/
  10.  
  11. #define _POSIX_C_SOURCE 200809L
  12. #include <unistd.h>
  13. #include <signal.h>
  14. #include <stdio.h>
  15. #include <stdlib.h>
  16. #include <pthread.h>
  17. #include <errno.h>
  18. #include <time.h>
  19. #if defined(__linux__)
  20. #include <sys/prctl.h>
  21. #endif
  22.  
  23. #include "thpool.h"
  24.  
  25. #ifdef THPOOL_DEBUG
  26. #define THPOOL_DEBUG 1
  27. #else
  28. #define THPOOL_DEBUG 0
  29. #endif
  30.  
  31. #if !defined(DISABLE_PRINT) || defined(THPOOL_DEBUG)
  32. #define err(str) fprintf(stderr, str)
  33. #else
  34. #define err(str)
  35. #endif
  36.  
  37. static volatile int threads_keepalive;
  38. static volatile int threads_on_hold;
  39.  
  40.  
  41.  
  42. /* ========================== STRUCTURES ============================ */
  43.  
  44.  
  45. /* Binary semaphore */
  46. typedef struct bsem {
  47.     pthread_mutex_t mutex;
  48.     pthread_cond_t   cond;
  49.     int v;
  50. } bsem;
  51.  
  52.  
  53. /* Job */
  54. typedef struct job{
  55.     struct job*  prev;                   /* pointer to previous job   */
  56.     void   (*function)(void* arg);       /* function pointer          */
  57.     void*  arg;                          /* function's argument       */
  58. } job;
  59.  
  60.  
  61. /* Job queue */
  62. typedef struct jobqueue{
  63.     pthread_mutex_t rwmutex;             /* used for queue r/w access */
  64.     job  *front;                         /* pointer to front of queue */
  65.     job  *rear;                          /* pointer to rear  of queue */
  66.     bsem *has_jobs;                      /* flag as binary semaphore  */
  67.     int   len;                           /* number of jobs in queue   */
  68. } jobqueue;
  69.  
  70.  
  71. /* Thread */
  72. typedef struct thread{
  73.     int       id;                        /* friendly id               */
  74.     pthread_t pthread;                   /* pointer to actual thread  */
  75.     struct thpool_* thpool_p;            /* access to thpool          */
  76. } thread;
  77.  
  78.  
  79. /* Threadpool */
  80. typedef struct thpool_{
  81.     thread**   threads;                  /* pointer to threads        */
  82.     volatile int num_threads_alive;      /* threads currently alive   */
  83.     volatile int num_threads_working;    /* threads currently working */
  84.     pthread_mutex_t  thcount_lock;       /* used for thread count etc */
  85.     pthread_cond_t  threads_all_idle;    /* signal to thpool_wait     */
  86.     jobqueue  jobqueue;                  /* job queue                 */
  87. } thpool_;
  88.  
  89.  
  90.  
  91.  
  92.  
  93. /* ========================== PROTOTYPES ============================ */
  94.  
  95.  
  96. static int  thread_init(thpool_* thpool_p, struct thread** thread_p, int id);
  97. static void* thread_do(struct thread* thread_p);
  98. static void  thread_hold(int sig_id);
  99. static void  thread_destroy(struct thread* thread_p);
  100.  
  101. static int   jobqueue_init(jobqueue* jobqueue_p);
  102. static void  jobqueue_clear(jobqueue* jobqueue_p);
  103. static void  jobqueue_push(jobqueue* jobqueue_p, struct job* newjob_p);
  104. static struct job* jobqueue_pull(jobqueue* jobqueue_p);
  105. static void  jobqueue_destroy(jobqueue* jobqueue_p);
  106.  
  107. static void  bsem_init(struct bsem *bsem_p, int value);
  108. static void  bsem_reset(struct bsem *bsem_p);
  109. static void  bsem_post(struct bsem *bsem_p);
  110. static void  bsem_post_all(struct bsem *bsem_p);
  111. static void  bsem_wait(struct bsem *bsem_p);
  112.  
  113.  
  114.  
  115.  
  116.  
  117. /* ========================== THREADPOOL ============================ */
  118.  
  119.  
  120. /* Initialise thread pool */
  121. struct thpool_* thpool_init(int num_threads){
  122.  
  123.     threads_on_hold   = 0;
  124.     threads_keepalive = 1;
  125.  
  126.     if (num_threads < 0){
  127.         num_threads = 0;
  128.     }
  129.  
  130.     /* Make new thread pool */
  131.     thpool_* thpool_p;
  132.     thpool_p = (struct thpool_*)malloc(sizeof(struct thpool_));
  133.     if (thpool_p == NULL){
  134.         err("thpool_init(): Could not allocate memory for thread pool\n");
  135.         return NULL;
  136.     }
  137.     thpool_p->num_threads_alive   = 0;
  138.     thpool_p->num_threads_working = 0;
  139.  
  140.     /* Initialise the job queue */
  141.     if (jobqueue_init(&thpool_p->jobqueue) == -1){
  142.         err("thpool_init(): Could not allocate memory for job queue\n");
  143.         free(thpool_p);
  144.         return NULL;
  145.     }
  146.  
  147.     /* Make threads in pool */
  148.     thpool_p->threads = (struct thread**)malloc(num_threads * sizeof(struct thread *));
  149.     if (thpool_p->threads == NULL){
  150.         err("thpool_init(): Could not allocate memory for threads\n");
  151.         jobqueue_destroy(&thpool_p->jobqueue);
  152.         free(thpool_p);
  153.         return NULL;
  154.     }
  155.  
  156.     pthread_mutex_init(&(thpool_p->thcount_lock), NULL);
  157.     pthread_cond_init(&thpool_p->threads_all_idle, NULL);
  158.  
  159.     /* Thread init */
  160.     int n;
  161.     for (n=0; n<num_threads; n++){
  162.         thread_init(thpool_p, &thpool_p->threads[n], n);
  163. #if THPOOL_DEBUG
  164.             printf("THPOOL_DEBUG: Created thread %d in pool \n", n);
  165. #endif
  166.     }
  167.  
  168.     /* Wait for threads to initialize */
  169.     while (thpool_p->num_threads_alive != num_threads) {}
  170.  
  171.     return thpool_p;
  172. }
  173.  
  174.  
  175. /* Add work to the thread pool */
  176. int thpool_add_work(thpool_* thpool_p, void (*function_p)(void*), void* arg_p){
  177.     job* newjob;
  178.  
  179.     newjob=(struct job*)malloc(sizeof(struct job));
  180.     if (newjob==NULL){
  181.         err("thpool_add_work(): Could not allocate memory for new job\n");
  182.         return -1;
  183.     }
  184.  
  185.     /* add function and argument */
  186.     newjob->function=function_p;
  187.     newjob->arg=arg_p;
  188.  
  189.     /* add job to queue */
  190.     jobqueue_push(&thpool_p->jobqueue, newjob);
  191.  
  192.     return 0;
  193. }
  194.  
  195.  
  196. /* Wait until all jobs have finished */
  197. void thpool_wait(thpool_* thpool_p){
  198.     pthread_mutex_lock(&thpool_p->thcount_lock);
  199.     while (thpool_p->jobqueue.len || thpool_p->num_threads_working) {
  200.         pthread_cond_wait(&thpool_p->threads_all_idle, &thpool_p->thcount_lock);
  201.     }
  202.     pthread_mutex_unlock(&thpool_p->thcount_lock);
  203. }
  204.  
  205.  
  206. /* Destroy the threadpool */
  207. void thpool_destroy(thpool_* thpool_p){
  208.     /* No need to destory if it's NULL */
  209.     if (thpool_p == NULL) return ;
  210.  
  211.     volatile int threads_total = thpool_p->num_threads_alive;
  212.  
  213.     /* End each thread 's infinite loop */
  214.     threads_keepalive = 0;
  215.  
  216.     /* Give one second to kill idle threads */
  217.     double TIMEOUT = 1.0;
  218.     time_t start, end;
  219.     double tpassed = 0.0;
  220.     time (&start);
  221.     while (tpassed < TIMEOUT && thpool_p->num_threads_alive){
  222.         bsem_post_all(thpool_p->jobqueue.has_jobs);
  223.         time (&end);
  224.         tpassed = difftime(end,start);
  225.     }
  226.  
  227.     /* Poll remaining threads */
  228.     while (thpool_p->num_threads_alive){
  229.         bsem_post_all(thpool_p->jobqueue.has_jobs);
  230.         sleep(1);
  231.     }
  232.  
  233.     /* Job queue cleanup */
  234.     jobqueue_destroy(&thpool_p->jobqueue);
  235.     /* Deallocs */
  236.     int n;
  237.     for (n=0; n < threads_total; n++){
  238.         thread_destroy(thpool_p->threads[n]);
  239.     }
  240.     free(thpool_p->threads);
  241.     free(thpool_p);
  242. }
  243.  
  244.  
  245. /* Pause all threads in threadpool */
  246. void thpool_pause(thpool_* thpool_p) {
  247.     int n;
  248.     for (n=0; n < thpool_p->num_threads_alive; n++){
  249.         pthread_kill(thpool_p->threads[n]->pthread, SIGUSR1);
  250.     }
  251. }
  252.  
  253.  
  254. /* Resume all threads in threadpool */
  255. void thpool_resume(thpool_* thpool_p) {
  256.     // resuming a single threadpool hasn't been
  257.     // implemented yet, meanwhile this supresses
  258.     // the warnings
  259.     (void)thpool_p;
  260.  
  261.     threads_on_hold = 0;
  262. }
  263.  
  264.  
  265. int thpool_num_threads_working(thpool_* thpool_p){
  266.     return thpool_p->num_threads_working;
  267. }
  268.  
  269.  
  270.  
  271.  
  272.  
  273. /* ============================ THREAD ============================== */
  274.  
  275.  
  276. /* Initialize a thread in the thread pool
  277.  *
  278.  * @param thread        address to the pointer of the thread to be created
  279.  * @param id            id to be given to the thread
  280.  * @return 0 on success, -1 otherwise.
  281.  */
  282. static int thread_init (thpool_* thpool_p, struct thread** thread_p, int id){
  283.  
  284.     *thread_p = (struct thread*)malloc(sizeof(struct thread));
  285.     if (thread_p == NULL){
  286.         err("thread_init(): Could not allocate memory for thread\n");
  287.         return -1;
  288.     }
  289.  
  290.     (*thread_p)->thpool_p = thpool_p;
  291.     (*thread_p)->id       = id;
  292.  
  293.     pthread_create(&(*thread_p)->pthread, NULL, (void *)thread_do, (*thread_p));
  294.     pthread_detach((*thread_p)->pthread);
  295.     return 0;
  296. }
  297.  
  298.  
  299. /* Sets the calling thread on hold */
  300. static void thread_hold(int sig_id) {
  301.     (void)sig_id;
  302.     threads_on_hold = 1;
  303.     while (threads_on_hold){
  304.         sleep(1);
  305.     }
  306. }
  307.  
  308.  
  309. /* What each thread is doing
  310. *
  311. * In principle this is an endless loop. The only time this loop gets interuppted is once
  312. * thpool_destroy() is invoked or the program exits.
  313. *
  314. * @param  thread        thread that will run this function
  315. * @return nothing
  316. */
  317. static void* thread_do(struct thread* thread_p){
  318.  
  319.     /* Set thread name for profiling and debuging */
  320.     char thread_name[128] = {0};
  321.     sprintf(thread_name, "thread-pool-%d", thread_p->id);
  322.  
  323. #if defined(__linux__)
  324.     /* Use prctl instead to prevent using _GNU_SOURCE flag and implicit declaration */
  325.     prctl(PR_SET_NAME, thread_name);
  326. #elif defined(__APPLE__) && defined(__MACH__)
  327.     pthread_setname_np(thread_name);
  328. #else
  329.     err("thread_do(): pthread_setname_np is not supported on this system");
  330. #endif
  331.  
  332.     /* Assure all threads have been created before starting serving */
  333.     thpool_* thpool_p = thread_p->thpool_p;
  334.  
  335.     /* Register signal handler */
  336.     struct sigaction act;
  337.     sigemptyset(&act.sa_mask);
  338.     act.sa_flags = 0;
  339.     act.sa_handler = thread_hold;
  340.     if (sigaction(SIGUSR1, &act, NULL) == -1) {
  341.         err("thread_do(): cannot handle SIGUSR1");
  342.     }
  343.  
  344.     /* Mark thread as alive (initialized) */
  345.     pthread_mutex_lock(&thpool_p->thcount_lock);
  346.     thpool_p->num_threads_alive += 1;
  347.     pthread_mutex_unlock(&thpool_p->thcount_lock);
  348.  
  349.     while(threads_keepalive){
  350.  
  351.         bsem_wait(thpool_p->jobqueue.has_jobs);
  352.  
  353.         if (threads_keepalive){
  354.  
  355.             pthread_mutex_lock(&thpool_p->thcount_lock);
  356.             thpool_p->num_threads_working++;
  357.             pthread_mutex_unlock(&thpool_p->thcount_lock);
  358.  
  359.             /* Read job from queue and execute it */
  360.             void (*func_buff)(void*);
  361.             void*  arg_buff;
  362.             job* job_p = jobqueue_pull(&thpool_p->jobqueue);
  363.             if (job_p) {
  364.                 func_buff = job_p->function;
  365.                 arg_buff  = job_p->arg;
  366.                 func_buff(arg_buff);
  367.                 free(job_p);
  368.             }
  369.  
  370.             pthread_mutex_lock(&thpool_p->thcount_lock);
  371.             thpool_p->num_threads_working--;
  372.             if (!thpool_p->num_threads_working) {
  373.                 pthread_cond_signal(&thpool_p->threads_all_idle);
  374.             }
  375.             pthread_mutex_unlock(&thpool_p->thcount_lock);
  376.  
  377.         }
  378.     }
  379.     pthread_mutex_lock(&thpool_p->thcount_lock);
  380.     thpool_p->num_threads_alive --;
  381.     pthread_mutex_unlock(&thpool_p->thcount_lock);
  382.  
  383.     return NULL;
  384. }
  385.  
  386.  
  387. /* Frees a thread  */
  388. static void thread_destroy (thread* thread_p){
  389.     free(thread_p);
  390. }
  391.  
  392.  
  393.  
  394.  
  395.  
  396. /* ============================ JOB QUEUE =========================== */
  397.  
  398.  
  399. /* Initialize queue */
  400. static int jobqueue_init(jobqueue* jobqueue_p){
  401.     jobqueue_p->len = 0;
  402.     jobqueue_p->front = NULL;
  403.     jobqueue_p->rear  = NULL;
  404.  
  405.     jobqueue_p->has_jobs = (struct bsem*)malloc(sizeof(struct bsem));
  406.     if (jobqueue_p->has_jobs == NULL){
  407.         return -1;
  408.     }
  409.  
  410.     pthread_mutex_init(&(jobqueue_p->rwmutex), NULL);
  411.     bsem_init(jobqueue_p->has_jobs, 0);
  412.  
  413.     return 0;
  414. }
  415.  
  416.  
  417. /* Clear the queue */
  418. static void jobqueue_clear(jobqueue* jobqueue_p){
  419.  
  420.     while(jobqueue_p->len){
  421.         free(jobqueue_pull(jobqueue_p));
  422.     }
  423.  
  424.     jobqueue_p->front = NULL;
  425.     jobqueue_p->rear  = NULL;
  426.     bsem_reset(jobqueue_p->has_jobs);
  427.     jobqueue_p->len = 0;
  428.  
  429. }
  430.  
  431.  
  432. /* Add (allocated) job to queue
  433.  */
  434. static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob){
  435.  
  436.     pthread_mutex_lock(&jobqueue_p->rwmutex);
  437.     newjob->prev = NULL;
  438.  
  439.     switch(jobqueue_p->len){
  440.  
  441.         case 0:  /* if no jobs in queue */
  442.                     jobqueue_p->front = newjob;
  443.                     jobqueue_p->rear  = newjob;
  444.                     break;
  445.  
  446.         default: /* if jobs in queue */
  447.                     jobqueue_p->rear->prev = newjob;
  448.                     jobqueue_p->rear = newjob;
  449.  
  450.     }
  451.     jobqueue_p->len++;
  452.  
  453.     bsem_post(jobqueue_p->has_jobs);
  454.     pthread_mutex_unlock(&jobqueue_p->rwmutex);
  455. }
  456.  
  457.  
  458. /* Get first job from queue(removes it from queue)
  459. <<<<<<< HEAD
  460.  *
  461.  * Notice: Caller MUST hold a mutex
  462. =======
  463. >>>>>>> da2c0fe45e43ce0937f272c8cd2704bdc0afb490
  464.  */
  465. static struct job* jobqueue_pull(jobqueue* jobqueue_p){
  466.  
  467.     pthread_mutex_lock(&jobqueue_p->rwmutex);
  468.     job* job_p = jobqueue_p->front;
  469.  
  470.     switch(jobqueue_p->len){
  471.  
  472.         case 0:  /* if no jobs in queue */
  473.                     break;
  474.  
  475.         case 1:  /* if one job in queue */
  476.                     jobqueue_p->front = NULL;
  477.                     jobqueue_p->rear  = NULL;
  478.                     jobqueue_p->len = 0;
  479.                     break;
  480.  
  481.         default: /* if >1 jobs in queue */
  482.                     jobqueue_p->front = job_p->prev;
  483.                     jobqueue_p->len--;
  484.                     /* more than one job in queue -> post it */
  485.                     bsem_post(jobqueue_p->has_jobs);
  486.  
  487.     }
  488.  
  489.     pthread_mutex_unlock(&jobqueue_p->rwmutex);
  490.     return job_p;
  491. }
  492.  
  493.  
  494. /* Free all queue resources back to the system */
  495. static void jobqueue_destroy(jobqueue* jobqueue_p){
  496.     jobqueue_clear(jobqueue_p);
  497.     free(jobqueue_p->has_jobs);
  498. }
  499.  
  500.  
  501.  
  502.  
  503.  
  504. /* ======================== SYNCHRONISATION ========================= */
  505.  
  506.  
  507. /* Init semaphore to 1 or 0 */
  508. static void bsem_init(bsem *bsem_p, int value) {
  509.     if (value < 0 || value > 1) {
  510.         err("bsem_init(): Binary semaphore can take only values 1 or 0");
  511.         exit(1);
  512.     }
  513.     pthread_mutex_init(&(bsem_p->mutex), NULL);
  514.     pthread_cond_init(&(bsem_p->cond), NULL);
  515.     bsem_p->v = value;
  516. }
  517.  
  518.  
  519. /* Reset semaphore to 0 */
  520. static void bsem_reset(bsem *bsem_p) {
  521.     bsem_init(bsem_p, 0);
  522. }
  523.  
  524.  
  525. /* Post to at least one thread */
  526. static void bsem_post(bsem *bsem_p) {
  527.     pthread_mutex_lock(&bsem_p->mutex);
  528.     bsem_p->v = 1;
  529.     pthread_cond_signal(&bsem_p->cond);
  530.     pthread_mutex_unlock(&bsem_p->mutex);
  531. }
  532.  
  533.  
  534. /* Post to all threads */
  535. static void bsem_post_all(bsem *bsem_p) {
  536.     pthread_mutex_lock(&bsem_p->mutex);
  537.     bsem_p->v = 1;
  538.     pthread_cond_broadcast(&bsem_p->cond);
  539.     pthread_mutex_unlock(&bsem_p->mutex);
  540. }
  541.  
  542.  
  543. /* Wait on semaphore until semaphore has value 0 */
  544. static void bsem_wait(bsem* bsem_p) {
  545.     pthread_mutex_lock(&bsem_p->mutex);
  546.     while (bsem_p->v != 1) {
  547.         pthread_cond_wait(&bsem_p->cond, &bsem_p->mutex);
  548.     }
  549.     bsem_p->v = 0;
  550.     pthread_mutex_unlock(&bsem_p->mutex);
  551. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement