Advertisement
Guest User

Untitled

a guest
Jan 22nd, 2021
59
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 4.53 KB | None | 0 0
  #include <pthread.h>
  #include <stdlib.h>
  #include <string.h>
  #include <time.h>
  #include <unistd.h>

  #include <iostream>
  #include <sstream>
  #include <string>
  #include <vector>
  // Evaluates `expr` and discards the result; used to silence unused-parameter warnings.
  #define UNUSED(expr) do { (void)(expr); } while (0)

  // Hand-off states stored in `stat`: whose turn it is to touch `numb`.
  constexpr int NEEDTOWRITE = 1;  // slot is free, the producer may publish a value
  constexpr int NEEDTOREAD = 2;   // slot holds a value that has not been consumed yet

  // Values stored in `isEnd`.
  constexpr int END = 1;     // the producer has published its last value
  constexpr int NOTEND = 2;  // the producer may still publish values

  // Not referenced anywhere in this file; kept so existing users keep compiling.
  constexpr int INIT = 1;
  constexpr int NOTINIT = 2;

  using namespace std;

  // Set by main() from the optional third command-line argument.
  int isDebug = 0;
  // Current hand-off state (NEEDTOWRITE / NEEDTOREAD).
  int stat = NEEDTOWRITE;
  // END once the producer has finished, NOTEND before that.
  int isEnd = NOTEND;
  // Synchronisation primitives for the producer/consumer hand-off;
  // initialised by init() and destroyed by end().
  pthread_mutex_t numbMutex;
  pthread_mutex_t consumerMutex;
  pthread_cond_t needToReadCond, needToWriteCond;
  // Single-value slot through which the producer passes numbers to consumers.
  long long int numb = 0;
  24. void* producer_routine(void* arg) {
  25.     UNUSED(arg);
  26.   // Wait for consumer to start
  27.    // vector<string>& result = *reinterpret_cast<vector<string>*>(arg);
  28.     string line;
  29.     getline(cin, line);
  30.     istringstream iss(line);
  31.     vector<string> result;
  32.     for(string s;iss>>s;)
  33.         result.push_back(s);
  34.     int n=result.size();
  35.     for(int i=0;i<n;i++){
  36.         pthread_mutex_lock(&numbMutex);
  37.         while (stat!=NEEDTOWRITE){
  38.             pthread_cond_wait(&needToWriteCond,&numbMutex);
  39.         }
  40.         numb = stoll(result[i]);
  41.         stat = NEEDTOREAD;
  42.         pthread_cond_signal(&needToReadCond);
  43.         pthread_mutex_unlock(&numbMutex);
  44.     }
  45.     isEnd = END;
  46.   // Read data, loop through each value and update the value, notify consumer, wait for consumer to process
  47.   return nullptr;
  48. }
  49.  
  50. void* consumer_routine(void* arg) {
  51.   // notify about start
  52.   // for every update issued by producer, read the value and add to sum
  53.   // return pointer to result (for particular consumer)
  54.     long long int local_result = 0;
  55.     int sleep = *reinterpret_cast<int*>(arg);
  56.  
  57.     UNUSED(arg);
  58.     pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
  59.     while(isEnd != END) {
  60.  
  61.         pthread_mutex_lock(&consumerMutex);
  62.         if(isEnd == END){
  63.             if(stat == NEEDTOREAD) {
  64.                 local_result += numb;
  65.                 stat = NEEDTOWRITE;
  66.             }
  67.             pthread_mutex_unlock(&consumerMutex);
  68.             return (void *) local_result;
  69.  
  70.         }
  71.         while (stat != NEEDTOREAD) {
  72.             pthread_cond_wait(&needToReadCond, &consumerMutex);
  73.         }
  74.         local_result += numb;
  75.         stat = NEEDTOWRITE;
  76.         pthread_cond_signal(&needToWriteCond);
  77.         pthread_mutex_unlock(&consumerMutex);
  78.         if (sleep > 0) {
  79.             int sleep_millis = (rand() % sleep) + 1;
  80.             UNUSED(sleep_millis);
  81.             usleep(sleep_millis * 1000);
  82.         }
  83.     }
  84.     pthread_mutex_lock(&consumerMutex);
  85.     if(stat == NEEDTOREAD) {
  86.         local_result += numb;
  87.         stat = NEEDTOWRITE;
  88.     }
  89.     pthread_mutex_unlock(&consumerMutex);
  90.   return (void *) local_result;
  91. }
  92.  
  // Interruptor thread entry point (placeholder).
  //
  // Intended to wait for the consumers to start and then interrupt random
  // consumers while the producer is running; currently it does nothing.
  // arg is ignored. Always returns nullptr.
  void* consumer_interruptor_routine(void* arg) {
      (void)arg;  // not implemented yet, so the argument is unused
      return nullptr;
  }
  101.  
  102. int run_threads(long long int threadsCount, int waitCount) {
  103.   // start N threads and wait until they're done
  104.   // return aggregated sum of values
  105.   long long int result  = 0 ;
  106.     if(threadsCount < 1) return 0;
  107.   if(threadsCount == 0){
  108.       return 0;
  109.   }
  110.     pthread_t producer, interrupter;
  111.   UNUSED(interrupter);
  112.   UNUSED(waitCount);
  113.     vector <pthread_t> consumers;
  114.     //int n=result.size();
  115.     pthread_create( &producer, NULL, producer_routine, NULL);
  116.     for (int i = 0; i < threadsCount; i++) {
  117.         pthread_t consumer;
  118.         pthread_create(&consumer, NULL, consumer_routine, &waitCount);
  119.         consumers.push_back(consumer);
  120.     }
  121.     for (int i = 0; i < threadsCount; i++) {
  122.         long long int partial = 0;
  123.         pthread_join(consumers[i], (void **) &partial);
  124.         result += partial;
  125.     }
  126.     pthread_join( producer, NULL);
  127.   return result;
  128. }
  129.  
  // Placeholder for the per-thread identifier (intended range: 1 to 3+N).
  // Not implemented yet: every caller currently receives 0.
  int get_tid() {
      const int placeholder_tid = 0;
      return placeholder_tid;
  }
  136. int init(){
  137.     pthread_mutex_init(&numbMutex, NULL);
  138.     pthread_mutex_init(&consumerMutex, NULL);
  139.     pthread_cond_init(&needToReadCond, NULL);
  140.     pthread_cond_init(&needToWriteCond, NULL);
  141.     return 0;
  142. }
  143. int end(){
  144.     pthread_mutex_destroy(&numbMutex);
  145.     pthread_mutex_destroy(&consumerMutex);
  146.     pthread_cond_destroy(&needToReadCond);
  147.     pthread_cond_destroy(&needToWriteCond);
  148.     return 0;
  149. }
  150.  
  151. int main(int argc, char *argv[]) {
  152.     srand(time(NULL));
  153.     init();
  154.     long long int threadsCount = atoll(argv[1]);
  155.     int waitCount  = atoi(argv[2]);
  156.     if(argc > 3){
  157.         if(strcmp(argv[3],"–debug"))
  158.             isDebug = 1;
  159.     }
  160.     std::cout << run_threads(threadsCount, waitCount) << std::endl;
  161.     end();
  162.     return 0;
  163. }
  164.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement