SHARE
TWEET

Untitled

a guest May 19th, 2017 40 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
//
//  main.c
//  distr
//
//  Created by Алексей Пулич on 18.05.17.
//  Copyright © 2017 Алексей Пулич. All rights reserved.
//
  8.  
#include <errno.h>
#include <poll.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <sys/types.h>
#include <sys/wait.h>
//#include <sys/stat.h>
//#include <fcntl.h>

#include "common.h"
#include "ipc.h"
#include "pa1.h"
  22.  
  23. typedef struct {
  24.     int read;
  25.     int write;
  26. } pipe_ends;
  27.  
  28. typedef struct {
  29.     int id;
  30.     pid_t pid;
  31.     pipe_ends *pipes;
  32. } proc;
  33.  
  34. proc *procs;
  35. int procId;
  36. int proc_cnt;
  37.  
  38. pid_t main_proc_pid;
  39.  
  40. FILE *p_events_log;
  41. FILE *p_pipes_log;
  42.  
  43. void proc_work(int id);
  44.  
  45. void prepare_logs();
  46.  
  47. void start_work(int id);
  48.  
  49. void done_work(int id);
  50.  
  51. void wait_all_procs_status(int id, int status);
  52.  
  53. void exit_work();
  54.  
  55. void close_pipes(int id);
  56.  
//////////////////
//send - receive//
//////////////////
  60.  
  61. int receive(void *self, local_id from, Message *msg) {
  62.     proc *proc_tmp = (proc *) self;
  63.     if (read(procs[from].pipes[proc_tmp->id].read, &msg->s_header, sizeof(MessageHeader)) == -1) {
  64.         printf("failed to receive data");
  65.         return 2;
  66.     }
  67.     if (read(procs[from].pipes[proc_tmp->id].read, msg->s_payload, msg->s_header.s_payload_len) == -1) {
  68.         printf("failed to receive data");
  69.         return 2;
  70.     }
  71.     return 0;
  72. }
  73.  
  74. int receive_any(void *self, Message *msg) {
  75.     proc *proc_tmp = (proc *) self;
  76.     for (int i = 0; i < proc_cnt; i++) {
  77.         if (proc_tmp->id == i) continue;
  78.  
  79.         if (read(procs[i].pipes[proc_tmp->id].read, &msg->s_header, sizeof(MessageHeader)) == -1) {
  80.             printf("failed to receive data");
  81.             return 2;
  82.         }
  83.         if (read(procs[i].pipes[proc_tmp->id].read, msg->s_payload, msg->s_header.s_payload_len) == -1) {
  84.             printf("failed to receive data");
  85.             return 2;
  86.         }
  87.         break;
  88.     }
  89.     return 0;
  90. }
  91.  
  92.  
  93. int send(void *self, local_id dst, const Message *msg) {
  94.     proc *proc_tmp = (proc *) self;
  95.     if (write(proc_tmp->pipes[dst].write, msg, msg->s_header.s_payload_len + sizeof(MessageHeader)) !=
  96.         msg->s_header.s_payload_len + sizeof(MessageHeader)) {
  97.         printf("failed to send data");
  98.         return 3;
  99.     }
  100.  
  101.     return 0;
  102. }
  103.  
  104. int send_multicast(void *self, const Message *msg) {
  105.     proc *proc_tmp = (proc *) self;
  106.  
  107.     for (int i = 0; i < proc_cnt; i++) {
  108.  
  109.         if (i == proc_tmp->id)
  110.             continue;
  111.  
  112.         if (write(proc_tmp->pipes[i].write, msg, msg->s_header.s_payload_len + sizeof(MessageHeader)) !=
  113.             msg->s_header.s_payload_len + sizeof(MessageHeader)) {
  114.             printf("failed to send data");
  115.             return 3;
  116.         }
  117.  
  118. //        printf("SEND MULTICAST MESSAGE <%s> (FROM %d TO %d)\n", msg->s_payload, proc_tmp->id, i);
  119.     }
  120.  
  121.     return 0;
  122. }
  123.  
//////////////////////
//END send - receive//
//////////////////////
  127.  
  128. void prepare_logs() {
  129.     p_events_log = fopen(events_log, "w+");
  130.     p_pipes_log = fopen(pipes_log, "w+");
  131. }
  132.  
  133. void wait_all_procs_status(int id, int status) {
  134.     Message msg;
  135.     for (int i = 1; i < proc_cnt; i++) {
  136.         if (i == id) continue;
  137.         do {
  138.             receive(&procs[id], i, &msg);
  139.         } while (msg.s_header.s_type != status);
  140. //                printf("RECEIVE MSG <%s> (FROM %d to %d)\n", msg.s_payload, i, id);
  141.     }
  142. }
  143.  
  144. void close_pipes(int id) {
  145.     for (int i = 1; i < proc_cnt; i++) {
  146.         for (int j = 0; j < proc_cnt; j++) {
  147.             int tmp_read = procs[i].pipes[j].read;
  148.             int tmp_write = procs[i].pipes[j].write;
  149.             if(i==j)
  150.                 continue;
  151.             if (i == id) {
  152.                 close(procs[i].pipes[j].read);
  153.                 fprintf(p_pipes_log, "CLOSED READ-end (%d) pipe FOR procs %d -> %d\n", tmp_read, i, j);
  154.             }
  155.             else if (j == id) {
  156.                 close(procs[i].pipes[j].write);
  157.                 fprintf(p_pipes_log, "CLOSED WRITE-end (%d) pipe FOR procs %d -> %d\n", tmp_write, i, j);
  158.             }
  159.             else {
  160.                 if(j==procs[i].id) continue;
  161.                 close(procs[i].pipes[j].read);
  162.                 close(procs[i].pipes[j].write);
  163.                 fprintf(p_pipes_log, "CLOSED WRITE-end (%d) pipe FOR procs %d -> %d\n", tmp_write, i, j);
  164.                 fprintf(p_pipes_log, "CLOSED READ-end (%d) pipe FOR procs %d -> %d\n", tmp_read, i, j);
  165.             }
  166.         }
  167.     }
  168. }
  169.  
  170. //child-process life-cycle
  171. void proc_work(int id) {
  172.     close_pipes(id);
  173.  
  174.     start_work(id);
  175.  
  176.     //Useful work
  177.  
  178.     done_work(id);
  179. }
  180.  
  181. void start_work(int id) {
  182.     fprintf(p_events_log, log_started_fmt, id, getpid(), getppid());
  183.     fflush(p_events_log);
  184.     printf(log_started_fmt, id, getpid(), getppid());
  185.  
  186.  
  187.     Message msg;
  188.     sprintf(msg.s_payload, log_started_fmt, id, getpid(), getppid());
  189.     msg.s_header.s_magic = MESSAGE_MAGIC;
  190.     msg.s_header.s_local_time = time(NULL);
  191.     msg.s_header.s_type = STARTED;
  192.     msg.s_header.s_payload_len = strlen(msg.s_payload);
  193.  
  194.     send_multicast(&procs[id], &msg);
  195.     wait_all_procs_status(id, STARTED);
  196.     fprintf(p_events_log, log_received_all_started_fmt, id);
  197.     fflush(p_events_log);
  198.     printf(log_received_all_started_fmt, id);
  199. }
  200.  
  201. void done_work(int id) {
  202.     fprintf(p_events_log, log_done_fmt, id);
  203.     fflush(p_events_log);
  204.     printf(log_done_fmt, id);
  205.  
  206.     Message msg;
  207.     sprintf(msg.s_payload, log_done_fmt, id);
  208.     msg.s_header.s_magic = MESSAGE_MAGIC;
  209.     msg.s_header.s_local_time = time(NULL);
  210.     msg.s_header.s_type = DONE;
  211.     msg.s_header.s_payload_len = strlen(msg.s_payload);
  212.  
  213.     send_multicast(&procs[id], &msg);
  214.     wait_all_procs_status(id, DONE);
  215.     fprintf(p_events_log, log_received_all_done_fmt, id);
  216.     fflush(p_events_log);
  217.     printf(log_received_all_done_fmt, id);
  218. }
  219.  
  220. void exit_work() {
  221.     for (int i = 0; i < proc_cnt; i++) {
  222.         for (int j = 0; j < proc_cnt; j++) {
  223.             if (i == j) continue;
  224.             int tmp_read = procs[i].pipes[j].read;
  225.             int tmp_write = procs[i].pipes[j].write;
  226.             close(procs[i].pipes[j].read);
  227.             fprintf(p_pipes_log, "CLOSED READ-end (%d) pipe FOR procs %d -> %d\n", tmp_read, i, j);
  228.             fflush(p_pipes_log);
  229.             close(procs[i].pipes[j].write);
  230.             fprintf(p_pipes_log, "CLOSED WRITE-end (%d) pipe FOR procs %d -> %d\n", tmp_write, i, j);
  231.             fflush(p_pipes_log);
  232.         }
  233.     }
  234.     free(procs);
  235.     fclose(p_events_log);
  236.     fclose(p_pipes_log);
  237. }
  238.  
  239. int main(int argc, const char *argv[]) {
  240.  
  241.     if (argc < 3 || strcmp(argv[1], "-p") != 0) {
  242.         printf("use key -p <N> to set the number of child processes");
  243.         return 1;
  244.     }
  245.  
  246.     main_proc_pid = getpid();
  247.  
  248.     proc_cnt = atoi(argv[2]) + 1;
  249.  
  250.     procs = (proc *) malloc(sizeof(proc) * proc_cnt);
  251.  
  252.     prepare_logs();
  253.  
  254.     for (int i = 0; i < proc_cnt; i++) {
  255.         proc tmp_proc;
  256.         tmp_proc.pipes = (pipe_ends *) malloc(sizeof(pipe_ends) * proc_cnt);
  257.  
  258.         for (int j = 0; j < proc_cnt; j++) {
  259.             if (i == j) continue;
  260.             int tmp_pipe[2];
  261.             pipe(tmp_pipe);
  262.             tmp_proc.pipes[j].read = tmp_pipe[0];
  263.             tmp_proc.pipes[j].write = tmp_pipe[1];
  264.             fprintf(p_pipes_log, "CREATED pipe FOR procs %d -> %d (READ: %d WRITE: %d)\n", i, j, tmp_proc.pipes[j].read,
  265.                     tmp_proc.pipes[j].write);
  266.             fflush(p_pipes_log);
  267.         }
  268.         procs[i] = tmp_proc;
  269.     }
  270.  
  271.     procs[0].pid = main_proc_pid;
  272.     procs[0].id = PARENT_ID;
  273.     pid_t pid = getppid();
  274.     for (int i = 1; i < proc_cnt; i++) {
  275.         if ((pid = fork()) == 0) {
  276.             procs[i].pid = getpid();
  277.             procs[i].id = i;
  278.             proc_work(i);
  279.             break;
  280.         }
  281.     }
  282.     if (pid != 0) {
  283.         wait_all_procs_status(PARENT_ID, STARTED);
  284.         fprintf(p_events_log, log_received_all_started_fmt, PARENT_ID);
  285.         fflush(p_events_log);
  286.         printf(log_received_all_started_fmt, PARENT_ID);
  287.  
  288.         wait_all_procs_status(PARENT_ID, DONE);
  289.         fprintf(p_events_log, log_received_all_done_fmt, PARENT_ID);
  290.         fflush(p_events_log);
  291.         printf(log_received_all_done_fmt, PARENT_ID);
  292.  
  293.         while (--proc_cnt)
  294.             wait(NULL);
  295.         //        printf("All childs terminated!\n");
  296.         exit_work();
  297.     }
  298.  
  299.     return 0;
  300. }
RAW Paste Data
Challenge yourself this year...
Learn something new in 2017
Top