Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <iostream>
- #include <string.h>
- #include <pthread.h>
- #include <sstream>
- #include <vector>
- #include <time.h>
- #include <unistd.h>
// Evaluates `expr` and discards the result (silences unused-variable warnings).
#define UNUSED(expr) do { (void)(expr); } while (0)

// Values of `stat`: who may touch the single-value hand-off slot `numb` next.
#define NEEDTOWRITE 1  // slot is free, the producer may store a value
#define NEEDTOREAD 2   // slot holds a value that no consumer has taken yet

// Values of `isEnd`.
#define END 1     // the producer has published its last value
#define NOTEND 2  // the producer may still publish values

// Not referenced anywhere in the visible source.
#define INIT 1
#define NOTINIT 2

using namespace std;

// Debug flag set by main(); nothing in the visible source reads it.
int isDebug = 0;

// Hand-off state shared by the producer and all consumers. `stat`, `isEnd`
// and `numb` are read and written only while numbMutex is held.
// NOTE(review): the name `stat` collides with POSIX stat() as soon as
// <sys/stat.h> is included in this translation unit.
int stat = NEEDTOWRITE;
int isEnd = NOTEND;
long long int numb;

// The one mutex that guards `stat`, `isEnd` and `numb`.
pthread_mutex_t numbMutex;
// No longer locked by the routines below; kept because init()/end() still
// initialise and destroy it.
pthread_mutex_t consumerMutex;

// needToReadCond : signalled when a value was stored, broadcast at end of input.
// needToWriteCond: signalled when a consumer emptied the slot.
pthread_cond_t needToReadCond;
pthread_cond_t needToWriteCond;

/**
 * Producer thread entry point.
 *
 * Reads one line from standard input, splits it on whitespace and hands the
 * numbers to the consumers one at a time through `numb`. Before storing a
 * value it waits until the previous one was taken (stat == NEEDTOWRITE).
 * After the last value it sets isEnd = END under the mutex and wakes every
 * waiting consumer so that none of them stays blocked forever.
 *
 * @param arg unused.
 * @return always nullptr.
 */
void* producer_routine(void* arg) {
    UNUSED(arg);

    string line;
    getline(cin, line);
    istringstream iss(line);

    for (string token; iss >> token;) {
        // NOTE(review): stoll throws on a token that is not a number or does
        // not fit into long long; nothing catches it here, so such input
        // terminates the process.
        const long long int value = stoll(token);

        pthread_mutex_lock(&numbMutex);
        while (stat != NEEDTOWRITE) {
            pthread_cond_wait(&needToWriteCond, &numbMutex);
        }
        numb = value;
        stat = NEEDTOREAD;
        pthread_cond_signal(&needToReadCond);
        pthread_mutex_unlock(&numbMutex);
    }

    // Publish end-of-input under the same mutex the consumers use for their
    // predicate, and wake all of them: a plain unlocked store would leave
    // consumers parked in pthread_cond_wait with nobody left to signal them.
    pthread_mutex_lock(&numbMutex);
    isEnd = END;
    pthread_cond_broadcast(&needToReadCond);
    pthread_mutex_unlock(&numbMutex);

    return nullptr;
}

/**
 * Consumer thread entry point.
 *
 * Repeatedly waits for a value in `numb`, adds it to a thread-local sum and
 * frees the slot for the producer. Returns once the producer has finished and
 * no value is pending. Thread cancellation is disabled for the whole routine.
 *
 * @param arg pointer to an int: upper bound, in milliseconds, of the random
 *            pause taken after each consumed value; values <= 0 disable the
 *            pause.
 * @return this consumer's partial sum, encoded in the returned pointer value
 *         (the joiner converts it back to an integer).
 */
void* consumer_routine(void* arg) {
    const int maxSleepMs = *reinterpret_cast<int*>(arg);
    long long int local_result = 0;

    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);

    for (;;) {
        pthread_mutex_lock(&numbMutex);

        // Sleep until there is a value to take or the producer is done.
        while (stat != NEEDTOREAD && isEnd != END) {
            pthread_cond_wait(&needToReadCond, &numbMutex);
        }

        if (stat != NEEDTOREAD) {
            // End of input and the slot is empty: nothing left to consume.
            pthread_mutex_unlock(&numbMutex);
            break;
        }

        local_result += numb;
        stat = NEEDTOWRITE;
        pthread_cond_signal(&needToWriteCond);
        pthread_mutex_unlock(&numbMutex);

        if (maxSleepMs > 0) {
            // Random pause of 1..maxSleepMs milliseconds, taken outside the lock.
            const int sleep_millis = (rand() % maxSleepMs) + 1;
            usleep(sleep_millis * 1000);
        }
    }

    return reinterpret_cast<void*>(local_result);
}
/**
 * Interruptor thread entry point (not implemented).
 *
 * Intended behaviour, per the original notes: wait for the consumers to
 * start, then interrupt a random consumer while the producer is running.
 * At present the routine does nothing.
 *
 * @param arg ignored.
 * @return always nullptr.
 */
void* consumer_interruptor_routine(void* arg) {
    static_cast<void>(arg);  // parameter is intentionally unused
    return nullptr;
}
- int run_threads(long long int threadsCount, int waitCount) {
- // start N threads and wait until they're done
- // return aggregated sum of values
- long long int result = 0 ;
- if(threadsCount < 1) return 0;
- if(threadsCount == 0){
- return 0;
- }
- pthread_t producer, interrupter;
- UNUSED(interrupter);
- UNUSED(waitCount);
- vector <pthread_t> consumers;
- //int n=result.size();
- pthread_create( &producer, NULL, producer_routine, NULL);
- for (int i = 0; i < threadsCount; i++) {
- pthread_t consumer;
- pthread_create(&consumer, NULL, consumer_routine, &waitCount);
- consumers.push_back(consumer);
- }
- for (int i = 0; i < threadsCount; i++) {
- long long int partial = 0;
- pthread_join(consumers[i], (void **) &partial);
- result += partial;
- }
- pthread_join( producer, NULL);
- return result;
- }
/**
 * Thread identifier lookup (not implemented).
 *
 * The original note describes the intended result as an ID in the range
 * 1 to 3+N; the current placeholder returns 0 for every caller.
 *
 * @return always 0.
 */
int get_tid() {
    const int placeholderTid = 0;
    return placeholderTid;
}
- int init(){
- pthread_mutex_init(&numbMutex, NULL);
- pthread_mutex_init(&consumerMutex, NULL);
- pthread_cond_init(&needToReadCond, NULL);
- pthread_cond_init(&needToWriteCond, NULL);
- return 0;
- }
- int end(){
- pthread_mutex_destroy(&numbMutex);
- pthread_mutex_destroy(&consumerMutex);
- pthread_cond_destroy(&needToReadCond);
- pthread_cond_destroy(&needToWriteCond);
- return 0;
- }
- int main(int argc, char *argv[]) {
- srand(time(NULL));
- init();
- long long int threadsCount = atoll(argv[1]);
- int waitCount = atoi(argv[2]);
- if(argc > 3){
- if(strcmp(argv[3],"–debug"))
- isDebug = 1;
- }
- std::cout << run_threads(threadsCount, waitCount) << std::endl;
- end();
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement