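// Dynamic load balancing with MPI + pthreads: every process generates
// COUNT_TASKS tasks whose weight grows with the rank, works through its own
// queue, and then requests ("steals") tasks from the other processes via a
// helper thread that answers incoming requests. Requires an MPI library that
// provides MPI_THREAD_MULTIPLE.
//
// A possible way to build and run (file/binary names are only examples,
// assuming an MPI implementation such as Open MPI or MPICH):
//   mpicxx -O2 -pthread balance.cpp -o balance
//   mpirun -np 4 ./balance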
#include <cstdio>
#include <ctime>
#include <pthread.h>
#include <mpi.h>
#include <cstddef>
#include <cstdlib>
#include <vector>
#include <cstdarg>
#include <cmath>
#include <queue>

#define COUNT_TASKS 10
#define WEIGHT_MULTIPLIER 1000000ll
#define REQUEST_TAG 0
#define ANSWER_TAG 1
#define DATA_TAG 2

// Most of the printing is commented out to keep the log of a working run clean and readable.
// Uncomment it for debugging.
typedef struct task_t {
    long long int weight;
} task_t;

int size, rank;
MPI_Datatype TASK_TYPE;
pthread_mutex_t task_mutex;
pthread_t data_thread;
std::queue<task_t> tasks;
struct timespec start;
int done_task_count = 0;              // tasks completed by this process
long long int done_weight_count = 0;  // total weight of completed tasks
int queue_task_count = 0;             // tasks currently waiting in the local queue
long long int queue_weight_count = 0; // total weight of the waiting tasks
// Busy-work whose duration is proportional to the task weight.
long double do_task(task_t task) {
    long double res = 0;
    for (int i = 0; i < task.weight; i++) {
        res += sqrt(i);
        res += pow(i, i);
    }
    return res;
}
void dev_print(const char *thread_name, const char *str, ...) {
    va_list args;
    va_start(args, str);
    struct timespec end;
    clock_gettime(CLOCK_MONOTONIC_RAW, &end);
    /*printf("%lf Process %d[%s]: ", end.tv_sec - start.tv_sec + 0.000000001 * (end.tv_nsec - start.tv_nsec),
           rank, thread_name);*/
    printf("%lf %d: ", end.tv_sec + 0.000000001 * end.tv_nsec,
           rank);
    vprintf(str, args);
    printf("\n");
    fflush(stdout);
    va_end(args);
}
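// Worker loop (run on the main thread in this program): drains the local task
// queue, then polls the other processes round-robin for spare tasks, and exits
// once every process has answered that it has none left.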
void *calc_thread_func(void *args) {
    MPI_Barrier(MPI_COMM_WORLD);
    //dev_print("calc", "started");
    // 1 - the process may still have tasks to hand out, 0 - it does not
    int *proc_state = new int[size];
    for (int i = 0; i < size; i++)
        proc_state[i] = 1;
    proc_state[rank] = 0;
    int cur_supplier = 0;
    while (cur_supplier < size) {
        pthread_mutex_lock(&task_mutex);
        while (!tasks.empty()) {
            task_t task = tasks.front();
            tasks.pop();
            pthread_mutex_unlock(&task_mutex);
            long double res = do_task(task);
            pthread_mutex_lock(&task_mutex);
            queue_task_count--;
            queue_weight_count -= task.weight;
            done_task_count++;
            done_weight_count += task.weight;
            /*dev_print("calc", "finished task, done: t %d w %lld queue: t %d w %lld", done_task_count, done_weight_count,
                      queue_task_count, queue_weight_count);*/
            dev_print("calc", "done: t %d w %lld queue: t %d w %lld", done_task_count, done_weight_count / WEIGHT_MULTIPLIER,
                      queue_task_count, queue_weight_count / WEIGHT_MULTIPLIER);
            //pthread_mutex_lock(&task_mutex);
        }
        pthread_mutex_unlock(&task_mutex);
        int supplier_rank = (rank + cur_supplier) % size;
        if (!proc_state[supplier_rank]) {
            cur_supplier++;
            continue;
        }
        // try to get one more task
        //dev_print("calc", "trying to get tasks from process %d", supplier_rank);
        int message = 1;
        MPI_Send(&message, 1, MPI_INT, supplier_rank, REQUEST_TAG, MPI_COMM_WORLD);
        MPI_Recv(&message, 1, MPI_INT, supplier_rank, ANSWER_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        if (message == 0) {
            //dev_print("calc", "no tasks available in process %d", supplier_rank);
            proc_state[supplier_rank] = 0;
            cur_supplier++;
        } else {
            //dev_print("calc", "receiving new task from process %d...", supplier_rank);
            task_t task;
            MPI_Recv(&task, 1, TASK_TYPE, supplier_rank, DATA_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            //dev_print("calc", "received new task from process %d", supplier_rank);
            pthread_mutex_lock(&task_mutex);
            tasks.push(task);
            queue_weight_count += task.weight;
            queue_task_count++;
            pthread_mutex_unlock(&task_mutex);
            //dev_print("calc", "put to list new task from process %d", supplier_rank);
            //proc_state[supplier_rank] = 1;
        }
    }
    // wait for all processes
    //dev_print("calc", "finished iteration, waiting for others");
    MPI_Barrier(MPI_COMM_WORLD);
    // tell our own data thread to terminate
    int message = 0;
    MPI_Send(&message, 1, MPI_INT, rank, REQUEST_TAG, MPI_COMM_WORLD);
    delete[] proc_state;
    dev_print("calc", "ended, done: t %d w %lld queue: t %d w %lld", done_task_count, done_weight_count / WEIGHT_MULTIPLIER,
              queue_task_count, queue_weight_count / WEIGHT_MULTIPLIER);
    return NULL;
}
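// Data-serving thread: answers task requests from the worker loops of all
// processes (including this one), handing out one task per request while the
// local queue is non-empty; a request with value 0 terminates the loop.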
void *data_thread_function(void *args) {
    int message1, answer;
    MPI_Status status;
    while (true) {
        MPI_Recv(&message1, 1, MPI_INT, MPI_ANY_SOURCE, REQUEST_TAG, MPI_COMM_WORLD, &status);
        //dev_print("data", "received message %d from process %d", message1, status.MPI_SOURCE);
        if (message1 == 0)
            break;
        pthread_mutex_lock(&task_mutex);
        if (tasks.empty()) {
            answer = 0;
            MPI_Send(&answer, 1, MPI_INT, status.MPI_SOURCE, ANSWER_TAG, MPI_COMM_WORLD);
            pthread_mutex_unlock(&task_mutex);
            //dev_print("data", "don't have tasks for process %d", status.MPI_SOURCE);
            continue;
        }
        answer = 1;
        MPI_Send(&answer, 1, MPI_INT, status.MPI_SOURCE, ANSWER_TAG, MPI_COMM_WORLD);
        MPI_Send(&tasks.front(), 1, TASK_TYPE, status.MPI_SOURCE, DATA_TAG, MPI_COMM_WORLD);
        queue_task_count--;
        queue_weight_count -= tasks.front().weight;
        tasks.pop();
        //dev_print("data", "sent task to process %d", status.MPI_SOURCE);
        pthread_mutex_unlock(&task_mutex);
    }
    dev_print("data", "ended");
    return NULL;
}
int main(int argc, char **argv) {
    clock_gettime(CLOCK_MONOTONIC_RAW, &start);
    int provided;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
    if (provided != MPI_THREAD_MULTIPLE) {
        fprintf(stderr, "Can't get MPI_THREAD_MULTIPLE (%d) level, got %d\n", MPI_THREAD_MULTIPLE, provided);
        MPI_Finalize();
        return 1;
    }
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    // create the MPI datatype for task_t
    int fields_number = 1;
    int block_lengths[] = {1};
    MPI_Datatype types[] = {MPI_LONG_LONG}, temp_type;
    MPI_Aint offsets[] = {offsetof(task_t, weight)}, lb, extent;
    MPI_Type_create_struct(fields_number, block_lengths, offsets, types, &temp_type);
    MPI_Type_get_extent(temp_type, &lb, &extent);
    MPI_Type_create_resized(temp_type, lb, extent, &TASK_TYPE);
    MPI_Type_commit(&TASK_TYPE);
    // generate this process's tasks; the weight grows with the rank
    //dev_print("calc", "started task generation");
    long long int weight = (rank + 1) * WEIGHT_MULTIPLIER;
    for (int i = 0; i < COUNT_TASKS; i++) {
        task_t task;
        task.weight = weight;
        tasks.push(task);
        queue_weight_count += weight;
        queue_task_count++;
    }
    //dev_print("calc", "finished task generation");
    // create the mutex and the second thread
    pthread_mutex_init(&task_mutex, NULL);
    pthread_attr_t attrs;
    pthread_attr_init(&attrs);
    pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_JOINABLE);
    pthread_create(&data_thread, &attrs, data_thread_function, NULL); // thread that serves incoming task requests
    pthread_attr_destroy(&attrs);
    calc_thread_func(NULL); // the main thread does the computation itself
    if (pthread_join(data_thread, NULL) != 0) {
        fprintf(stderr, "Cannot join a thread\n");
    }
    pthread_mutex_destroy(&task_mutex);
    MPI_Type_free(&TASK_TYPE);
    MPI_Finalize();
    return 0;
}