Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <errno.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <sys/msg.h>
- #include <string.h>
- #include <unistd.h>
- #include <sys/ipc.h>
- #include <time.h>
/* ---- State shared by all functions of the coordinating (parent) process ---- */

/* Matrix dimensions as read from the input file: values per row, then row count. */
int matrixWidth;
int matrixHeight;

/* Input matrix stored row after row: element (r, c) lives at matrix[r * matrixWidth + c]. */
double *matrix;
/* Input vector with matrixWidth elements. */
double *vector;
/* Values collected from the children by receiveResults() and written by writeResult(). */
double *results;

/* Payload byte count passed to msgsnd()/msgrcv(); assigned by sendMessage(). */
size_t messageSize;
/* Message queue ids, one slot per child process, filled in by startCalculations(). */
int *parentMQIds;
- void readInput(const char *filename) {
- printf("Reading input starts\n");
- FILE *input = fopen(filename, "r");
- fscanf(input, "%d%d", &matrixWidth, &matrixHeight);
- int matrixSize = matrixWidth * matrixHeight;
- matrix = (double *) calloc(matrixSize, sizeof(double));
- for (int i = 0; i < matrixSize; i += 1)
- fscanf(input, "%lf", (matrix + i));
- vector = (double *) calloc(matrixWidth, sizeof(double));
- for (int i = 0; i < matrixWidth; i += 1)
- fscanf(input, "%lf", (vector + i));
- fclose(input);
- printf("Reading input finishes\n");
- }
- int *distributeRows(int childCount) {
- printf("Distributing rows starts\n");
- int *distribution = (int *) calloc(childCount, sizeof(int));
- int part = matrixHeight / childCount;
- if (part > 0)
- for (int i = 0; i < childCount; i += 1)
- distribution[i] = part;
- int rest = matrixHeight % childCount;
- printf("Rest=%d\n", rest);
- for (int i = rest - 1; i >= 0; i -= 1)
- distribution[i] += 1;
- printf("Distributing rows finishes\n");
- return distribution;
- }
/*
 * Message record exchanged over the System V queues: the leading long type
 * field that msgsnd()/msgrcv() require, followed by the payload, which is a
 * single number rendered as text.
 */
struct msgbuf {
    long mtype;     /* sendMessage() sends type 1, receiveMessage() asks for type 2 */
    char mtext[10]; /* textual representation of one value */
};
- void sendMessage(int parentMQId, double message) {
- struct msgbuf msgbuf;
- char str[10];
- messageSize = sizeof(msgbuf.mtext) + 1;
- sprintf(str, "%lf", message);
- strcpy(msgbuf.mtext, str);
- msgbuf.mtype = 1;
- /* send message to queue */
- if (msgsnd(parentMQId, &msgbuf, messageSize, 0) == -1) {
- if (errno == EAGAIN)
- printf("EAGAIN\n");
- perror("msgsnd");
- exit(1);
- }
- }
- void sendData(int parentMQId, int start, int count) {
- printf("Sending data starts\n");
- sendMessage(parentMQId, matrixWidth);
- //mpStart
- sendMessage(parentMQId, start);
- //rCount
- sendMessage(parentMQId, count);
- /* send vector to queue */
- for (int i = 0; i < matrixWidth; i += 1)
- sendMessage(parentMQId, vector[i]);
- int shift;
- /* send matrix part to queue */
- for (int i = start; i < start + count; i += 1) {
- shift = i * matrixWidth;
- for (int j = 0; j < matrixWidth; j += 1)
- sendMessage(parentMQId, matrix[shift + j]);
- }
- printf("Sending data finishes\n");
- }
/*
 * Create a fresh message queue whose key is derived from the current
 * directory and `proj_id`, and return its queue id.
 *
 * If a queue with that key already exists it is removed and created anew,
 * so the returned queue never contains old messages.
 *
 * Every unrecoverable failure is reported and terminates the process with
 * status 1.
 *
 * NOTE(review): POSIX leaves ftok() unspecified when the low 8 bits of the
 * id are 0 and only uses those 8 bits, so proj_id 0 and ids 256 apart map to
 * the same key on typical systems - confirm the callers stay within 1..255
 * if portability matters.
 */
int makeMQ(int proj_id) {
    key_t key = ftok(".", proj_id);
    if (key == (key_t) -1) {
        perror("ftok");
        exit(1);
    }

    int parentMQId = msgget(key, 0666 | IPC_CREAT | IPC_EXCL);
    if (parentMQId != -1)
        return parentMQId;

    if (errno != EEXIST) {
        int savedErrno = errno; /* printf() below may overwrite errno */
        printf("It is impossible to create message queue\n");
        errno = savedErrno;
        perror("msgget");
        exit(1);
    }

    /*
     * The key is taken: drop the old queue. A failure here is only reported;
     * the creation attempt below decides whether it is fatal.
     */
    int staleId = msgget(key, 0666);
    if (staleId != -1 && msgctl(staleId, IPC_RMID, NULL) == -1)
        perror("msgctl");

    parentMQId = msgget(key, 0666 | IPC_CREAT | IPC_EXCL);
    if (parentMQId == -1) {
        perror("msgget");
        exit(1);
    }
    return parentMQId;
}
- int startCalculations(int childCount) {
- int *distribution = distributeRows(childCount);
- printf("Starting up calculation processes starts\n");
- parentMQIds = calloc(childCount, sizeof(int));
- int curStart = 0;
- int i = 0;
- for (i = 0; i < childCount; i += 1) {
- if (distribution[i] == 0)
- break;
- parentMQIds[i] = makeMQ(i);
- printf("Distribution[%d]=%d\n", i, distribution[i]);
- pid_t pid = fork();
- if (pid == -1) {
- perror("fork");
- exit(1);
- }
- if (pid == 0) {
- char str[10];
- sprintf(str, "%d", parentMQIds[i]);
- int res = execl("./calculation.exe", "calculation.exe", str, NULL);
- if (res == -1) {
- perror("execl");
- exit(1);
- }
- exit(0);
- }
- sendData(parentMQIds[i], curStart, distribution[i]);
- curStart += distribution[i];
- }
- free(distribution);
- printf("Starting up calculation processes finishes\n");
- return i;
- }
- double receiveMessage(int parentMQId) {
- struct msgbuf msgbuf;
- char *ptr;
- /* receive message from queue */
- if (msgrcv(parentMQId, &msgbuf, messageSize, 2, 0) == -1) {
- perror("main msgrcv");
- exit(1);
- }
- return strtod(msgbuf.mtext, &ptr);
- }
- void receiveResults(int partsToReceive) {
- results = (double *) calloc(matrixWidth, sizeof(double));
- printf("Receiving results starts\n");
- for (int j = 0; j < partsToReceive; j += 1) {
- int startIdx = (int) receiveMessage(parentMQIds[j]);
- int elCount = (int) receiveMessage(parentMQIds[j]);
- /* receive results from queue */
- for (int i = startIdx; i < startIdx + elCount; i += 1) {
- results[i] = receiveMessage(parentMQIds[j]);
- }
- printf("proc %d \n", j);
- }
- printf("Receiving results finishes\n");
- }
- void writeResult(const char *filename) {
- printf("Writing results starts\n");
- FILE *output = fopen(filename, "w");
- for (int i = 0; i < matrixHeight; i += 1)
- fprintf(output, "%.3lf\n", results[i]);
- fclose(output);
- printf("Writing results finishes\n");
- }
- void doCleanup(int partsToReceive) {
- free(matrix);
- free(vector);
- free(results);
- for (int i = 0; i < partsToReceive; i += 1)
- msgctl(parentMQIds[i], IPC_RMID, NULL);
- free(parentMQIds);
- }
- int main(int argc, char *argv[]) {
- if (argc != 2) {
- printf("USAGE: main.exe child_count\n");
- exit(1);
- }
- char *ptr;
- int childCount = (int) strtol(argv[1], &ptr, 10);
- if (childCount <= 0) {
- printf("USAGE: child_count should be more than 0\n");
- exit(1);
- }
- readInput("input.txt");
- clock_t start, end;
- start = clock();
- int partsToReceive = startCalculations(childCount);
- receiveResults(partsToReceive);
- end = clock();
- double diff = difftime(end, start);
- if (diff >= CLOCKS_PER_SEC)
- printf("Calculations have approximately taken %.3lf seconds\n", diff/CLOCKS_PER_SEC);
- else
- printf("Calculations have approximately taken %.3lf milliseconds\n", diff/1000);
- writeResult("output.txt");
- doCleanup(partsToReceive);
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement