Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- //
- // main.c
- // distr
- //
- // Created by Алексей Пулич on 18.05.17.
- // Copyright © 2017 Алексей Пулич. All rights reserved.
- //
- #include <stdio.h>
- #include <string.h>
- #include <stdlib.h>
- #include <unistd.h>
- #include <time.h>
- #include <sys/types.h>
- #include <sys/wait.h>
- //#include <sys/stat.h>
- //#include <fcntl.h>
- #include "common.h"
- #include "ipc.h"
- #include "pa1.h"
- typedef struct {
- int read;
- int write;
- } pipe_ends;
- typedef struct {
- int id;
- pid_t pid;
- pipe_ends *pipes;
- } proc;
- proc *procs;
- int procId;
- int proc_cnt;
- pid_t main_proc_pid;
- FILE *p_events_log;
- FILE *p_pipes_log;
- void proc_work(int id);
- void prepare_logs();
- void start_work(int id);
- void done_work(int id);
- void wait_all_procs_status(int id, int status);
- void exit_work();
- void close_pipes(int id);
- //////////////////
- //send - receive//
- //////////////////
- int receive(void *self, local_id from, Message *msg) {
- proc *proc_tmp = (proc *) self;
- if (read(procs[from].pipes[proc_tmp->id].read, &msg->s_header, sizeof(MessageHeader)) == -1) {
- printf("failed to receive data");
- return 2;
- }
- if (read(procs[from].pipes[proc_tmp->id].read, msg->s_payload, msg->s_header.s_payload_len) == -1) {
- printf("failed to receive data");
- return 2;
- }
- return 0;
- }
- int receive_any(void *self, Message *msg) {
- proc *proc_tmp = (proc *) self;
- for (int i = 0; i < proc_cnt; i++) {
- if (proc_tmp->id == i) continue;
- if (read(procs[i].pipes[proc_tmp->id].read, &msg->s_header, sizeof(MessageHeader)) == -1) {
- printf("failed to receive data");
- return 2;
- }
- if (read(procs[i].pipes[proc_tmp->id].read, msg->s_payload, msg->s_header.s_payload_len) == -1) {
- printf("failed to receive data");
- return 2;
- }
- break;
- }
- return 0;
- }
- int send(void *self, local_id dst, const Message *msg) {
- proc *proc_tmp = (proc *) self;
- if (write(proc_tmp->pipes[dst].write, msg, msg->s_header.s_payload_len + sizeof(MessageHeader)) !=
- msg->s_header.s_payload_len + sizeof(MessageHeader)) {
- printf("failed to send data");
- return 3;
- }
- return 0;
- }
- int send_multicast(void *self, const Message *msg) {
- proc *proc_tmp = (proc *) self;
- for (int i = 0; i < proc_cnt; i++) {
- if (i == proc_tmp->id)
- continue;
- if (write(proc_tmp->pipes[i].write, msg, msg->s_header.s_payload_len + sizeof(MessageHeader)) !=
- msg->s_header.s_payload_len + sizeof(MessageHeader)) {
- printf("failed to send data");
- return 3;
- }
- // printf("SEND MULTICAST MESSAGE <%s> (FROM %d TO %d)\n", msg->s_payload, proc_tmp->id, i);
- }
- return 0;
- }
- //////////////////////
- //END send - receive//
- //////////////////////
- void prepare_logs() {
- p_events_log = fopen(events_log, "w+");
- p_pipes_log = fopen(pipes_log, "w+");
- }
- void wait_all_procs_status(int id, int status) {
- Message msg;
- for (int i = 1; i < proc_cnt; i++) {
- if (i == id) continue;
- do {
- receive(&procs[id], i, &msg);
- } while (msg.s_header.s_type != status);
- // printf("RECEIVE MSG <%s> (FROM %d to %d)\n", msg.s_payload, i, id);
- }
- }
- void close_pipes(int id) {
- for (int i = 1; i < proc_cnt; i++) {
- for (int j = 0; j < proc_cnt; j++) {
- int tmp_read = procs[i].pipes[j].read;
- int tmp_write = procs[i].pipes[j].write;
- if(i==j)
- continue;
- if (i == id) {
- close(procs[i].pipes[j].read);
- fprintf(p_pipes_log, "CLOSED READ-end (%d) pipe FOR procs %d -> %d\n", tmp_read, i, j);
- }
- else if (j == id) {
- close(procs[i].pipes[j].write);
- fprintf(p_pipes_log, "CLOSED WRITE-end (%d) pipe FOR procs %d -> %d\n", tmp_write, i, j);
- }
- else {
- if(j==procs[i].id) continue;
- close(procs[i].pipes[j].read);
- close(procs[i].pipes[j].write);
- fprintf(p_pipes_log, "CLOSED WRITE-end (%d) pipe FOR procs %d -> %d\n", tmp_write, i, j);
- fprintf(p_pipes_log, "CLOSED READ-end (%d) pipe FOR procs %d -> %d\n", tmp_read, i, j);
- }
- }
- }
- }
- //child-process life-cycle
- void proc_work(int id) {
- close_pipes(id);
- start_work(id);
- //Useful work
- done_work(id);
- }
- void start_work(int id) {
- fprintf(p_events_log, log_started_fmt, id, getpid(), getppid());
- fflush(p_events_log);
- printf(log_started_fmt, id, getpid(), getppid());
- Message msg;
- sprintf(msg.s_payload, log_started_fmt, id, getpid(), getppid());
- msg.s_header.s_magic = MESSAGE_MAGIC;
- msg.s_header.s_local_time = time(NULL);
- msg.s_header.s_type = STARTED;
- msg.s_header.s_payload_len = strlen(msg.s_payload);
- send_multicast(&procs[id], &msg);
- wait_all_procs_status(id, STARTED);
- fprintf(p_events_log, log_received_all_started_fmt, id);
- fflush(p_events_log);
- printf(log_received_all_started_fmt, id);
- }
- void done_work(int id) {
- fprintf(p_events_log, log_done_fmt, id);
- fflush(p_events_log);
- printf(log_done_fmt, id);
- Message msg;
- sprintf(msg.s_payload, log_done_fmt, id);
- msg.s_header.s_magic = MESSAGE_MAGIC;
- msg.s_header.s_local_time = time(NULL);
- msg.s_header.s_type = DONE;
- msg.s_header.s_payload_len = strlen(msg.s_payload);
- send_multicast(&procs[id], &msg);
- wait_all_procs_status(id, DONE);
- fprintf(p_events_log, log_received_all_done_fmt, id);
- fflush(p_events_log);
- printf(log_received_all_done_fmt, id);
- }
- void exit_work() {
- for (int i = 0; i < proc_cnt; i++) {
- for (int j = 0; j < proc_cnt; j++) {
- if (i == j) continue;
- int tmp_read = procs[i].pipes[j].read;
- int tmp_write = procs[i].pipes[j].write;
- close(procs[i].pipes[j].read);
- fprintf(p_pipes_log, "CLOSED READ-end (%d) pipe FOR procs %d -> %d\n", tmp_read, i, j);
- fflush(p_pipes_log);
- close(procs[i].pipes[j].write);
- fprintf(p_pipes_log, "CLOSED WRITE-end (%d) pipe FOR procs %d -> %d\n", tmp_write, i, j);
- fflush(p_pipes_log);
- }
- }
- free(procs);
- fclose(p_events_log);
- fclose(p_pipes_log);
- }
- int main(int argc, const char *argv[]) {
- if (argc < 3 || strcmp(argv[1], "-p") != 0) {
- printf("use key -p <N> to set the number of child processes");
- return 1;
- }
- main_proc_pid = getpid();
- proc_cnt = atoi(argv[2]) + 1;
- procs = (proc *) malloc(sizeof(proc) * proc_cnt);
- prepare_logs();
- for (int i = 0; i < proc_cnt; i++) {
- proc tmp_proc;
- tmp_proc.pipes = (pipe_ends *) malloc(sizeof(pipe_ends) * proc_cnt);
- for (int j = 0; j < proc_cnt; j++) {
- if (i == j) continue;
- int tmp_pipe[2];
- pipe(tmp_pipe);
- tmp_proc.pipes[j].read = tmp_pipe[0];
- tmp_proc.pipes[j].write = tmp_pipe[1];
- fprintf(p_pipes_log, "CREATED pipe FOR procs %d -> %d (READ: %d WRITE: %d)\n", i, j, tmp_proc.pipes[j].read,
- tmp_proc.pipes[j].write);
- fflush(p_pipes_log);
- }
- procs[i] = tmp_proc;
- }
- procs[0].pid = main_proc_pid;
- procs[0].id = PARENT_ID;
- pid_t pid = getppid();
- for (int i = 1; i < proc_cnt; i++) {
- if ((pid = fork()) == 0) {
- procs[i].pid = getpid();
- procs[i].id = i;
- proc_work(i);
- break;
- }
- }
- if (pid != 0) {
- wait_all_procs_status(PARENT_ID, STARTED);
- fprintf(p_events_log, log_received_all_started_fmt, PARENT_ID);
- fflush(p_events_log);
- printf(log_received_all_started_fmt, PARENT_ID);
- wait_all_procs_status(PARENT_ID, DONE);
- fprintf(p_events_log, log_received_all_done_fmt, PARENT_ID);
- fflush(p_events_log);
- printf(log_received_all_done_fmt, PARENT_ID);
- while (--proc_cnt)
- wait(NULL);
- // printf("All childs terminated!\n");
- exit_work();
- }
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement