#include #include #include #include #include #include #include #include #include #include "ipc.h" #include "pa1.h" #include "common.h" #define MAX_NUM_OF_PROCESSES 10 static const char * const log_opened_fmt = "Pipe (input %5d, output %5d) was opened\n"; static const char * const log_closed_fmt = "Pipe %5d in process %5d was closed\n"; static const char * const log_read_fmt = "Pipe (input %5d, output %5d) in process %5d read %5d bytes\n"; static const char * const log_write_fmt = "Pipe (input %5d, output %5d) in process %5d write %5d bytes\n"; typedef struct{ int in; int out; } Pipe; typedef struct{ int numOfChildren; local_id id; Pipe *pipes[MAX_NUM_OF_PROCESSES + 1][MAX_NUM_OF_PROCESSES + 1]; } IO; int send(void * self, local_id dst, const Message * msg){ IO *inout = (IO*)self; int fd = inout -> pipes[inout -> id][dst] -> out; if (write(fd, msg, msg->s_header.s_payload_len + sizeof(MessageHeader)) == -1) return errno; else return 0; } int send_multicast(void * self, const Message * msg){ IO *inout = (IO*)self; for (int i = 0; i < inout -> numOfChildren + 1; i++){ if (i != inout -> id){ int fd = inout -> pipes[inout -> id][i] -> out; int sum; switch(sum = write(fd, msg, msg -> s_header.s_payload_len + sizeof(MessageHeader))){ case -1: return errno; default: printf(log_write_fmt, inout -> pipes[inout -> id][i] -> in, inout -> pipes[inout -> id][i] -> out, inout -> id, sum); break; } } } return 0; } int receive(void * self, local_id from, Message * msg){ IO *inout = (IO*)self; int fd = inout -> pipes[from][inout -> id] -> in; int sum, sum1; if ((sum = read(fd, &msg -> s_header, sizeof(MessageHeader))) == -1) return errno; printf(log_read_fmt, inout -> pipes[from][inout -> id] -> in, inout -> pipes[from][inout -> id] -> out, inout -> id, sum); if (msg -> s_header.s_payload_len > 0) { sum1 = read(fd, msg->s_payload, msg->s_header.s_payload_len); printf(log_read_fmt, inout -> pipes[from][inout -> id] -> in, inout -> pipes[from][inout -> id] -> out, inout -> id, sum1); } return 0; } //wrong, must be released with fcntl(2) int receive_any(void * self, Message * msg){ IO *inout = (IO*)self; while(1){ for (int i = 0; i < inout -> numOfChildren + 1; i++){ if (i != inout -> id - 1){ int fd = inout -> pipes[i][inout -> id] -> in; int cur = lseek(fd, 0, SEEK_CUR); int end = lseek(fd, 0, SEEK_END); lseek(fd, cur, SEEK_SET); if (end > cur) { if (read(fd, msg, msg->s_header.s_payload_len) == -1) return errno; else return 0; } } } } } int main(int argc, char *argv[]){ // create an IO structure IO *inout = (IO*)malloc(sizeof(IO)); // number of child processes from command line to numOfChildren variable int opt; while ( (opt = getopt(argc,argv,"p:")) != -1){ switch (opt){ case 'p': inout -> numOfChildren = atoi(optarg); break; default: free(inout); return -1; } } inout -> id = PARENT_ID; // create or open logs for writing FILE *pipesLog = fopen(pipes_log, "w+"); FILE *eventsLog = fopen(events_log, "w+"); // create pipes and fill the inout.pipes for (int i = 0; i < inout -> numOfChildren + 1; i++){ for (int j = 0; j < inout -> numOfChildren + 1; j++){ if (i != j){ Pipe *pipeMem = (Pipe*)malloc(sizeof(Pipe)); int pipefd[2]; pipe(pipefd); pipeMem -> in = pipefd[0]; pipeMem -> out = pipefd[1]; inout -> pipes[i][j] = pipeMem; fprintf(pipesLog, log_opened_fmt, pipefd[0], pipefd[1]); fflush(pipesLog); printf(log_opened_fmt, pipefd[0], pipefd[1]); } } } Message *msgReceive = (Message*)malloc(sizeof(Message)); // create children for (int id = 1; id < inout -> numOfChildren + 1; id++){ switch(fork()){ case -1: perror("fork"); exit(1); case 0: fprintf(eventsLog, log_started_fmt, id, getpid(), getppid()); fflush(eventsLog); printf(log_started_fmt, id, getpid(), getppid()); inout -> id = id; // close unused pipes for (int i = 0; i < inout -> numOfChildren + 1; i++){ for (int j = 0; j < inout -> numOfChildren + 1; j++){ if (i != j){ if (i != inout -> id && j != inout -> id){ close(inout -> pipes[i][j] -> in); fprintf(pipesLog, log_closed_fmt, inout -> pipes[i][j] -> in, inout -> id); fflush(pipesLog); printf(log_closed_fmt, inout -> pipes[i][j] -> in, inout -> id); close(inout -> pipes[i][j] -> out); fprintf(pipesLog, log_closed_fmt, inout -> pipes[i][j] -> out, inout -> id); fflush(pipesLog); printf(log_closed_fmt, inout -> pipes[i][j] -> out, inout -> id); } if (i == inout -> id){ close(inout -> pipes[i][j] -> in); fprintf(pipesLog, log_closed_fmt, inout -> pipes[i][j] -> in, inout -> id); fflush(pipesLog); printf(log_closed_fmt, inout -> pipes[i][j] -> in, inout -> id); } if (j == inout -> id){ close(inout -> pipes[i][j] -> out); fprintf(pipesLog, log_closed_fmt, inout -> pipes[i][j] -> out, inout -> id); fflush(pipesLog); printf(log_closed_fmt, inout -> pipes[i][j] -> out, inout -> id); } } } } // fill the STARTED message Message *msg = (Message*)malloc(sizeof(Message)); sprintf(msg -> s_payload, log_started_fmt, id, getpid(), getppid()); msg -> s_header.s_magic = MESSAGE_MAGIC; msg -> s_header.s_payload_len = strlen(msg -> s_payload)+1; msg -> s_header.s_type = STARTED; msg -> s_header.s_local_time = time(NULL); // send STARTED message to other processes send_multicast(inout, msg); // receive STARTED message from other processes for (int i = 1; i < inout -> numOfChildren + 1; i++){ if (i != inout -> id){ do{ receive(inout, i, msgReceive); } while (msgReceive -> s_header.s_type != STARTED); } } // logs fprintf(eventsLog, log_received_all_started_fmt, inout -> id); fflush(eventsLog); printf(log_received_all_started_fmt, inout -> id); // fill the DONE message sprintf(msg -> s_payload, log_done_fmt, inout -> id); msg -> s_header.s_magic = MESSAGE_MAGIC; msg -> s_header.s_payload_len = strlen(msg -> s_payload)+1; msg -> s_header.s_type = DONE; msg -> s_header.s_local_time = time(NULL); //msg -> s_header = *head; // send STARTED message to other processes send_multicast(inout, msg); // receive STARTED message from other processes for (int i = 1; i < inout -> numOfChildren + 1; i++){ if (i != inout -> id){ do{ receive(inout, i, msgReceive); } while (msgReceive -> s_header.s_type != DONE); } } // logs fprintf(eventsLog, log_received_all_done_fmt, inout -> id); fflush(eventsLog); printf(log_received_all_done_fmt, inout -> id); // close all other pipes for (int i = 0; i < inout -> numOfChildren + 1; i++){ for (int j = 0; j < inout -> numOfChildren + 1; j++){ if (i != j){ if (i == inout -> id){ close(inout -> pipes[i][j] -> out); fprintf(pipesLog, log_closed_fmt, inout -> pipes[i][j] -> out, inout -> id); fflush(pipesLog); printf(log_closed_fmt, inout -> pipes[i][j] -> out, inout -> id); } if (j == inout -> id){ close(inout -> pipes[i][j] -> in); fprintf(pipesLog, log_closed_fmt, inout -> pipes[i][j] -> in, inout -> id); fflush(pipesLog); printf(log_closed_fmt, inout -> pipes[i][j] -> in, inout -> id); } } } } for (int i = 0; i < inout -> numOfChildren + 1; i++){ for (int j = 0; j < inout -> numOfChildren + 1; j++){ if (i != j) free(inout -> pipes[i][j]); } } free(msg); fprintf(eventsLog, log_done_fmt, inout -> id); fflush(eventsLog); printf(log_done_fmt, inout -> id); return 0; default: break; } } for (int i = 0; i < inout -> numOfChildren + 1; i++){ for (int j = 0; j < inout -> numOfChildren + 1; j++){ if (i != j){ if (i != PARENT_ID && j != PARENT_ID){ close(inout -> pipes[i][j] -> in); fprintf(pipesLog, log_closed_fmt, inout -> pipes[i][j] -> in, inout -> id); fflush(pipesLog); printf(log_closed_fmt, inout -> pipes[i][j] -> in, inout -> id); close(inout -> pipes[i][j] -> out); fprintf(pipesLog, log_closed_fmt, inout -> pipes[i][j] -> out, inout -> id); fflush(pipesLog); printf(log_closed_fmt, inout -> pipes[i][j] -> out, inout -> id); } if (i == PARENT_ID){ close(inout -> pipes[i][j] -> in); fprintf(pipesLog, log_closed_fmt, inout -> pipes[i][j] -> in, inout -> id); fflush(pipesLog); printf(log_closed_fmt, inout -> pipes[i][j] -> in, inout -> id); } if (j == PARENT_ID){ close(inout -> pipes[i][j] -> out); fprintf(pipesLog, log_closed_fmt, inout -> pipes[i][j] -> out, inout -> id); fflush(pipesLog); printf(log_closed_fmt, inout -> pipes[i][j] -> out, inout -> id); } } } } for (int i = 1; i < inout -> numOfChildren + 1; i++){ if (i != inout -> id){ do{ receive(inout, i, msgReceive); } while (msgReceive -> s_header.s_type != STARTED); } } fprintf(eventsLog, log_received_all_started_fmt, inout -> id); fflush(eventsLog); printf(log_received_all_started_fmt, inout -> id); for (int i = 1; i < inout -> numOfChildren + 1; i++){ if (i != inout -> id){ do{ receive(inout, i, msgReceive); } while (msgReceive -> s_header.s_type != DONE); } } fprintf(eventsLog, log_received_all_done_fmt, inout -> id); fflush(eventsLog); printf(log_received_all_done_fmt, inout -> id); for (int i = 0; i < inout -> numOfChildren + 1; i++){ for (int j = 0; j < inout -> numOfChildren + 1; j++){ if (i != j){ if (i == PARENT_ID){ close(inout -> pipes[i][j] -> out); fprintf(pipesLog, log_closed_fmt, inout -> pipes[i][j] -> out, inout -> id); fflush(pipesLog); printf(log_closed_fmt, inout -> pipes[i][j] -> out, inout -> id); } if (j == PARENT_ID){ close(inout -> pipes[i][j] -> in); fprintf(pipesLog, log_closed_fmt, inout -> pipes[i][j] -> in, inout -> id); fflush(pipesLog); printf(log_closed_fmt, inout -> pipes[i][j] -> in, inout -> id); } } } } for (int i = 0; i < inout -> numOfChildren; i++) wait(NULL); free(inout); free(msgReceive); fclose(pipesLog); fclose(eventsLog); return 0; }