Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <stdlib.h>
- #include <stdio.h>
- #include <mpi.h>
- /* Variable time out */
- double ErrorTime = 0.5;
- /* Structure des processus */
- typedef struct
- {
- int state; // 1 en cours, 0 en attente, -1 erreur + valeur non reprise, -2 erreur + valeur reprise 3 envoi en cours
- double startTime;
- int result;
- int firstValue;
- int endValue;
- }currentState;
- /* Initialisation des différents champs de la structure currentState */
- void init_Struct(currentState tab[], int size)
- {
- int i;
- for(i = 0; i < size-1; i++)
- {
- /*if(i==3)
- tab[i].state = -1;
- else*/
- tab[i].state = 0;
- tab[i].firstValue = 0;
- tab[i].endValue = 0;
- tab[i].startTime = 0;
- tab[i].result = 0;
- }
- }
- /* Vérifie si les différents processus n'ont pas de "time out"*/
- int checkAvailabilityProcess(currentState tab[], int process)
- {
- //int i;
- double currentTime = MPI_Wtime();
- double elapsedTime=0;
- if(( tab[process].startTime - currentTime ) > ErrorTime )
- {
- printf("ERROR on process %d !\n",process);
- tab[process].state = -1;
- return 0;
- }
- return 1;
- }
- /* Renvoi un processus dont l'état est en erreur mais donc la portion n'as pas été calculer, -1 si vide */
- int checkProcessFail(currentState tab[], int size)
- {
- int i;
- for(i = 0 ; i < size-1 ; i++)
- {
- if(tab[i].state == -1){
- tab[i].state = -2;
- return i;
- }
- }
- return -1;
- }
- /* Affichage de l'etat des processus */
- void displayCurrentState(currentState tab[], int size)
- {
- int i;
- for(i = 0; i < size-1; i++)
- {
- printf("Processus fils n°%d -- Etat : %d\n",i,tab[i].state);
- }
- }
- void wasteTime(unsigned long ms)
- {
- unsigned long t,t0;
- struct timeval tv;
- gettimeofday(&tv,(struct timezone *)0);
- t0=tv.tv_sec*1000LU+tv.tv_usec/1000LU;
- do
- {
- gettimeofday(&tv,(struct timezone *)0);
- t=tv.tv_sec*1000LU+tv.tv_usec/1000LU;
- } while(t-t0<ms);
- }
- int main(int argc, char ** argv)
- {
- int size, rank, operations,returnedFlag, endValue, firstValue, sourceProcess, totalResult, returnedValue, flag, split, i;
- int position, returnedStatus;
- double currentTime;
- MPI_Request req;
- MPI_Status status;
- /* Initialisation */
- MPI_Init(&argc, &argv);
- /* Determination rang processus */
- MPI_Comm_rank(MPI_COMM_WORLD, &rank);
- /* Determination taille */
- MPI_Comm_size(MPI_COMM_WORLD, &size);
- /* Nombre de calculs */
- /* Tableau d'etat de chaque processus */
- currentState tabState[size-1];
- operations = 10;
- /* Variable résultat final */
- returnedValue = 0;
- /* Variable status du MPI_TAG */
- flag = 0;
- /* Traitement du processus pere */
- if(rank == 0)
- {
- /* Initialisation des variables de la structure */
- init_Struct(tabState, size);
- /* Chaque processus recevra les valeurs 2 par 2 */
- split = 2;
- /* Initialisation boucle */
- i = 1;
- firstValue = 1;
- /* Tant que i est inférieur a la taille et que l'on ne depasse pas le nombre de tranches a distribuer */
- while( i < size && firstValue < operations )
- {
- /* on envoit la valeur du début de tranche a calculer au processus i */
- if(tabState[i-1].state == 0){
- //printf("Sending first Value %d to %d\n",firstValue, i);
- /* On met jour endValue afin de vérifier si l'on a pas dépasser le nombre de tranches a distribuer */
- endValue=firstValue+split;
- if(endValue>operations)
- endValue=operations;
- printf("\nSending [%d-%d] to %d\n",firstValue,endValue, i);
- /* On envoit la valeur de endValue de tranche a calculer au processus i */
- MPI_Isend(&firstValue, 1, MPI_INT, i, 0, MPI_COMM_WORLD,&req);
- MPI_Test(&req,&returnedFlag,&status);
- while(!returnedFlag)
- MPI_Test(&req,&returnedFlag,&status);
- MPI_Isend(&endValue, 1, MPI_INT, i, 0, MPI_COMM_WORLD,&req);
- MPI_Test(&req,&returnedFlag,&status);
- while(!returnedFlag)
- MPI_Test(&req,&returnedFlag,&status);
- /* Mise a jour des valeurs du processus traité */
- tabState[i-1].startTime = MPI_Wtime();
- tabState[i-1].state = 1;
- tabState[i-1].firstValue = firstValue;
- tabState[i-1].endValue = endValue;
- /* On met a jour firstValue pour que le prochain processus traite la tranche suivante */
- firstValue=endValue+1;
- /* On incrémente l'id du processus */
- }
- i++;
- }
- /* On décrémente i pour pointer sur le dernier processus auxquel on a fournis les données de calculs */
- i--;
- /* Tant qu'on a pas traiter tout les processus */
- while(i>0)
- {
- /* On receptionne un message fourni par n'importe lequel des processus fils */
- printf("%d\n",i);
- returnedStatus = MPI_Irecv(&returnedValue, 1, MPI_INT, i, MPI_ANY_TAG, MPI_COMM_WORLD, &req);
- MPI_Test(&req, &returnedFlag, &status);
- while(!returnedFlag || !checkAvailabilityProcess(tabState, size)){
- MPI_Test(&req, &returnedFlag, &status);
- }
- printf("\nReceiving from %d ",sourceProcess);
- if (returnedStatus != MPI_SUCCESS){
- tabState[sourceProcess].state = -1;
- sourceProcess=status.MPI_SOURCE-1;
- tabState[sourceProcess].result = returnedValue;
- }
- else
- tabState[sourceProcess].state = 0;
- returnedValue = 0;
- if(tabState[sourceProcess].state == 0){
- /* On récupére le résultat et on l'additionne au total */
- totalResult+=tabState[sourceProcess].result;
- printf("Value : %d\n\n",tabState[sourceProcess].result);
- }
- i--;
- /* Si il reste encore des portions a calculer */
- printf("%d, %d, %d\n",firstValue, operations, i);
- if(firstValue <= operations)
- {
- printf("Pending operation... \n");
- position = checkProcessFail(tabState, size);
- if(position == -1){
- MPI_Isend(&firstValue, 1, MPI_INT, sourceProcess, 0, MPI_COMM_WORLD,&req);
- tabState[sourceProcess].state = 1;
- endValue = firstValue+split-1;
- if(endValue>operations)
- endValue = operations;
- MPI_Isend(&endValue, 1, MPI_INT, sourceProcess, 0, MPI_COMM_WORLD,&req);
- printf("Sending [%d - %d] to %d\n",firstValue,endValue, i);
- firstValue = endValue+1;
- i++;
- }else
- {
- printf("!!##!! Error from process %d Redirecting to process %d \n", position, sourceProcess);
- printf("firstValue : %d, endValue : %d\n",tabState[position+1].firstValue,tabState[position+1].endValue);
- MPI_Isend(&tabState[position+1].firstValue, 1, MPI_INT, sourceProcess, 0, MPI_COMM_WORLD,&req);
- MPI_Isend(&tabState[position+1].endValue, 1, MPI_INT, sourceProcess, 0, MPI_COMM_WORLD,&req);
- }
- }
- }
- /* Affichage resultat final */
- printf("#Final result : %d\n",totalResult);
- /* On envoi a tout les programmes fils un signal de fin ( içi 1 ) */
- for(i=1;i<size;i++)
- {
- MPI_Send(&firstValue, 1, MPI_INT, i, 1, MPI_COMM_WORLD);
- }
- }
- else /* Traitement des processus fils */
- {
- while(flag!=1) /* on ne sort de la boucle que lorsque le processus maitre envoie le dernier Send avec le TAG = 1 */
- {
- /* Récupération valeur de début de la portion de calcul */
- MPI_Recv(&firstValue, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, &status);
- flag=status.MPI_TAG; /* on récupère le tag du message */
- if(flag==0)/* Si le tag est égale a zéro, cela veut dire que le père demande un calcul */
- {
- /* Récupération valeur de fin de la portion de calcul */
- MPI_Recv(&endValue, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, &status);
- /* Init var */
- totalResult=0;
- /* On boucle sur la portion ( calcul des carrés de firstValue a endValue */
- for(i=firstValue;i<=endValue;i++)
- {
- totalResult=totalResult+(i*i);
- }
- printf("Operation : %d to %d result = %d\n",firstValue,endValue,totalResult);
- /* On renvoi le résultat au processus 0 */
- MPI_Send(&totalResult, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
- }
- }
- }
- /* Terminaison MPI */
- MPI_Finalize();
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement