Advertisement
weslleylc

K-means Parallel

Nov 25th, 2014
186
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.75 KB | None | 0 0
  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <time.h>
  4. #include <math.h>
  5. #include "mpi.h"
  6. #define MASTER 0
  7. /**
  8. * Estrutura de ponto
  9. */
  10. typedef struct
  11. {
  12. double _x;
  13. double _y;
  14. } Point;
  15. /**
  16. decique o centroid que esta mais perto de um determinado ponto
  17. */
  18. int getParent(Point point,Point* centroids,int num_centroids)
  19. {
  20. int daddy=0;
  21. double distance=0;
  22. double minDistance=distancePoints(point,centroids[0]);
  23. int dex;
  24. #pragma omp parallel for
  25. for(dex=1;dex<num_centroids;dex++)
  26. {
  27. distance=distancePoints(point,centroids[dex]);
  28. if(minDistance>=distance)
  29. {
  30. daddy=dex;
  31. minDistance=distance;
  32. }
  33. }
  34. return daddy;
  35. }
  36. /**
  37.  
  38. calcula os novos centroids após receber o array de centroids de cada processo escravo
  39. */
  40. void NewCentroids(Point* points,int* data,Point* centroids,int num_clusters,int num_points)
  41. {
  42. Point* newCentroids=malloc(sizeof(Point)*num_clusters);
  43. int* population=malloc(sizeof(int)*num_clusters);
  44. int dex;
  45.  
  46. for(dex=0;dex<num_clusters;dex++)
  47. {
  48. population[dex]=0;
  49. newCentroids[dex]._x=0;
  50. newCentroids[dex]._y=0;
  51. }
  52. for(dex=0;dex<num_points;dex++)
  53. {
  54. population[data[dex]]++;
  55. newCentroids[data[dex]]._x+=points[dex]._x;
  56. newCentroids[data[dex]]._y+=points[dex]._y;
  57. }
  58. for(dex=0;dex<num_clusters;dex++)
  59. {
  60. if(population[dex]!=0.0)
  61. {
  62. newCentroids[dex]._x/=population[dex];
  63. newCentroids[dex]._y/=population[dex];
  64. }
  65. }
  66. for(dex=0;dex<num_clusters;dex++)
  67. {
  68. centroids[dex]._x=newCentroids[dex]._x;
  69. centroids[dex]._y=newCentroids[dex]._y;
  70. }
  71. }
  72. /**
  73. checa se convergiu
  74. */
  75. int Convergence(int *former_clusters,int *latter_clusters,int num_points)
  76. {
  77. int dex;
  78. for(dex=0;dex<num_points;dex++)
  79. if(former_clusters[dex]!=latter_clusters[dex])
  80. return -1;
  81. return 0;
  82. }
  83. /**
  84. calcula a distancia entre dois pontos
  85. */
  86. double distancePoints(Point point1,Point point2)
  87. {
  88. return (pow((point1._x-point2._x)*100,2)+pow((point1._y-point2._y)*100,2));
  89. }
  90. /**
  91. Função responsável por ler os pontos
  92. */
  93. void readPoints(FILE* input,Point *points,int num_points)
  94. {
  95. int dex;
  96. for(dex=0;dex<num_points;dex++)
  97. {
  98. fscanf(input,"%lf,%lf",&points[dex]._x,&points[dex]._y);
  99. }
  100. }
  101. /**
  102. inicializa os centroids randomicamente
  103. */
  104. void initialize(Point* centroids,int num_clusters)
  105. {
  106. int dex;
  107. srand(time(NULL));
  108. for(dex=0;dex<num_clusters;dex++)
  109. {
  110. centroids[dex]._x=((double)(rand()%1000))/1000;
  111. centroids[dex]._y=((double)(2*rand()%1000))/1000;
  112. }
  113. }
  114. /**
  115. Função responsável por ler o arquivo que contem o numero de pontos e o numero de clusteres
  116. */
  117. void read(FILE *input,int* num_clusters,int* num_points)
  118. {
  119. fscanf(input,"%d\n",num_clusters);
  120. printf("%d\n",*num_clusters);
  121.  
  122. fscanf(input,"%d\n",num_points);
  123. printf("%d\n",*num_points);
  124. }
  125. /**
  126. seta o array de clusteres com o valor -1
  127. */
  128. int resetData(int *data,int num_points)
  129. {
  130. int dex;
  131. for(dex=0;dex<num_points;dex++)
  132. {
  133. data[dex]=-1;
  134. }
  135. }
  136.  
  137. /**
  138. main, divide entre processos escravos e mestre
  139. */
  140. int main(int argc, char* argv[])
  141. {
  142. int rank;
  143. int size;
  144. int num_clusters;
  145. int num_points;
  146. int dex;
  147. int job_size;
  148. int job_done=0;
  149.  
  150. Point* centroids;
  151. Point* points;
  152. Point* received_points;
  153. int * slave_clusters;
  154. int * former_clusters;
  155. int * latter_clusters;
  156.  
  157. MPI_Init(&argc, &argv);
  158.  
  159. MPI_Status status;
  160.  
  161. MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  162. MPI_Comm_size(MPI_COMM_WORLD, &size);
  163.  
  164. //Estrutura MPI
  165. MPI_Datatype MPI_POINT;
  166. MPI_Datatype type=MPI_DOUBLE;
  167. int blocklen=2;
  168. MPI_Aint disp=0;
  169. MPI_Type_create_struct(1,&blocklen,&disp,&type,&MPI_POINT);
  170. MPI_Type_commit(&MPI_POINT);
  171.  
  172. /******** Processo Master******************************************************/
  173.  
  174. if(rank==MASTER)
  175. {
  176. //leitura de arquivo
  177. FILE *input;
  178. input=fopen(argv[1],"r");
  179. read(input,&num_clusters,&num_points);
  180. points=(Point*)malloc(sizeof(Point)*num_points);
  181. readPoints(input,points,num_points);
  182. fclose(input);
  183.  
  184. //Alocação de memoria para os clusters
  185. former_clusters=(int*)malloc(sizeof(int)*num_points);
  186. latter_clusters=(int*)malloc(sizeof(int)*num_points);
  187. job_size=num_points/(size-1);
  188. centroids=malloc(sizeof(Point)*num_clusters);
  189.  
  190. //resetando a memoria dos clusteres ja usados
  191. initialize(centroids,num_clusters);
  192. resetData(former_clusters,num_points);
  193. resetData(latter_clusters,num_points);
  194.  
  195. //envia data para os processos escravos
  196. for(dex=1;dex<size;dex++)
  197. {
  198. printf("Sending to [%d]\n",dex);
  199. MPI_Send(&job_size ,1 , MPI_INT ,dex,0,MPI_COMM_WORLD);
  200. MPI_Send(&num_clusters ,1 , MPI_INT ,dex,0,MPI_COMM_WORLD);
  201. MPI_Send(centroids ,num_clusters, MPI_POINT ,dex,0,MPI_COMM_WORLD);
  202. MPI_Send(points+(dex-1)*job_size,job_size , MPI_POINT ,dex,0,MPI_COMM_WORLD);
  203. }
  204. printf("Sent!\n");
  205.  
  206. MPI_Barrier(MPI_COMM_WORLD);
  207.  
  208. //parte principal do processamento
  209. while(1)
  210. {
  211. MPI_Barrier(MPI_COMM_WORLD);
  212.  
  213. printf("Master recebendo\n");
  214. for(dex=1;dex<size;dex++)
  215. MPI_Recv(latter_clusters+(job_size*(dex-1)),job_size,MPI_INT,dex,0,MPI_COMM_WORLD,&status);
  216.  
  217. printf("Master recebeu\n");
  218.  
  219. NewCentroids(points,latter_clusters,centroids,num_clusters,num_points);
  220. printf("Novos Centroids estão prontos!\n");
  221. if(Convergence(latter_clusters,former_clusters,num_points)==0)
  222. {
  223. printf("Convergido!\n");
  224. job_done=1;
  225. }
  226. else
  227. {
  228. printf("Não Convergido!\n");
  229. for(dex=0;dex<num_points;dex++)
  230. former_clusters[dex]=latter_clusters[dex];
  231. }
  232.  
  233. //informa os slaves que eles não tem mais trabalho
  234. for(dex=1;dex<size;dex++)
  235. MPI_Send(&job_done,1, MPI_INT,dex,0,MPI_COMM_WORLD);
  236.  
  237. MPI_Barrier(MPI_COMM_WORLD);
  238. if(job_done==1)
  239. break;
  240.  
  241. //envia os centroids mais recentes
  242. for(dex=1;dex<size;dex++)
  243. MPI_Send(centroids,num_clusters, MPI_POINT,dex,0, MPI_COMM_WORLD);
  244.  
  245. MPI_Barrier(MPI_COMM_WORLD);
  246. }
  247.  
  248. //saida do arquivo
  249. FILE* output=fopen(argv[2],"w");
  250. fprintf(output,"%d\n",num_clusters);
  251. fprintf(output,"%d\n",num_points);
  252. for(dex=0;dex<num_clusters;dex++)
  253. fprintf(output,"%lf,%lf\n",centroids[dex]._x,centroids[dex]._y);
  254. for(dex=0;dex<num_points;dex++)
  255. fprintf(output,"%lf,%lf,%d\n",points[dex]._x,points[dex]._y,latter_clusters[dex]+1);
  256. fclose(output);
  257. }
  258. /*************Procesos escravos trabalham aqui ************************/
  259. else
  260. {
  261. //recebendo data
  262. printf("Recebendo\n");
  263. MPI_Recv(&job_size ,1 ,MPI_INT ,MASTER,0,MPI_COMM_WORLD,&status);
  264. MPI_Recv(&num_clusters,1 ,MPI_INT ,MASTER,0,MPI_COMM_WORLD,&status);
  265. centroids=malloc(sizeof(Point)*num_clusters);
  266. MPI_Recv(centroids ,num_clusters,MPI_POINT,MASTER,0,MPI_COMM_WORLD,&status);
  267. printf("part_size =%d\n",job_size);
  268. received_points=(Point*)malloc(sizeof(Point)*job_size);
  269. slave_clusters=(int*)malloc(sizeof(int)*job_size);
  270. MPI_Recv(received_points,job_size,MPI_POINT ,MASTER,0,MPI_COMM_WORLD,&status);
  271. printf("Recebido [%d]\n",rank);
  272.  
  273. MPI_Barrier(MPI_COMM_WORLD);
  274.  
  275. while(1)
  276. {
  277. printf("Calculando novo cluster [%d]\n",rank);
  278. for(dex=0;dex<job_size;dex++)
  279. {
  280. slave_clusters[dex]=getParent(received_points[dex],centroids,num_clusters);
  281. }
  282.  
  283. printf("enviando ao Master [%d]\n",rank);
  284. MPI_Send(slave_clusters,job_size, MPI_INT,MASTER, 0, MPI_COMM_WORLD);
  285. MPI_Barrier(MPI_COMM_WORLD);
  286. MPI_Barrier(MPI_COMM_WORLD);
  287. MPI_Recv(&job_done,1, MPI_INT,MASTER,0,MPI_COMM_WORLD,&status);
  288.  
  289. if(job_done==1) //trabalho finalizado
  290. break;
  291.  
  292. //recebendo os centroids do processo Master
  293. MPI_Recv(centroids,num_clusters,MPI_POINT,MASTER,0, MPI_COMM_WORLD,&status);
  294.  
  295. MPI_Barrier(MPI_COMM_WORLD);
  296. }
  297. }
  298. //finaliza tudo
  299. MPI_Finalize();
  300. return 0;
  301. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement