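// Distributed work-stealing load balancer built on MPI + POSIX threads.
// Each process runs two threads: a calc thread (CALC_THREAD) that executes
// tasks from a shared list, and a data thread (DATA_THREAD) that answers
// task requests from other processes. When a process runs out of local
// tasks it asks the others in turn until no one has work left.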
#include <iostream>
#include <mpi.h>
#include <cmath>
#include <ctime>
#include <cstdarg>
#include <unistd.h>
#include <vector>
#include <pthread.h>
#include <stdio.h>

#define CALC_THREAD 0
#define DATA_THREAD 1
#define COUNT_ITER 10
#define COUNT_TASKS 10000
#define DEFAULT_WEIGHT 10000
#define EXTRA_WEIGHT 2000
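
// Message tags used by the work-stealing protocol:
//   tag 1: task request (payload 1) or shutdown notice (payload 0)
//   tag 2: reply, 1 = a task follows, 0 = donor has no tasks left
//   tag 3: the Task payload itself (TASK_INFO_TYPE)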

struct Task {
    int repeatNum;  // loop iterations to perform; serves as the task's weight
};

pthread_mutex_t mutex;          // guards tasks and currentTask
pthread_t threads[2];
std::vector<Task> tasks;        // shared task list for this process
MPI_Datatype TASK_INFO_TYPE;    // MPI datatype matching struct Task
int rank;
int size;
int currentIter;                // iteration the calc thread is on
int currentTask;                // index of the next local task to execute
int doneWeight = 0;             // total weight completed by this process

int getNewTask(int procRank);

// Execute one task: a busy loop whose length is the task's weight.
double doTask(Task &task) {
    double res = 0;
    for (int i = 0; i < task.repeatNum; i++) {
        res += sqrt(i * sqrt(i * i * i * sqrt(i)));
    }
    doneWeight += task.repeatNum / 1000;  // only the calc thread writes this
    return res;
}

// Fill the shared list with countTasks tasks. The weight grows with the
// distance between this rank and (currentIter % size), so the load is
// deliberately unbalanced across processes on every iteration.
void generateTasks(std::vector<Task> &tasks, int countTasks, int currentIter) {
    int weight = std::abs(rank - (currentIter % size));
    printf("Process: %d (thread %d). Generate %d tasks with weight %d\n", rank, CALC_THREAD, countTasks, weight);
    pthread_mutex_lock(&mutex);
    tasks.clear();
    for (int i = 0; i < countTasks; i++) {
        Task task;
        task.repeatNum = DEFAULT_WEIGHT + EXTRA_WEIGHT * weight;
        tasks.push_back(task);
    }
    pthread_mutex_unlock(&mutex);
}

// Mark every other process as a potential task donor; never ask ourselves.
void reset(int *arr) {
    for (int i = 0; i < size; i++) {
        arr[i] = 1;
    }
    arr[rank] = 0;
}

// Calc thread: runs COUNT_ITER iterations. Each iteration generates this
// process's share of the tasks, drains the local list, then steals tasks
// from the other processes until nobody has any left.
void *calcThread(void *args) {
    int *otherProcesses = new int[size];
    for (currentIter = 0; currentIter < COUNT_ITER; currentIter++) {
        printf("Process: %d (thread %d). Start iteration %d\n", rank, CALC_THREAD, currentIter);
        printf("Process: %d (thread %d). Generate tasks\n", rank, CALC_THREAD);

        // Split COUNT_TASKS as evenly as possible; the last
        // (COUNT_TASKS % size) ranks take one extra task each.
        int count = COUNT_TASKS / size;
        if (rank >= size - (COUNT_TASKS % size)) {
            count++;
        }
        generateTasks(tasks, count, currentIter);

        reset(otherProcesses);

        currentTask = 0;

        int i = 0;
        int askRank;
        while (true) {
            // Drain the local list. The mutex is released while a task runs
            // so the data thread can give away the tasks that remain.
            pthread_mutex_lock(&mutex);
            while (currentTask < (int)tasks.size()) {
                Task taskToDo = tasks[currentTask];
                pthread_mutex_unlock(&mutex);

                double result = doTask(taskToDo);

                printf("Process: %d (thread %d). Finish task %d, result: %lf\n",
                       rank,
                       CALC_THREAD,
                       currentTask,
                       result);

                currentTask++;
                pthread_mutex_lock(&mutex);
            }
            pthread_mutex_unlock(&mutex);

            // Ask the next candidate donor. A donor that answers "no tasks"
            // is crossed off; once every donor is crossed off (i == size),
            // this iteration is finished.
            for (; i < size;) {
                askRank = (rank + i) % size;
                if (!otherProcesses[askRank]) {
                    i++;
                    continue;
                } else {
                    otherProcesses[askRank] = getNewTask(askRank);
                    break;
                }
            }
            if (i == size) {
                break;
            }
        }

        printf("Process: %d (thread %d). End iteration %d, wait all\n", rank, CALC_THREAD, currentIter);
        MPI_Barrier(MPI_COMM_WORLD);
    }
    printf("Process: %d (thread %d). Send end request\n", rank, CALC_THREAD);
    int req = 0;

    // Unblock our own data thread with a shutdown notice (payload 0, tag 1).
    MPI_Send(&req, 1, MPI_INT, rank, 1, MPI_COMM_WORLD);

    printf("Process: %d (thread %d). End work\n", rank, CALC_THREAD);
    delete[] otherProcesses;
    return NULL;
}

// Data thread: serves task requests from other processes, handing out tasks
// from the back of the list while the calc thread consumes from the front.
void *dataThread(void *args) {
    while (currentIter < COUNT_ITER) {
        MPI_Status status;
        int res;

        MPI_Recv(&res, 1, MPI_INT, MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, &status);
        if (res == 0) {
            // Shutdown notice sent by our own calc thread.
            printf("Process: %d (thread %d). Got end request from %d\n", rank, DATA_THREAD, status.MPI_SOURCE);
            break;
        }

        printf("Process: %d (thread %d). Got request from %d\n", rank, DATA_THREAD, status.MPI_SOURCE);

        pthread_mutex_lock(&mutex);
        // Keep the task at currentTask in reserve: the calc thread may be
        // executing its own copy of it right now, so handing it out as well
        // would run it twice. Only tasks beyond it may be given away.
        if (currentTask >= (int)tasks.size() - 1) {
            pthread_mutex_unlock(&mutex);
            int answer = 0;

            MPI_Send(&answer, 1, MPI_INT, status.MPI_SOURCE, 2, MPI_COMM_WORLD);
            printf("Process: %d (thread %d). No tasks left for %d\n", rank, DATA_THREAD, status.MPI_SOURCE);
        } else {
            Task taskToSend = tasks.back();
            tasks.pop_back();
            pthread_mutex_unlock(&mutex);
            int answer = 1;

            MPI_Send(&answer, 1, MPI_INT, status.MPI_SOURCE, 2, MPI_COMM_WORLD);
            MPI_Send(&taskToSend, 1, TASK_INFO_TYPE, status.MPI_SOURCE, 3, MPI_COMM_WORLD);

            printf("Process: %d (thread %d). Sent task to %d\n", rank, DATA_THREAD, status.MPI_SOURCE);
        }
    }

    printf("Process: %d (thread %d). %d weight done. End work\n", rank, DATA_THREAD, doneWeight);
    return NULL;
}

// Start the calc and data threads and wait for both to finish.
int work() {
    pthread_attr_t attrs;
    pthread_attr_init(&attrs);
    pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_JOINABLE);

    pthread_create(&threads[CALC_THREAD], &attrs, calcThread, NULL);
    pthread_create(&threads[DATA_THREAD], &attrs, dataThread, NULL);

    pthread_attr_destroy(&attrs);

    for (int i = 0; i < 2; i++) {
        if (pthread_join(threads[i], NULL) != 0) {
            perror("Cannot join a thread");
            return -1;
        }
    }
    return 0;
}

// Register an MPI datatype matching struct Task (a single int).
void createType() {
    int count = 1;
    int blocklengths[] = {1};
    MPI_Datatype types[] = {MPI_INT};
    MPI_Aint displs = 0;

    MPI_Type_create_struct(count, blocklengths, &displs, types, &TASK_INFO_TYPE);
    MPI_Type_commit(&TASK_INFO_TYPE);
}

void deleteType() {
    MPI_Type_free(&TASK_INFO_TYPE);
}

// Ask procRank for a task. Returns 1 if a task was received and queued,
// 0 if the donor has nothing left to give.
int getNewTask(int procRank) {
    printf("Process: %d (thread %d). Trying to get data from %d\n", rank, CALC_THREAD, procRank);

    int req = 1;
    MPI_Send(&req, 1, MPI_INT, procRank, 1, MPI_COMM_WORLD);
    MPI_Recv(&req, 1, MPI_INT, procRank, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    if (req == 0) {
        printf("Process: %d (thread %d). %d doesn't have tasks\n", rank, CALC_THREAD, procRank);
        return 0;
    }

    Task task;
    MPI_Recv(&task, 1, TASK_INFO_TYPE, procRank, 3, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    pthread_mutex_lock(&mutex);
    tasks.push_back(task);
    pthread_mutex_unlock(&mutex);

    printf("Process: %d (thread %d). New task loaded from %d\n", rank, CALC_THREAD, procRank);
    return 1;
}

int main(int argc, char **argv) {
    int provided;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);

    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    // Both threads make MPI calls concurrently, so full thread support is required.
    if (provided != MPI_THREAD_MULTIPLE) {
        printf("Process: %d. Provided MPI thread level is too low\n", rank);
        MPI_Finalize();
        return -1;
    }

    createType();
    pthread_mutex_init(&mutex, NULL);

    double start = MPI_Wtime();

    if (work() != 0) {
        pthread_mutex_destroy(&mutex);
        deleteType();
        MPI_Finalize();
        perror("work()");
        return -1;
    }

    double end = MPI_Wtime();
    printf("Process: %d. Time: %lf\n", rank, end - start);

    pthread_mutex_destroy(&mutex);
    deleteType();
    MPI_Finalize();
    return 0;
}
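
// Build and run (a sketch: the exact wrapper and launcher names depend on
// the MPI installation; "main.cpp" and the process count are placeholders):
//   mpicxx -pthread main.cpp -o balancer
//   mpirun -np 4 ./balancer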