samkopite

Untitled

Sep 18th, 2017
49
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.17 KB | None | 0 0
  #include <errno.h>
  #include <pthread.h>
  #include <semaphore.h>
  #include <stdio.h>
  #include <stdlib.h>
  #include <unistd.h>
  5.  
  6.  
  7.  
  8. #define SIZE 10 /* capacity of the shared buffer; also the initial count of full_sem */
  9. #define NUMB_THREADS 10 /* total worker threads started by main(), alternating producer/consumer */
  10. #define PRODUCER_LOOPS 2 /* iterations per thread; used as the loop bound by both producer2 and consumer2 */
  11. #define CONSUMER_LOOPS 2 /* not referenced by the code in this file: consumer2 loops on PRODUCER_LOOPS */
  12.  
  13. #define TRUE 1 /* value returned by isempty()/isfull() when the condition holds */
  14. #define FALSE 0 /* value returned by isempty()/isfull() otherwise */
  15.  
  16. typedef int buffer_t; /* element type stored in the shared buffer */
  17. buffer_t buffer[SIZE]; /* shared storage; insertbuffer()/dequeuebuffer() use it as a stack */
  18. int buffer_index; /* number of items currently stored, i.e. index of the next free slot */
  19.  
  20. pthread_mutex_t buffer_mutex; /* held around every access to buffer/buffer_index in producer2 and consumer2 */
  21. /* Initially the buffer is empty. full_sem
  22. is initialized to the buffer SIZE, which means
  23. SIZE producer insertions can proceed before one blocks.
  24. empty_sem is initialized to 0, so no
  25. consumer can read from the buffer until a producer
  26. thread posts to empty_sem. */
  27. sem_t full_sem; /* counts free slots: when 0, buffer is full */
  28. sem_t empty_sem; /* counts stored items: when 0, buffer is empty. Kind of
  29. like an index for the buffer */
  30.  
  31. /* sem_post algorithm (conceptual):
  32. mutex_lock sem_t->mutex
  33. sem_t->value++
  34. mutex_unlock sem_t->mutex
  35.  
  36. sem_wait algorithm (conceptual):
  37. mutex_lock sem_t->mutex
  38. while (sem_t->value == 0) {
  39. mutex_unlock sem_t->mutex
  40. sleep... wake up
  41. mutex_lock sem_t->mutex
  42. }
  43. sem_t->value--
  44. mutex_unlock sem_t->mutex
  45. */
  46.  
  47.  
  48. void insertbuffer(buffer_t value) {
  49. if (buffer_index < SIZE) {
  50. buffer[buffer_index++] = value;
  51. } else {
  52. printf("Buffer overflow\n");
  53. }
  54. }
  55.  
  56. buffer_t dequeuebuffer() {
  57. if (buffer_index > 0) {
  58. return buffer[--buffer_index]; // buffer_index-- would be error!
  59. } else {
  60. printf("Buffer underflow\n");
  61. }
  62. return 0;
  63. }
  64.  
  65.  
  66. int isempty() {
  67. if (buffer_index == 0)
  68. return TRUE;
  69. return FALSE;
  70. }
  71.  
  72. int isfull() {
  73. if (buffer_index == SIZE)
  74. return TRUE;
  75. return FALSE;
  76. }
  77.  
  78. void *producer2(void *thread_n) {
  79. int thread_numb = *(int *)thread_n;
  80. buffer_t value;
  81. int i=0;
  82. while (i++ < PRODUCER_LOOPS) {
  83. sleep(rand() % 10);
  84. value = rand() % 100;
  85. pthread_mutex_lock(&buffer_mutex);
  86. do {
  87. // cond variables do the unlock/wait and wakeup/lock atomically,
  88. // which avoids possible race conditions
  89. pthread_mutex_unlock(&buffer_mutex);
  90. // cannot go to slepp holding lock
  91. sem_wait(&full_sem); // sem=0: wait. sem>0: go and decrement it
  92. // there could still be race condition here. another
  93. // thread could wake up and aqcuire lock and fill up
  94. // buffer. that's why we need to check for spurious wakeups
  95. pthread_mutex_lock(&buffer_mutex);
  96. } while (isfull()); // check for spurios wake-ups
  97. insertbuffer(value);
  98. pthread_mutex_unlock(&buffer_mutex);
  99. sem_post(&empty_sem); // post (increment) emptybuffer semaphore
  100. printf("Producer %d added %d to buffer\n", thread_numb, value);
  101. }
  102. pthread_exit(0);
  103. }
  104.  
  105. void *consumer2(void *thread_n) {
  106. int thread_numb = *(int *)thread_n;
  107. buffer_t value;
  108. int i=0;
  109. while (i++ < PRODUCER_LOOPS) {
  110. pthread_mutex_lock(&buffer_mutex);
  111. do {
  112. pthread_mutex_unlock(&buffer_mutex);
  113. sem_wait(&empty_sem);
  114. pthread_mutex_lock(&buffer_mutex);
  115. } while (isempty()); //check for spurios wakeups
  116. value = dequeuebuffer(value);
  117. pthread_mutex_unlock(&buffer_mutex);
  118. sem_post(&full_sem); // post (increment) fullbuffer semaphore
  119. printf("Consumer %d dequeue %d from buffer\n", thread_numb, value);
  120. }
  121. pthread_exit(0);
  122. }
  123.  
  124. int main(int argc, int **argv) {
  125. buffer_index = 0;
  126.  
  127. pthread_mutex_init(&buffer_mutex, NULL);
  128. sem_init(&full_sem, // sem_t *sem
  129. 0, // int pshared. 0 = shared between threads of process, 1 = shared between processes
  130. SIZE); // unsigned int value. Initial value
  131. sem_init(&empty_sem,
  132. 0,
  133. 0);
  134. /* full_sem is initialized to buffer size because SIZE number of
  135. producers can add one element to buffer each. They will wait
  136. semaphore each time, which will decrement semaphore value.
  137. empty_sem is initialized to 0, because buffer starts empty and
  138. consumer cannot take any element from it. They will have to wait
  139. until producer posts to that semaphore (increments semaphore
  140. value) */
  141. pthread_t thread[NUMB_THREADS];
  142. int thread_numb[NUMB_THREADS];
  143. int i;
  144. for (i = 0; i < NUMB_THREADS; ) {
  145. thread_numb[i] = i;
  146. pthread_create(thread + i, // pthread_t *t
  147. NULL, // const pthread_attr_t *attr
  148. producer2, // void *(*start_routine) (void *)
  149. thread_numb + i); // void *arg
  150. i++;
  151. thread_numb[i] = i;
  152. // playing a bit with thread and thread_numb pointers...
  153. pthread_create(&thread[i], // pthread_t *t
  154. NULL, // const pthread_attr_t *attr
  155. consumer2, // void *(*start_routine) (void *)
  156. &thread_numb[i]); // void *arg
  157. i++;
  158. }
  159.  
  160. for (i = 0; i < NUMB_THREADS; i++)
  161. pthread_join(thread[i], NULL);
  162.  
  163. pthread_mutex_destroy(&buffer_mutex);
  164. sem_destroy(&full_sem);
  165. sem_destroy(&empty_sem);
  166.  
  167. return 0;
  168. }
Add Comment
Please, Sign In to add comment