Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#include <errno.h>
#include <getopt.h>
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>

#include "ipc.h"
#include "pa1.h"
#include "common.h"
/* Upper bound on the number of child processes; sizes the pipe table in IO. */
#define MAX_NUM_OF_PROCESSES 10

/* Pipe creation record: read descriptor, write descriptor. */
static const char * const log_opened_fmt = "Pipe (input %5d, output %5d) was opened\n";

/* Descriptor closure record: descriptor, id of the process closing it. */
static const char * const log_closed_fmt = "Pipe %5d in process %5d was closed\n";

/* Read record: read descriptor, write descriptor, process id, byte count. */
static const char * const log_read_fmt = "Pipe (input %5d, output %5d) in process %5d read %5d bytes\n";

/* Write record: read descriptor, write descriptor, process id, byte count. */
static const char * const log_write_fmt = "Pipe (input %5d, output %5d) in process %5d write %5d bytes\n";
/* Descriptor pair of one unidirectional pipe. */
typedef struct Pipe {
    int in;   /* read end  (pipefd[0] from pipe(2)) */
    int out;  /* write end (pipefd[1] from pipe(2)) */
} Pipe;
- typedef struct{
- int numOfChildren;
- local_id id;
- Pipe *pipes[MAX_NUM_OF_PROCESSES + 1][MAX_NUM_OF_PROCESSES + 1];
- } IO;
- int send(void * self, local_id dst, const Message * msg){
- IO *inout = (IO*)self;
- int fd = inout -> pipes[inout -> id][dst] -> out;
- if (write(fd, msg, msg->s_header.s_payload_len + sizeof(MessageHeader)) == -1)
- return errno;
- else return 0;
- }
- int send_multicast(void * self, const Message * msg){
- IO *inout = (IO*)self;
- for (int i = 0; i < inout -> numOfChildren + 1; i++){
- if (i != inout -> id){
- int fd = inout -> pipes[inout -> id][i] -> out;
- int sum;
- switch(sum = write(fd, msg, msg -> s_header.s_payload_len + sizeof(MessageHeader))){
- case -1: return errno;
- default:
- printf(log_write_fmt, inout -> pipes[inout -> id][i] -> in, inout -> pipes[inout -> id][i] -> out, inout -> id, sum);
- break;
- }
- }
- }
- return 0;
- }
- int receive(void * self, local_id from, Message * msg){
- IO *inout = (IO*)self;
- int fd = inout -> pipes[from][inout -> id] -> in;
- int sum, sum1;
- if ((sum = read(fd, &msg -> s_header, sizeof(MessageHeader))) == -1) return errno;
- printf(log_read_fmt, inout -> pipes[from][inout -> id] -> in, inout -> pipes[from][inout -> id] -> out, inout -> id, sum);
- if (msg -> s_header.s_payload_len > 0) {
- sum1 = read(fd, msg->s_payload, msg->s_header.s_payload_len);
- printf(log_read_fmt, inout -> pipes[from][inout -> id] -> in, inout -> pipes[from][inout -> id] -> out, inout -> id, sum1);
- }
- return 0;
- }
- //wrong, must be released with fcntl(2)
- int receive_any(void * self, Message * msg){
- IO *inout = (IO*)self;
- while(1){
- for (int i = 0; i < inout -> numOfChildren + 1; i++){
- if (i != inout -> id - 1){
- int fd = inout -> pipes[i][inout -> id] -> in;
- int cur = lseek(fd, 0, SEEK_CUR);
- int end = lseek(fd, 0, SEEK_END);
- lseek(fd, cur, SEEK_SET);
- if (end > cur) {
- if (read(fd, msg, msg->s_header.s_payload_len) == -1)
- return errno;
- else return 0;
- }
- }
- }
- }
- }
- int main(int argc, char *argv[]){
- // create an IO structure
- IO *inout = (IO*)malloc(sizeof(IO));
- // number of child processes from command line to numOfChildren variable
- int opt;
- while ( (opt = getopt(argc,argv,"p:")) != -1){
- switch (opt){
- case 'p': inout -> numOfChildren = atoi(optarg); break;
- default:
- free(inout);
- return -1;
- }
- }
- inout -> id = PARENT_ID;
- // create or open logs for writing
- FILE *pipesLog = fopen(pipes_log, "w+");
- FILE *eventsLog = fopen(events_log, "w+");
- // create pipes and fill the inout.pipes
- for (int i = 0; i < inout -> numOfChildren + 1; i++){
- for (int j = 0; j < inout -> numOfChildren + 1; j++){
- if (i != j){
- Pipe *pipeMem = (Pipe*)malloc(sizeof(Pipe));
- int pipefd[2];
- pipe(pipefd);
- pipeMem -> in = pipefd[0];
- pipeMem -> out = pipefd[1];
- inout -> pipes[i][j] = pipeMem;
- fprintf(pipesLog, log_opened_fmt, pipefd[0], pipefd[1]);
- fflush(pipesLog);
- printf(log_opened_fmt, pipefd[0], pipefd[1]);
- }
- }
- }
- Message *msgReceive = (Message*)malloc(sizeof(Message));
- // create children
- for (int id = 1; id < inout -> numOfChildren + 1; id++){
- switch(fork()){
- case -1:
- perror("fork");
- exit(1);
- case 0:
- fprintf(eventsLog, log_started_fmt, id, getpid(), getppid());
- fflush(eventsLog);
- printf(log_started_fmt, id, getpid(), getppid());
- inout -> id = id;
- // close unused pipes
- for (int i = 0; i < inout -> numOfChildren + 1; i++){
- for (int j = 0; j < inout -> numOfChildren + 1; j++){
- if (i != j){
- if (i != inout -> id && j != inout -> id){
- close(inout -> pipes[i][j] -> in);
- fprintf(pipesLog, log_closed_fmt, inout -> pipes[i][j] -> in, inout -> id);
- fflush(pipesLog);
- printf(log_closed_fmt, inout -> pipes[i][j] -> in, inout -> id);
- close(inout -> pipes[i][j] -> out);
- fprintf(pipesLog, log_closed_fmt, inout -> pipes[i][j] -> out, inout -> id);
- fflush(pipesLog);
- printf(log_closed_fmt, inout -> pipes[i][j] -> out, inout -> id);
- }
- if (i == inout -> id){
- close(inout -> pipes[i][j] -> in);
- fprintf(pipesLog, log_closed_fmt, inout -> pipes[i][j] -> in, inout -> id);
- fflush(pipesLog);
- printf(log_closed_fmt, inout -> pipes[i][j] -> in, inout -> id);
- }
- if (j == inout -> id){
- close(inout -> pipes[i][j] -> out);
- fprintf(pipesLog, log_closed_fmt, inout -> pipes[i][j] -> out, inout -> id);
- fflush(pipesLog);
- printf(log_closed_fmt, inout -> pipes[i][j] -> out, inout -> id);
- }
- }
- }
- }
- // fill the STARTED message
- Message *msg = (Message*)malloc(sizeof(Message));
- sprintf(msg -> s_payload, log_started_fmt, id, getpid(), getppid());
- msg -> s_header.s_magic = MESSAGE_MAGIC;
- msg -> s_header.s_payload_len = strlen(msg -> s_payload)+1;
- msg -> s_header.s_type = STARTED;
- msg -> s_header.s_local_time = time(NULL);
- // send STARTED message to other processes
- send_multicast(inout, msg);
- // receive STARTED message from other processes
- for (int i = 1; i < inout -> numOfChildren + 1; i++){
- if (i != inout -> id){
- do{
- receive(inout, i, msgReceive);
- } while (msgReceive -> s_header.s_type != STARTED);
- }
- }
- // logs
- fprintf(eventsLog, log_received_all_started_fmt, inout -> id);
- fflush(eventsLog);
- printf(log_received_all_started_fmt, inout -> id);
- // fill the DONE message
- sprintf(msg -> s_payload, log_done_fmt, inout -> id);
- msg -> s_header.s_magic = MESSAGE_MAGIC;
- msg -> s_header.s_payload_len = strlen(msg -> s_payload)+1;
- msg -> s_header.s_type = DONE;
- msg -> s_header.s_local_time = time(NULL);
- //msg -> s_header = *head;
- // send STARTED message to other processes
- send_multicast(inout, msg);
- // receive STARTED message from other processes
- for (int i = 1; i < inout -> numOfChildren + 1; i++){
- if (i != inout -> id){
- do{
- receive(inout, i, msgReceive);
- } while (msgReceive -> s_header.s_type != DONE);
- }
- }
- // logs
- fprintf(eventsLog, log_received_all_done_fmt, inout -> id);
- fflush(eventsLog);
- printf(log_received_all_done_fmt, inout -> id);
- // close all other pipes
- for (int i = 0; i < inout -> numOfChildren + 1; i++){
- for (int j = 0; j < inout -> numOfChildren + 1; j++){
- if (i != j){
- if (i == inout -> id){
- close(inout -> pipes[i][j] -> out);
- fprintf(pipesLog, log_closed_fmt, inout -> pipes[i][j] -> out, inout -> id);
- fflush(pipesLog);
- printf(log_closed_fmt, inout -> pipes[i][j] -> out, inout -> id);
- }
- if (j == inout -> id){
- close(inout -> pipes[i][j] -> in);
- fprintf(pipesLog, log_closed_fmt, inout -> pipes[i][j] -> in, inout -> id);
- fflush(pipesLog);
- printf(log_closed_fmt, inout -> pipes[i][j] -> in, inout -> id);
- }
- }
- }
- }
- for (int i = 0; i < inout -> numOfChildren + 1; i++){
- for (int j = 0; j < inout -> numOfChildren + 1; j++){
- if (i != j) free(inout -> pipes[i][j]);
- }
- }
- free(msg);
- fprintf(eventsLog, log_done_fmt, inout -> id);
- fflush(eventsLog);
- printf(log_done_fmt, inout -> id);
- return 0;
- default:
- break;
- }
- }
- for (int i = 0; i < inout -> numOfChildren + 1; i++){
- for (int j = 0; j < inout -> numOfChildren + 1; j++){
- if (i != j){
- if (i != PARENT_ID && j != PARENT_ID){
- close(inout -> pipes[i][j] -> in);
- fprintf(pipesLog, log_closed_fmt, inout -> pipes[i][j] -> in, inout -> id);
- fflush(pipesLog);
- printf(log_closed_fmt, inout -> pipes[i][j] -> in, inout -> id);
- close(inout -> pipes[i][j] -> out);
- fprintf(pipesLog, log_closed_fmt, inout -> pipes[i][j] -> out, inout -> id);
- fflush(pipesLog);
- printf(log_closed_fmt, inout -> pipes[i][j] -> out, inout -> id);
- }
- if (i == PARENT_ID){
- close(inout -> pipes[i][j] -> in);
- fprintf(pipesLog, log_closed_fmt, inout -> pipes[i][j] -> in, inout -> id);
- fflush(pipesLog);
- printf(log_closed_fmt, inout -> pipes[i][j] -> in, inout -> id);
- }
- if (j == PARENT_ID){
- close(inout -> pipes[i][j] -> out);
- fprintf(pipesLog, log_closed_fmt, inout -> pipes[i][j] -> out, inout -> id);
- fflush(pipesLog);
- printf(log_closed_fmt, inout -> pipes[i][j] -> out, inout -> id);
- }
- }
- }
- }
- for (int i = 1; i < inout -> numOfChildren + 1; i++){
- if (i != inout -> id){
- do{
- receive(inout, i, msgReceive);
- } while (msgReceive -> s_header.s_type != STARTED);
- }
- }
- fprintf(eventsLog, log_received_all_started_fmt, inout -> id);
- fflush(eventsLog);
- printf(log_received_all_started_fmt, inout -> id);
- for (int i = 1; i < inout -> numOfChildren + 1; i++){
- if (i != inout -> id){
- do{
- receive(inout, i, msgReceive);
- } while (msgReceive -> s_header.s_type != DONE);
- }
- }
- fprintf(eventsLog, log_received_all_done_fmt, inout -> id);
- fflush(eventsLog);
- printf(log_received_all_done_fmt, inout -> id);
- for (int i = 0; i < inout -> numOfChildren + 1; i++){
- for (int j = 0; j < inout -> numOfChildren + 1; j++){
- if (i != j){
- if (i == PARENT_ID){
- close(inout -> pipes[i][j] -> out);
- fprintf(pipesLog, log_closed_fmt, inout -> pipes[i][j] -> out, inout -> id);
- fflush(pipesLog);
- printf(log_closed_fmt, inout -> pipes[i][j] -> out, inout -> id);
- }
- if (j == PARENT_ID){
- close(inout -> pipes[i][j] -> in);
- fprintf(pipesLog, log_closed_fmt, inout -> pipes[i][j] -> in, inout -> id);
- fflush(pipesLog);
- printf(log_closed_fmt, inout -> pipes[i][j] -> in, inout -> id);
- }
- }
- }
- }
- for (int i = 0; i < inout -> numOfChildren; i++)
- wait(NULL);
- free(inout);
- free(msgReceive);
- fclose(pipesLog);
- fclose(eventsLog);
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement