Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <stdio.h>
- #include <bits/stdc++.h>
- #include <pthread.h>
- double *array;//the array to store data, length = n
- double *summleft;//the array to store the summ of the elements in
- //the progression which touches left border
- //of array part which belongs to a certain thread, length = p
- double *summright;//the array to store the summ of the elements in
- //the progression which touches right border
- //of array part which belongs to a certain thread, length = p
- int *numleft;//the array to store the amount of the elements in
- //the progression which touches left border
- //of array part which belongs to a certain thread, length = p
- int *numright;//the array to store the amount of the elements in
- //the progression which touches right border
- //of array part which belongs to a certain thread, length = p
- bool *isallthrough;//the array to check whether all elements of the array
- //part belonging to a certain thread are in one progression, length = p
- typedef struct _ARGS
- {
- int length;
- int thread_num;
- int total_threads;
- int startpos;
- int thread_length;
- double right1;
- double right2;
- double left1;
- double left2;
- } ARGS;
- void change_numbers(int length, int thread_num, int total_threads, int startpos, int thread_length, double right1, double right2, double left1, double left2);
- void *change_numbers_threaded(void* pa);
- void synchronize (int total_threads);
- void synchronize(int total_threads)
- {
- static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
- static pthread_cond_t condvar_in = PTHREAD_COND_INITIALIZER;
- static pthread_cond_t condvar_out = PTHREAD_COND_INITIALIZER;
- static int threads_in = 0;
- static int threads_out = 0;
- pthread_mutex_lock(&mutex);
- threads_in++;
- if (threads_in >= total_threads)
- {
- threads_out = 0;
- pthread_cond_broadcast(&condvar_in);
- }
- else
- {
- while (threads_in < total_threads)
- {
- pthread_cond_wait(&condvar_in, &mutex);
- }
- }
- threads_out++;
- if (threads_out >= total_threads)
- {
- threads_in = 0;
- pthread_cond_broadcast(&condvar_out);
- }
- else
- {
- while (threads_out < total_threads)
- {
- pthread_cond_wait(&condvar_out, &mutex);
- }
- }
- pthread_mutex_unlock(&mutex);
- }
- void *change_numbers_threaded(void* pa)
- {
- ARGS* pargs = (ARGS*)pa;
- printf("Thread %d started\n", pargs->thread_num);
- change_numbers(pargs->length, pargs->thread_num, pargs->total_threads, pargs->startpos, pargs->thread_length, pargs->right1, pargs->right2, pargs->left1, pargs->left2);
- printf("Thread %d finished.\n", pargs->thread_num);
- return 0;
- }
- int main(int argc, char** argv)
- {
- pthread_t *threads;
- ARGS* args;
- int nthreads;
- int length;
- double *rightnum1;//the element of the array after a certain thread, length = p
- double *rightnum2;//the 2nd element of the array after a certain thread, length = p
- double *leftnum1;//the element of the array before a certain thread, length = p
- double *leftnum2;//the 2nd element of the array before a certain thread, length = p
- double temp;
- int i,j;
- if (argc<3)
- {
- printf("Execute the program with: a.out length total_threads [input]\n");
- return 1;
- }
- nthreads=atoi(argv[2]);
- printf("The program will create %d threads.\n",nthreads);
- length=atoi(argv[1]);
- if(length<=0 || nthreads<=0)
- {
- printf("Cannot launch the program with these values for length and/or number of threads.\n");
- return 1;
- }
- if(length<nthreads)
- {
- printf("Several threads will not receive data. Lower the amount of threads.\n");
- return 1;
- }
- if(length<3)
- {
- printf("There cannot be any progressions in the array of less than 3 elements.\n");
- return 1;
- }
- if(nthreads*3>length)
- {
- printf("There will be thread processing less than 3 numbers. This is not normal.\nIf you want to continue, press 0\nIf you want to change the amount of threads, press 1\n");
- scanf("%d\n",&j);
- if(j!=0) return 1;
- }
- array=new double[length];
- summleft=new double[nthreads];
- summright=new double[nthreads];
- numleft=new int[nthreads];
- numright=new int[nthreads];
- isallthrough=new bool[nthreads];
- threads=new pthread_t[nthreads];
- args=new ARGS[nthreads];
- if(!array || !summleft || !summright || !numleft || !numright || !isallthrough || !threads || !args)
- {
- if(array) delete[] array;
- if(summleft) delete[] summleft;
- if(summright) delete[] summright;
- if(numleft) delete[] numleft;
- if(numright) delete[] numright;
- if(isallthrough) delete[] isallthrough;
- if(threads) delete[] threads;
- if(args) delete[] args;
- printf("Not enough memory.\n");
- return -1;
- }
- for(i=0;i<nthreads;i++)
- {
- summleft[i]=0;
- summright[i]=0;
- numleft[i]=0;
- numright[i]=0;
- isallthrough[i]=false;
- }
- if(argc==3) for(i=0;i<length;i++) array[i]=i;
- else
- {
- FILE* fin;
- fin=fopen(argv[3], "r");
- if(!fin)
- {
- printf("File %s cannot be found and/or opened.\n",argv[3]);
- delete[] array;
- delete[] summleft;
- delete[] summright;
- delete[] numleft;
- delete[] numright;
- delete[] isallthrough;
- delete[] threads;
- delete[] args;
- return 1;
- }
- for(i=0;i<length;i++)
- {
- if(!fscanf(fin,"%lf",&temp))
- {
- printf("File %s contains less than %d numbers.\n",argv[3],length);
- fclose(fin);
- delete[] array;
- delete[] summleft;
- delete[] summright;
- delete[] numleft;
- delete[] numright;
- delete[] isallthrough;
- delete[] threads;
- delete[] args;
- return 2;
- }
- array[i]=temp;
- }
- if (fscanf(fin, "%lf", &temp) != EOF)
- {
- printf("File %s contains corrupted data.\n",argv[3]);
- fclose(fin);
- delete[] array;
- delete[] summleft;
- delete[] summright;
- delete[] numleft;
- delete[] numright;
- delete[] isallthrough;
- delete[] threads;
- delete[] args;
- return 3;
- }
- fclose(fin);
- }
- printf("Input array:\n");
- for(i=0;i<length;i++) printf("%f\t",array[i]);
- printf("\n");
- for(i=0;i<nthreads;i++)
- {
- args[i].thread_num=i;
- args[i].total_threads=nthreads;
- args[i].length=length;
- args[i].startpos=(int)(length/nthreads)*i;
- if(i<=length%nthreads) args[i].startpos+=i;
- else args[i].startpos+=length%nthreads;
- args[i].thread_length=length/nthreads;
- if(i<length%nthreads) args[i].thread_length++;
- if(i<nthreads-1) args[i].right1=array[args[i].startpos+args[i].thread_length];
- if(args[i].startpos+args[i].thread_length<length-1) args[i].right2=array[args[i].startpos+args[i].thread_length+1];
- if(i>0) args[i].left1=array[args[i].startpos-1];
- if(args[i].startpos-1>0) args[i].left2=array[args[i].startpos-2];
- }
- for (i=0;i<nthreads;i++)
- {
- if(pthread_create(threads+i,0,change_numbers_threaded,args+i))
- {
- fprintf(stderr, "Cannot create thread #%d!", i);
- delete[] array;
- delete[] summleft;
- delete[] summright;
- delete[] numleft;
- delete[] numright;
- delete[] isallthrough;
- delete[] threads;
- delete[] args;
- return 10;
- }
- }
- for (i=0;i<nthreads;i++)
- {
- if (pthread_join(threads[i],0)) fprintf(stderr, "cannot wait thread #%d!", i);
- }
- printf("Output array:\n");
- for(i=0;i<length;i++) printf("%f\t",array[i]);
- printf("\n");
- delete[] array;
- delete[] summleft;
- delete[] summright;
- delete[] numleft;
- delete[] numright;
- delete[] isallthrough;
- delete[] threads;
- delete[] args;
- return 0;
- }
- void change_numbers(int length, int thread_num, int total_threads, int startpos, int thread_length, double right1, double right2, double left1, double left2)
- {
- int i,j=0;
- bool waitleft=false, waitright=false;
- int num=0;
- double average=0;
- printf("Thread %d works\n",thread_num);
- //printf("It received length=%d thread_num=%d total_threads=%d startpos=%d\n",length,thread_num, total_threads,startpos);
- //printf("It received right1=%f right2=%f left1=%f left2=%f\n",right1,right2,left1,left2);
- //printf("It received thread_length=%d\n",thread_length);
- if(thread_length>=3)//this part assumes that the length of this part is 3 elements or more.
- //NORMALLY this part has to work always
- {
- if(fabs(array[startpos]-2*array[startpos+1]+array[startpos+2])<1e-9)//there is a progression touching left border
- {
- printf("Thread %d found a progression touching left border\n",thread_num);
- if(thread_num>0)
- {
- waitleft=true;
- printf("Thread %d waits left\n",thread_num);
- }
- for(i=0;i<thread_length;i++)//checked how many numbers in this part belong to
- //this progression, add this to border numbers.
- {
- if(fabs(array[startpos]-2*array[startpos+1]+array[startpos+2])>1e-9) break;
- summleft[thread_num]+=array[startpos+i];
- numleft[thread_num]++;
- }//the loop has ended but we need to add the last 2 manually
- //i has increased by 1 since the last addition, keep that in mind
- if(i<thread_length-2)
- {
- summleft[thread_num]+=array[startpos+i]+array[startpos+i+1];
- numleft[thread_num]+=2;
- }
- else if(i=thread_length-1)
- {
- summleft[thread_num]+=array[startpos+i];
- numleft[thread_num]++;
- }
- printf("Thread %d found %d elements in 1st progression with summ of %f\n",thread_num, numleft[thread_num],summleft[thread_num]);
- if(numleft[thread_num]==thread_length)//all array part belongs to the same progression
- {
- isallthrough[thread_num]=true;
- summright[thread_num]=summleft[thread_num];
- numright[thread_num]=numleft[thread_num];
- if(thread_num<total_threads-1) waitright=true;
- }
- }
- if(fabs(array[startpos+thread_length-3]-2*array[startpos+thread_length-2]+array[startpos+thread_length-1])<1e-9 && !waitright)
- //there is a progression touching right border which does not go through all array part
- {
- if(thread_num<total_threads-1) waitright=true;
- for(i=thread_length;i>=numleft[thread_num];i--)
- //by default numleft[i]=0, if it is bigger, there are some numbers belonging to different progression
- //and these numbers can be skipped
- {
- if(fabs(array[startpos+thread_length-3]-2*array[startpos+thread_length-2]+array[startpos+thread_length-1])>1e-9) break;
- summright[thread_num]+=array[startpos+thread_length+i];
- numright[thread_num]++;
- }//the loop has ended but we need to add the last 2 manually
- //i has decreased by 1 since the last addition, keep that in mind
- summright[thread_num]+=array[startpos+thread_length+i]+array[startpos+thread_length+i-1];
- numright[thread_num]+=2;
- }
- }
- if(thread_length>=2)//this part assumes that there are at least 2 elements in this part of the array
- {
- if(thread_num!=0)
- {
- if(fabs(left1-2*array[startpos]+array[startpos+1])<1e-9)
- {
- if(!waitleft)//0|1,2,5,...|
- {
- summleft[thread_num]=array[startpos]+array[startpos+1];
- numleft[thread_num]=2;
- waitleft=true;
- }
- }
- }
- if(thread_num<total_threads-1)
- {
- if(fabs(array[startpos+thread_length-2]-2*array[startpos+thread_length-1]+right1)<1e-9)
- {
- if(!waitright)//|...,0,3,4|5
- {
- summright[thread_num]=array[startpos+thread_length-2]+array[startpos+thread_length-1];
- numright[thread_num]=2;
- waitright=true;
- }
- }
- }
- }
- //the next part will work even if the length of the part of the array is 1
- if(thread_num>0 && startpos-2>=0 && !waitleft)
- {
- if(fabs(left2-2*left1+array[startpos])<1e-9)//0,1|2,4,...|
- {
- summleft[thread_num]=array[startpos];
- numleft[thread_num]=1;
- waitleft=true;
- }
- }
- if(thread_num<total_threads-1 && startpos+thread_length+1<length && !waitright)
- {
- if(fabs(array[startpos+thread_length-1]-2*right1+right2)<1e-9)//|...,10,3|2,1
- {
- summright[thread_num]=array[startpos+thread_length-1];
- numright[thread_num]=1;
- waitright=true;
- }
- }
- //what is yet to do is to check for isallthrough for ultrasmall array part sizes
- if(thread_length==2 && thread_num>0 && thread_num<total_threads-1)
- {
- if(fabs(left1+right1-array[startpos]-array[startpos+1])<1e-9)//0|1,2|3
- {
- isallthrough[thread_num]=true;
- }
- }
- if(thread_length==1 && thread_num>0 && thread_num<total_threads-1)
- {
- if(fabs(left1+right1-2*array[startpos])<1e-9)//0|1|2
- {
- isallthrough[thread_num]=true;
- }
- }
- //The first part is finished. What is done:
- //If there is a progression touching left side or right side
- //of each array part, all of the threads will be able to know how many
- //numbers are in this progression, what is their summ and whether
- //this progression passes through all part of the array.
- //These elements require sync point to be changed properly. So now they are skipped.
- //We can still process the central part of the array part if it contains any
- //progressions which do not touch sides.
- if(thread_length>=3)
- {
- for(i=numleft[thread_num];i<thread_length-numright[thread_num]-2;i++)
- {
- if(fabs(array[startpos+i]-2*array[startpos+1+i]+array[startpos+2+i])<1e-9)
- {
- while(fabs(array[startpos+i+num]-2*array[startpos+i+1+num]+array[startpos+i+2+num])<1e-9)
- {
- average+=array[startpos+i+num];
- num++;
- }
- average+=array[startpos+i+num]+array[startpos+i+num+1];
- num+=2;
- //we have found a progression from startpos+i to startpos+i+num
- average/=num;
- for(j=i;j<num+i;j++) array[startpos+j]=average;
- }
- average=0;
- num=0;
- i+=num;//we skip this progression so we don't work with it again
- }
- }
- //What is only left is to process edges, we need a sync point for this.
- synchronize(total_threads);
- //After this sync point these arrays will never change:
- //summleft, summright, numleft, numright, isallthrough
- //This means that we can access them fully without any need of mutex.
- if(waitleft)
- {
- average=summright[thread_num-1];
- num=numright[thread_num-1];
- i=1;
- while(isallthrough[thread_num-i])
- {
- average+=summright[thread_num-i-1];
- num+=numright[thread_num-i-1];
- i++;
- }
- for(i=0;i<numleft[thread_num];i++)
- {
- average+=array[startpos+i];
- num++;
- }
- average/=num;
- for(i=0;i<numleft[thread_num];i++) array[startpos+i]=average;
- }
- if(waitright && !waitleft)
- {
- average=summleft[thread_num+1];
- num=numleft[thread_num+1];
- i=1;
- while(isallthrough[thread_num+i])
- {
- average+=summleft[thread_num+i+1];
- num+=numleft[thread_num+i+1];
- i++;
- }
- for(i=0;i<numright[thread_num];i++)
- {
- average+=array[startpos+thread_length-1-i];
- num++;
- }
- average/=num;
- for(i=0;i<numright[thread_num];i++) array[startpos+thread_length-1-i]=average;
- }
- synchronize(total_threads);
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement