Advertisement
Guest User

Untitled

a guest
May 25th, 2019
113
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 7.92 KB | None | 0 0
  1. #include <cstdio>
  2. #include <ctime>
  3. #include <pthread.h>
  4. //#include <mpi.h>
  5. #include "/usr/include/mpi/mpi.h"
  6. #include <cstddef>
  7. #include <cstdlib>
  8. #include <vector>
  9. #include <cstdarg>
  10. #include <cmath>
  11. #include <queue>
  12.  
  13. #define COUNT_TASKS 10
  14. #define WEIGHT_MULTIPLIER 1000000ll
  15.  
  16. #define REQUEST_TAG 0
  17. #define ANSWER_TAG 1
  18. #define DATA_TAG 2
  19.  
  20. // много печати закомментированно чтобы иметь более чистый и понятный лог рабочей программы
  21. // для дебага можно раскомментить
  22.  
  23. typedef struct task_t {
  24.     long long int weight;
  25. } task_t;
  26.  
  27. int size, rank;
  28. MPI_Datatype TASK_TYPE;
  29. pthread_mutex_t task_mutex;
  30. pthread_t data_thread;
  31. std::queue<task_t> tasks;
  32. struct timespec start;
  33.  
  34. int done_task_count = 0;
  35. long long int done_weight_count = 0;
  36. int queue_task_count = 0;
  37. long long int queue_weight_count = 0;
  38.  
  39. long double do_task(task_t task) {
  40.     long double res = 0;
  41.     for (int i = 0; i < task.weight; i++) {
  42.         res += sqrt(i);
  43.         res += pow(i, i);
  44.     }
  45.     return res;
  46. }
  47.  
  48. void dev_print(const char *thread_name, const char *str, ...) {
  49.     va_list args;
  50.     va_start (args, str);
  51.  
  52.     struct timespec end;
  53.     clock_gettime(CLOCK_MONOTONIC_RAW, &end);
  54.  
  55.     /*printf("%lf   Process %d[%s]: ", end.tv_sec - start.tv_sec + 0.000000001 * (end.tv_nsec - start.tv_nsec),
  56.            rank, thread_name);*/
  57.     printf("%lf   %d: ", end.tv_sec + 0.000000001 * end.tv_nsec,
  58.            rank);
  59.     vprintf(str, args);
  60.     printf("\n");
  61.     fflush(stdout);
  62.  
  63.     va_end (args);
  64. }
  65.  
  66. void *calc_thread_func(void *args) {
  67.     MPI_Barrier(MPI_COMM_WORLD);
  68.  
  69.     //dev_print("calc", "started");
  70.  
  71.     // 1 - может отправить задачи, 0 - нет
  72.     int *proc_state = new int[size];
  73.     for (int i = 0; i < size; i++)
  74.         proc_state[i] = 1;
  75.     proc_state[rank] = 0;
  76.  
  77.     int cur_supplier = 0;
  78.  
  79.     while (cur_supplier < size) {
  80.         pthread_mutex_lock(&task_mutex);
  81.         while (!tasks.empty()) {
  82.             task_t task = tasks.front();
  83.             tasks.pop();
  84.  
  85.             pthread_mutex_unlock(&task_mutex);
  86.  
  87.             long double res = do_task(task);
  88.  
  89.             pthread_mutex_lock(&task_mutex);
  90.             queue_task_count--;
  91.             queue_weight_count -= task.weight;
  92.             done_task_count++;
  93.             done_weight_count += task.weight;
  94.  
  95.             /*dev_print("calc", "finished task, done: t %d w %d queue: t %d w %d", done_task_count, done_weight_count,
  96.                     queue_task_count, queue_weight_count);*/
  97.             dev_print("calc", "done: t %d w %d queue: t %d w %d", done_task_count, done_weight_count / WEIGHT_MULTIPLIER,
  98.                       queue_task_count, queue_weight_count / WEIGHT_MULTIPLIER);
  99.  
  100.             //pthread_mutex_lock(&task_mutex);
  101.         }
  102.         pthread_mutex_unlock(&task_mutex);
  103.  
  104.         int supplier_rank = (rank + cur_supplier) % size;
  105.         if (!proc_state[supplier_rank]) {
  106.             cur_supplier++;
  107.             continue;
  108.         }
  109.  
  110.         //пробуем получить ещё задание
  111.  
  112.         //dev_print("calc", "trying to get tasks from process %d", supplier_rank);
  113.  
  114.         int message = 1;
  115.         MPI_Send(&message, 1, MPI_INT, supplier_rank, REQUEST_TAG, MPI_COMM_WORLD);
  116.         MPI_Recv(&message, 1, MPI_INT, supplier_rank, ANSWER_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  117.  
  118.         if (message == 0) {
  119.             //dev_print("calc", "no tasks available in process %d", supplier_rank);
  120.             proc_state[supplier_rank] = 0;
  121.             cur_supplier++;
  122.         } else {
  123.             //dev_print("calc", "receiving new task from process %d...", supplier_rank);
  124.             task_t task;
  125.             MPI_Recv(&task, 1, TASK_TYPE, supplier_rank, DATA_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  126.             //dev_print("calc", "received new task from process %d", supplier_rank);
  127.  
  128.             pthread_mutex_lock(&task_mutex);
  129.             tasks.push(task);
  130.             queue_weight_count += task.weight;
  131.             queue_task_count++;
  132.             pthread_mutex_unlock(&task_mutex);
  133.  
  134.             //dev_print("calc", "put to list new task from process %d", supplier_rank);
  135.             //proc_state[supplier_rank] = 1;
  136.         }
  137.     }
  138.  
  139.     //ждём все процессы
  140.     //dev_print("calc", "finished iteration, waiting for others");
  141.     MPI_Barrier(MPI_COMM_WORLD);
  142.  
  143.     //отправляем в соседний поток чтобы он завершился
  144.     int message = 0;
  145.     MPI_Send(&message, 1, MPI_INT, rank, REQUEST_TAG, MPI_COMM_WORLD);
  146.  
  147.     delete[] proc_state;
  148.     dev_print("calc", "ended, done: t %d w %d queue: t %d w %d", done_task_count, done_weight_count / WEIGHT_MULTIPLIER,
  149.               queue_task_count, queue_weight_count / WEIGHT_MULTIPLIER);
  150.  
  151.     return 0;
  152. }
  153.  
  154. void *data_thread_function(void *args) {
  155.     int message1, answer;
  156.     MPI_Status status;
  157.     while (true) {
  158.         MPI_Recv(&message1, 1, MPI_INT, MPI_ANY_SOURCE, REQUEST_TAG, MPI_COMM_WORLD, &status);
  159.  
  160.         //dev_print("data", "received message %d from process %d", message, status.MPI_SOURCE);
  161.  
  162.         if (message1 == 0)
  163.             break;
  164.  
  165.         pthread_mutex_lock(&task_mutex);
  166.  
  167.         if (tasks.empty()) {
  168.             answer = 0;
  169.             MPI_Send(&answer, 1, MPI_INT, status.MPI_SOURCE, ANSWER_TAG, MPI_COMM_WORLD);
  170.  
  171.             pthread_mutex_unlock(&task_mutex);
  172.  
  173.             //dev_print("data", "don't have tasks for process %d", status.MPI_SOURCE);
  174.             continue;
  175.         }
  176.  
  177.         answer = 1;
  178.         MPI_Send(&answer, 1, MPI_INT, status.MPI_SOURCE, ANSWER_TAG, MPI_COMM_WORLD);
  179.         MPI_Send(&tasks.front(), 1, TASK_TYPE, status.MPI_SOURCE, DATA_TAG, MPI_COMM_WORLD);
  180.  
  181.         queue_task_count--;
  182.         queue_weight_count -= tasks.front().weight;
  183.  
  184.         tasks.pop();
  185.  
  186.         //dev_print("data", "sent task to process %d", status.MPI_SOURCE);
  187.  
  188.         pthread_mutex_unlock(&task_mutex);
  189.     }
  190.     dev_print("data", "ended");
  191.     return 0;
  192. }
  193.  
  194. int main(int argc, char **argv) {
  195.     clock_gettime(CLOCK_MONOTONIC_RAW, &start);
  196.  
  197.     int provided;
  198.     MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
  199.  
  200.     if (provided != MPI_THREAD_MULTIPLE) {
  201.         fprintf(stdout, "Can't get MPI_THREAD_MULTIPLE (%d) level, got %d\n", MPI_THREAD_MULTIPLE, provided);
  202.         MPI_Finalize();
  203.         return 1;
  204.     }
  205.     MPI_Comm_size(MPI_COMM_WORLD, &size);
  206.     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  207.  
  208.     //создаём MPI тип
  209.  
  210.     int fields_number = 1;
  211.     int block_lengths[] = {1};
  212.     MPI_Datatype types[] = {MPI_LONG_LONG}, temp_type;
  213.     MPI_Aint offsets[] = {offsetof(task_t, weight)}, lb, extent;
  214.  
  215.     MPI_Type_create_struct(fields_number, block_lengths, offsets, types, &temp_type);
  216.     MPI_Type_get_extent(temp_type, &lb, &extent);
  217.     MPI_Type_create_resized(temp_type, lb, extent, &TASK_TYPE);
  218.     MPI_Type_commit(&TASK_TYPE);
  219.  
  220.     //генерируем задания
  221.  
  222.     //dev_print("calc", "started task generation");
  223.     int weight = (rank + 1) * WEIGHT_MULTIPLIER;
  224.     for (int i = 0; i < COUNT_TASKS; i++) {
  225.         task_t task;
  226.         task.weight = weight;
  227.         tasks.push(task);
  228.         queue_weight_count += weight;
  229.         queue_task_count++;
  230.     }
  231.     //dev_print("calc", "finished task generation");
  232.  
  233.     //создаём мьютекс и второй тред
  234.     pthread_mutex_init(&task_mutex, NULL);
  235.  
  236.     pthread_attr_t attrs;
  237.     pthread_attr_init(&attrs);
  238.     pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_JOINABLE);
  239.     pthread_create(&data_thread, &attrs, data_thread_function, NULL);   //создаём ожидающий запросы поток
  240.  
  241.     calc_thread_func(NULL);     //основной поток будет считать
  242.  
  243.  
  244.     if (pthread_join(data_thread, NULL) != EXIT_SUCCESS) {
  245.         perror("Cannot join a thread");
  246.     }
  247.  
  248.     MPI_Finalize();
  249.     return 0;
  250. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement