Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <iostream>
- #include <mpi.h>
- #include <cmath>
- #include <ctime>
- #include <cstdarg>
- #include <unistd.h>
- #include <vector>
- #include <pthread.h>
- #include <stdio.h>
- #define CALC_THREAD 0
- #define DATA_THREAD 1
- #define COUNT_ITER 10
- #define COUNT_TASKS 10000
- #define DEFAULT_WEIGHT 10000
- #define EXTRA_WEIGHT 2000
- struct Task {
- int repeatNum;
- };
- pthread_mutex_t mutex;
- pthread_t threads[2];
- std::vector<Task> tasks;
- MPI_Datatype TASK_INFO_TYPE;
- int rank;
- int size;
- int currentIter;
- int currentTask;
- int doneWeight = 0;
- int getNewTask(int procRank);
- double doTask(Task &task) {
- double res = 0;
- for (int i = 0; i < task.repeatNum; i++) {
- res += sqrt(i*sqrt(i*i*i*sqrt(i)));
- }
- doneWeight += task.repeatNum / 1000;
- return res;
- }
- void generateTasks(std::vector<Task> &tasks, int countTasks, int currentIter) {
- int weight = std::abs(rank - (currentIter % size));
- printf("Process: %d (thread %d). Generate %d tasks with weight %d\n", rank, CALC_THREAD, countTasks, weight);
- pthread_mutex_lock(&mutex);
- tasks.clear();
- for (int i = 0; i < countTasks; i++) {
- Task task;
- task.repeatNum = DEFAULT_WEIGHT + EXTRA_WEIGHT * weight;
- tasks.push_back(task);
- }
- pthread_mutex_unlock(&mutex);
- }
- void reset(int *arr) {
- for (int i = 0; i < size; i++) {
- arr[i] = 1;
- }
- arr[rank] = 0;
- }
- void *calcThread(void *args) {
- int *otherProcesses = new int[size];
- for (currentIter = 0; currentIter < COUNT_ITER; currentIter++) {
- printf("Process: %d (thread %d). Start iteration %d\n", rank, CALC_THREAD, currentIter);
- printf("Process: %d (thread %d). Generate tasks\n", rank, CALC_THREAD);
- int count = COUNT_TASKS / size;
- if (rank >= size - (COUNT_TASKS % size)) {
- count++;
- }
- generateTasks(tasks, count, currentIter);
- reset(otherProcesses);
- currentTask = 0;
- int i = 0;
- int askRank;
- while (true) {
- pthread_mutex_lock(&mutex);
- while (currentTask < tasks.size()) {
- Task taskToDo = tasks[currentTask];
- pthread_mutex_unlock(&mutex);
- double result = doTask(taskToDo);
- printf("Process: %d (thread %d). Finish task %d, result: %lf\n",
- rank,
- CALC_THREAD,
- currentTask,
- result);
- currentTask++;
- pthread_mutex_lock(&mutex);
- }
- pthread_mutex_unlock(&mutex);
- for (; i < size;) {
- askRank = (rank + i) % size;
- if (!otherProcesses[askRank]) {
- i++;
- continue;
- } else {
- otherProcesses[askRank] = getNewTask(askRank);
- break;
- }
- }
- if (i == size) {
- break;
- }
- }
- printf("Process: %d (thread %d). End iteration %d, wait all\n", rank, CALC_THREAD, currentIter);
- MPI_Barrier(MPI_COMM_WORLD);
- }
- printf("Process: %d (thread %d). Send end request\n", rank, CALC_THREAD);
- int req = 0;
- MPI_Send(&req, 1, MPI_INT, rank, 1, MPI_COMM_WORLD);
- printf("Process: %d (thread %d). End work\n", rank, CALC_THREAD);
- delete[] otherProcesses;
- return NULL;
- }
- void *dataThread(void *args) {
- while (currentIter < COUNT_ITER) {
- MPI_Status status;
- int res;
- MPI_Recv(&res, 1, MPI_INT, MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, &status);
- if (res == 0) {
- printf("Process: %d (thread %d). Get end request from %d\n", rank, DATA_THREAD, status.MPI_SOURCE);
- break;
- }
- printf("Process: %d (thread %d). Get request from %d\n", rank, DATA_THREAD, status.MPI_SOURCE);
- pthread_mutex_lock(&mutex);
- if (currentTask >= tasks.size()) {
- pthread_mutex_unlock(&mutex);
- int answer = 0;
- MPI_Send(&answer, 1, MPI_INT, status.MPI_SOURCE, 2, MPI_COMM_WORLD);
- printf("Process: %d (thread %d). I haven't tasks for %d\n", rank, DATA_THREAD, status.MPI_SOURCE);
- } else {
- Task taskToSend = tasks.back();
- tasks.pop_back();
- pthread_mutex_unlock(&mutex);
- int answer = 1;
- MPI_Send(&answer, 1, MPI_INT, status.MPI_SOURCE, 2, MPI_COMM_WORLD);
- MPI_Send(&taskToSend, 1, TASK_INFO_TYPE, status.MPI_SOURCE, 3, MPI_COMM_WORLD);
- printf("Process: %d (thread %d). Send task to %d\n", rank, DATA_THREAD, status.MPI_SOURCE);
- }
- }
- printf("Process: %d (thread %d). %d weight done. End work\n", rank, DATA_THREAD, doneWeight);
- return NULL;
- }
- int work() {
- pthread_attr_t attrs;
- pthread_attr_init(&attrs);
- pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_JOINABLE);
- pthread_create(&threads[CALC_THREAD], &attrs, calcThread, NULL);
- pthread_create(&threads[DATA_THREAD], &attrs, dataThread, NULL);
- pthread_attr_destroy(&attrs);
- for (int i = 0; i < 2; i++) {
- if (pthread_join(threads[i], NULL) != 0) {
- perror("Cannot join a thread");
- return -1;
- }
- }
- return 0;
- }
- void createType() {
- int count = 1;
- int blocklengths[] = {1};
- MPI_Datatype types[] = {MPI_INT};
- MPI_Aint displs = 0;
- MPI_Type_create_struct(count, blocklengths, &displs, types, &TASK_INFO_TYPE);
- MPI_Type_commit(&TASK_INFO_TYPE);
- }
- void deleteType() {
- MPI_Type_free(&TASK_INFO_TYPE);
- }
- int getNewTask(int procRank) {
- printf("Process: %d (thread %d). Try get data from %d\n", rank, CALC_THREAD, procRank);
- int req = 1;
- MPI_Send(&req, 1, MPI_INT, procRank, 1, MPI_COMM_WORLD);
- MPI_Recv(&req, 1, MPI_INT, procRank, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
- if (req == 0) {
- printf("Process: %d (thread %d). %d doesn't has tasks\n", rank, CALC_THREAD, procRank);
- return 0;
- }
- Task task;
- MPI_Recv(&task, 1, TASK_INFO_TYPE, procRank, 3, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
- pthread_mutex_lock(&mutex);
- tasks.push_back(task);
- pthread_mutex_unlock(&mutex);
- printf("Process: %d (thread %d). New task loaded from %d\n", rank, CALC_THREAD, procRank);
- return 1;
- }
- int main(int argc, char **argv) {
- int provided;
- MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
- MPI_Comm_size(MPI_COMM_WORLD, &size);
- MPI_Comm_rank(MPI_COMM_WORLD, &rank);
- if (provided != MPI_THREAD_MULTIPLE) {
- printf("Process: %d. Too low MPI thread level provided\n", rank);
- MPI_Finalize();
- return -1;
- }
- createType();
- pthread_mutex_init(&mutex, NULL);
- double start = MPI_Wtime();
- if (work() != 0) {
- pthread_mutex_destroy(&mutex);
- deleteType();
- MPI_Finalize();
- perror("work()");
- return -1;
- }
- double end = MPI_Wtime();
- printf("Process: %d. Time: %lf\n", rank, end - start);
- pthread_mutex_destroy(&mutex);
- deleteType();
- MPI_Finalize();
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement