Advertisement
Guest User

Untitled

a guest
May 19th, 2017
59
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.34 KB | None | 0 0
  1. //
  2. // main.c
  3. // distr
  4. //
  5. // Created by Алексей Пулич on 18.05.17.
  6. // Copyright © 2017 Алексей Пулич. All rights reserved.
  7. //
  8.  
  #include <errno.h>
  #include <limits.h>
  #include <poll.h>
  #include <stdio.h>
  #include <stdlib.h>
  #include <string.h>
  #include <sys/types.h>
  #include <sys/wait.h>
  #include <time.h>
  #include <unistd.h>
  //#include <sys/stat.h>
  //#include <fcntl.h>

  #include "common.h"
  #include "ipc.h"
  #include "pa1.h"
  22.  
  /*
   * The two descriptors of one unidirectional pipe, in the order pipe(2)
   * hands them out: element 0 is stored in `read`, element 1 in `write`.
   */
  typedef struct {
      int read;   /* descriptor the receiving side reads from */
      int write;  /* descriptor the sending side writes to */
  } pipe_ends;
  27.  
  28. typedef struct {
  29. int id;
  30. pid_t pid;
  31. pipe_ends *pipes;
  32. } proc;
  33.  
  34. proc *procs;
  35. int procId;
  36. int proc_cnt;
  37.  
  38. pid_t main_proc_pid;
  39.  
  40. FILE *p_events_log;
  41. FILE *p_pipes_log;
  42.  
  /* Forward declarations for the helpers defined further down in this file. */

  /* Whole life cycle of one child process. */
  void proc_work(int id);

  /* Opens the event and pipe log files. */
  void prepare_logs();

  /* STARTED phase: log, broadcast STARTED, wait for every other child. */
  void start_work(int id);

  /* DONE phase: log, broadcast DONE, wait for every other child. */
  void done_work(int id);

  /* Blocks until each child other than `id` has sent a message of type `status`. */
  void wait_all_procs_status(int id, int status);

  /* Closes all pipes, releases the process table and closes the logs. */
  void exit_work();

  /* Closes the pipe ends that process `id` does not use. */
  void close_pipes(int id);
  56.  
  57. //////////////////
  58. //send - receive//
  59. //////////////////
  60.  
  61. int receive(void *self, local_id from, Message *msg) {
  62. proc *proc_tmp = (proc *) self;
  63. if (read(procs[from].pipes[proc_tmp->id].read, &msg->s_header, sizeof(MessageHeader)) == -1) {
  64. printf("failed to receive data");
  65. return 2;
  66. }
  67. if (read(procs[from].pipes[proc_tmp->id].read, msg->s_payload, msg->s_header.s_payload_len) == -1) {
  68. printf("failed to receive data");
  69. return 2;
  70. }
  71. return 0;
  72. }
  73.  
  74. int receive_any(void *self, Message *msg) {
  75. proc *proc_tmp = (proc *) self;
  76. for (int i = 0; i < proc_cnt; i++) {
  77. if (proc_tmp->id == i) continue;
  78.  
  79. if (read(procs[i].pipes[proc_tmp->id].read, &msg->s_header, sizeof(MessageHeader)) == -1) {
  80. printf("failed to receive data");
  81. return 2;
  82. }
  83. if (read(procs[i].pipes[proc_tmp->id].read, msg->s_payload, msg->s_header.s_payload_len) == -1) {
  84. printf("failed to receive data");
  85. return 2;
  86. }
  87. break;
  88. }
  89. return 0;
  90. }
  91.  
  92.  
  93. int send(void *self, local_id dst, const Message *msg) {
  94. proc *proc_tmp = (proc *) self;
  95. if (write(proc_tmp->pipes[dst].write, msg, msg->s_header.s_payload_len + sizeof(MessageHeader)) !=
  96. msg->s_header.s_payload_len + sizeof(MessageHeader)) {
  97. printf("failed to send data");
  98. return 3;
  99. }
  100.  
  101. return 0;
  102. }
  103.  
  104. int send_multicast(void *self, const Message *msg) {
  105. proc *proc_tmp = (proc *) self;
  106.  
  107. for (int i = 0; i < proc_cnt; i++) {
  108.  
  109. if (i == proc_tmp->id)
  110. continue;
  111.  
  112. if (write(proc_tmp->pipes[i].write, msg, msg->s_header.s_payload_len + sizeof(MessageHeader)) !=
  113. msg->s_header.s_payload_len + sizeof(MessageHeader)) {
  114. printf("failed to send data");
  115. return 3;
  116. }
  117.  
  118. // printf("SEND MULTICAST MESSAGE <%s> (FROM %d TO %d)\n", msg->s_payload, proc_tmp->id, i);
  119. }
  120.  
  121. return 0;
  122. }
  123.  
  124. //////////////////////
  125. //END send - receive//
  126. //////////////////////
  127.  
  128. void prepare_logs() {
  129. p_events_log = fopen(events_log, "w+");
  130. p_pipes_log = fopen(pipes_log, "w+");
  131. }
  132.  
  133. void wait_all_procs_status(int id, int status) {
  134. Message msg;
  135. for (int i = 1; i < proc_cnt; i++) {
  136. if (i == id) continue;
  137. do {
  138. receive(&procs[id], i, &msg);
  139. } while (msg.s_header.s_type != status);
  140. // printf("RECEIVE MSG <%s> (FROM %d to %d)\n", msg.s_payload, i, id);
  141. }
  142. }
  143.  
  144. void close_pipes(int id) {
  145. for (int i = 1; i < proc_cnt; i++) {
  146. for (int j = 0; j < proc_cnt; j++) {
  147. int tmp_read = procs[i].pipes[j].read;
  148. int tmp_write = procs[i].pipes[j].write;
  149. if(i==j)
  150. continue;
  151. if (i == id) {
  152. close(procs[i].pipes[j].read);
  153. fprintf(p_pipes_log, "CLOSED READ-end (%d) pipe FOR procs %d -> %d\n", tmp_read, i, j);
  154. }
  155. else if (j == id) {
  156. close(procs[i].pipes[j].write);
  157. fprintf(p_pipes_log, "CLOSED WRITE-end (%d) pipe FOR procs %d -> %d\n", tmp_write, i, j);
  158. }
  159. else {
  160. if(j==procs[i].id) continue;
  161. close(procs[i].pipes[j].read);
  162. close(procs[i].pipes[j].write);
  163. fprintf(p_pipes_log, "CLOSED WRITE-end (%d) pipe FOR procs %d -> %d\n", tmp_write, i, j);
  164. fprintf(p_pipes_log, "CLOSED READ-end (%d) pipe FOR procs %d -> %d\n", tmp_read, i, j);
  165. }
  166. }
  167. }
  168. }
  169.  
  /*
   * Life cycle of one child process, identified by its local id:
   * drop unused pipe ends, run the STARTED phase, do the (currently empty)
   * useful work, then run the DONE phase.
   */
  void proc_work(int id) {
      close_pipes(id);  /* keep only this child's own pipe ends */
      start_work(id);   /* log + broadcast STARTED, wait for the other children */

      /* Useful work would go here. */

      done_work(id);    /* log + broadcast DONE, wait for the other children */
  }
  180.  
  181. void start_work(int id) {
  182. fprintf(p_events_log, log_started_fmt, id, getpid(), getppid());
  183. fflush(p_events_log);
  184. printf(log_started_fmt, id, getpid(), getppid());
  185.  
  186.  
  187. Message msg;
  188. sprintf(msg.s_payload, log_started_fmt, id, getpid(), getppid());
  189. msg.s_header.s_magic = MESSAGE_MAGIC;
  190. msg.s_header.s_local_time = time(NULL);
  191. msg.s_header.s_type = STARTED;
  192. msg.s_header.s_payload_len = strlen(msg.s_payload);
  193.  
  194. send_multicast(&procs[id], &msg);
  195. wait_all_procs_status(id, STARTED);
  196. fprintf(p_events_log, log_received_all_started_fmt, id);
  197. fflush(p_events_log);
  198. printf(log_received_all_started_fmt, id);
  199. }
  200.  
  201. void done_work(int id) {
  202. fprintf(p_events_log, log_done_fmt, id);
  203. fflush(p_events_log);
  204. printf(log_done_fmt, id);
  205.  
  206. Message msg;
  207. sprintf(msg.s_payload, log_done_fmt, id);
  208. msg.s_header.s_magic = MESSAGE_MAGIC;
  209. msg.s_header.s_local_time = time(NULL);
  210. msg.s_header.s_type = DONE;
  211. msg.s_header.s_payload_len = strlen(msg.s_payload);
  212.  
  213. send_multicast(&procs[id], &msg);
  214. wait_all_procs_status(id, DONE);
  215. fprintf(p_events_log, log_received_all_done_fmt, id);
  216. fflush(p_events_log);
  217. printf(log_received_all_done_fmt, id);
  218. }
  219.  
  220. void exit_work() {
  221. for (int i = 0; i < proc_cnt; i++) {
  222. for (int j = 0; j < proc_cnt; j++) {
  223. if (i == j) continue;
  224. int tmp_read = procs[i].pipes[j].read;
  225. int tmp_write = procs[i].pipes[j].write;
  226. close(procs[i].pipes[j].read);
  227. fprintf(p_pipes_log, "CLOSED READ-end (%d) pipe FOR procs %d -> %d\n", tmp_read, i, j);
  228. fflush(p_pipes_log);
  229. close(procs[i].pipes[j].write);
  230. fprintf(p_pipes_log, "CLOSED WRITE-end (%d) pipe FOR procs %d -> %d\n", tmp_write, i, j);
  231. fflush(p_pipes_log);
  232. }
  233. }
  234. free(procs);
  235. fclose(p_events_log);
  236. fclose(p_pipes_log);
  237. }
  238.  
  239. int main(int argc, const char *argv[]) {
  240.  
  241. if (argc < 3 || strcmp(argv[1], "-p") != 0) {
  242. printf("use key -p <N> to set the number of child processes");
  243. return 1;
  244. }
  245.  
  246. main_proc_pid = getpid();
  247.  
  248. proc_cnt = atoi(argv[2]) + 1;
  249.  
  250. procs = (proc *) malloc(sizeof(proc) * proc_cnt);
  251.  
  252. prepare_logs();
  253.  
  254. for (int i = 0; i < proc_cnt; i++) {
  255. proc tmp_proc;
  256. tmp_proc.pipes = (pipe_ends *) malloc(sizeof(pipe_ends) * proc_cnt);
  257.  
  258. for (int j = 0; j < proc_cnt; j++) {
  259. if (i == j) continue;
  260. int tmp_pipe[2];
  261. pipe(tmp_pipe);
  262. tmp_proc.pipes[j].read = tmp_pipe[0];
  263. tmp_proc.pipes[j].write = tmp_pipe[1];
  264. fprintf(p_pipes_log, "CREATED pipe FOR procs %d -> %d (READ: %d WRITE: %d)\n", i, j, tmp_proc.pipes[j].read,
  265. tmp_proc.pipes[j].write);
  266. fflush(p_pipes_log);
  267. }
  268. procs[i] = tmp_proc;
  269. }
  270.  
  271. procs[0].pid = main_proc_pid;
  272. procs[0].id = PARENT_ID;
  273. pid_t pid = getppid();
  274. for (int i = 1; i < proc_cnt; i++) {
  275. if ((pid = fork()) == 0) {
  276. procs[i].pid = getpid();
  277. procs[i].id = i;
  278. proc_work(i);
  279. break;
  280. }
  281. }
  282. if (pid != 0) {
  283. wait_all_procs_status(PARENT_ID, STARTED);
  284. fprintf(p_events_log, log_received_all_started_fmt, PARENT_ID);
  285. fflush(p_events_log);
  286. printf(log_received_all_started_fmt, PARENT_ID);
  287.  
  288. wait_all_procs_status(PARENT_ID, DONE);
  289. fprintf(p_events_log, log_received_all_done_fmt, PARENT_ID);
  290. fflush(p_events_log);
  291. printf(log_received_all_done_fmt, PARENT_ID);
  292.  
  293. while (--proc_cnt)
  294. wait(NULL);
  295. // printf("All childs terminated!\n");
  296. exit_work();
  297. }
  298.  
  299. return 0;
  300. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement