Falexom

Untitled

Oct 14th, 2025
14
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.56 KB | None | 0 0
#include <errno.h>
#include <fcntl.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "ipc.h"
#include "utils/logger.c"
#include "pa2345.h"
  10.  
  11. #define max(a, b) ((a) > (b) ? (a) : (b))
  12.  
  13. Message build_message(int msg_type, char *payload, timestamp_t lamport_time) {
  14. MessageHeader msg_h;
  15. msg_h.s_magic = MESSAGE_MAGIC;
  16.  
  17. size_t L = strlen(payload);
  18. if (L > MAX_PAYLOAD_LEN) L = MAX_PAYLOAD_LEN;
  19. msg_h.s_payload_len = (uint16_t)L;
  20.  
  21. msg_h.s_type = msg_type;
  22. msg_h.s_local_time = lamport_time;
  23.  
  24. Message msg;
  25. msg.s_header = msg_h;
  26. if (L) memcpy(msg.s_payload, payload, L);
  27. if (L < MAX_PAYLOAD_LEN) msg.s_payload[L] = '\0';
  28. return msg;
  29. }
  30.  
  31. int send(void * self, local_id dst, const Message * msg) {
  32. struct IPCContext *ctx = self;
  33. ctx->lamport_time++;
  34. Message msg_to_send = *msg;
  35. msg_to_send.s_header.s_local_time = ctx->lamport_time;
  36. msg_to_send.s_header.s_sender = ctx->id;
  37.  
  38. size_t len = sizeof(MessageHeader) + msg_to_send.s_header.s_payload_len;
  39. ssize_t res = write(pipes[ctx->id][dst][1], &msg_to_send, len);
  40.  
  41. if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
  42. return -1;
  43. }
  44. return (res == -1) ? -1 : 0;
  45. }
  46.  
  47. int receive(void * self, local_id from, Message * msg) {
  48. struct IPCContext *ctx = self;
  49. ssize_t res = read(pipes[from][ctx->id][0], msg, sizeof(Message));
  50. if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
  51. return -1;
  52. }
  53. if (res <= 0) return -1;
  54.  
  55. ctx->lamport_time = max(ctx->lamport_time, msg->s_header.s_local_time) + 1;
  56. return 0;
  57. }
  58.  
  59. int send_multicast(void * self, const Message * msg) {
  60. struct IPCContext *ctx = self;
  61. int result = 0;
  62. for (int i = 0; i < ctx->proc_count; i++) {
  63. if (i == ctx->id) continue;
  64. if (send(self, i, msg) != 0) result = -1;
  65. }
  66. return result;
  67. }
  68.  
  69. int receive_any(void * self, Message * msg) {
  70. struct IPCContext *ctx = self;
  71. for (int i = 0; i < ctx->proc_count; i++) {
  72. if (i == ctx->id) continue;
  73. if (receive(self, i, msg) == 0) {
  74. return 0;
  75. }
  76. }
  77. return -1;
  78. }
  79.  
  80. int handle_request(void * self, Message * msg) {
  81. struct IPCContext *ctx = self;
  82. int sender = msg->s_header.s_sender;
  83. timestamp_t their_time = msg->s_header.s_local_time;
  84. timestamp_t our_time = ctx->request_time;
  85.  
  86. if (ctx->in_cs ||
  87. (ctx->requesting &&
  88. ((our_time < their_time) ||
  89. (our_time == their_time && ctx->id < sender)))) {
  90.  
  91. ctx->deferred[sender] = true;
  92. } else {
  93. Message msg_reply_cs = build_message(CS_REPLY, "", ctx->lamport_time);
  94. send(ctx, sender, &msg_reply_cs);
  95. }
  96. return 0;
  97. }
  98.  
  99. int handle_reply(void * self, Message * msg) {
  100. struct IPCContext *ctx = self;
  101. int sender = msg->s_header.s_sender;
  102. ctx->replied[sender] = true;
  103. return 0;
  104. }
  105.  
  106. bool all_replied(void * self) {
  107. struct IPCContext *ctx = self;
  108. for (int i = 0; i < ctx->proc_count; i++) {
  109. if (i == ctx->id) continue;
  110. if (!ctx->replied[i]) return false;
  111. }
  112. return true;
  113. }
  114.  
  115. int request_cs(const void * self) {
  116. struct IPCContext *ctx = (struct IPCContext *)self;
  117. ctx->lamport_time++;
  118. ctx->requesting = true;
  119.  
  120. for (int i = 0; i < ctx->proc_count; i++) {
  121. ctx->replied[i] = (i == ctx->id);
  122. }
  123.  
  124. ctx->request_time = ctx->lamport_time;
  125. Message msg_request_cs = build_message(CS_REQUEST, "", ctx->lamport_time);
  126. send_multicast(ctx, &msg_request_cs);
  127. return 0;
  128. }
  129.  
  130. int release_cs(const void * self) {
  131. struct IPCContext *ctx = (struct IPCContext *)self;
  132.  
  133. ctx->lamport_time++;
  134. ctx->in_cs = false;
  135. ctx->requesting = false;
  136.  
  137. for (int i = 0; i < ctx->proc_count; i++) {
  138. if (ctx->deferred[i]) {
  139. Message msg_reply_cs = build_message(CS_REPLY, "", ctx->lamport_time);
  140. send(ctx, i, &msg_reply_cs);
  141. ctx->deferred[i] = false;
  142. }
  143. ctx->replied[i] = false;
  144. }
  145. return 0;
  146. }
  147.  
  148. void sync_started(IPCContext *ctx) {
  149. Message msg_started = build_message(STARTED, "", ctx->lamport_time);
  150. send_multicast(ctx, &msg_started);
  151.  
  152. while (ctx->started_count < ctx->proc_count - 1) {
  153. Message msg;
  154. if (receive_any(ctx, &msg) == 0 && msg.s_header.s_type == STARTED) {
  155. ctx->started_count++;
  156. }
  157. }
  158.  
  159. ctx->state = 0;
  160. receive_log(ctx->state, ctx->id, ctx->parent_id);
  161. }
  162.  
  163. void sync_done(IPCContext *ctx) {
  164. Message msg_done = build_message(DONE, "", ctx->lamport_time);
  165. send_multicast(ctx, &msg_done);
  166.  
  167. while (ctx->done_count < ctx->proc_count - 1) {
  168. Message msg;
  169. if (receive_any(ctx, &msg) == 0 && msg.s_header.s_type == DONE) {
  170. ctx->done_count++;
  171. }
  172. }
  173.  
  174. ctx->state = 1;
  175. receive_log(ctx->state, ctx->id, ctx->parent_id);
  176. }
  177.  
  178. int child_work(void * self, bool enable_mutexl) {
  179. struct IPCContext *ctx = self;
  180. receive_log(ctx->state, ctx->id, ctx->parent_id);
  181. Message msg;
  182.  
  183. sync_started(ctx);
  184.  
  185. int N = (ctx->id + 1) * 5; // фикс, чтобы при id=0 было >0 итераций
  186. int done_iterations = 0;
  187.  
  188. while (done_iterations < N) {
  189. if (receive_any(ctx, &msg) == 0) {
  190. switch (msg.s_header.s_type) {
  191. case CS_REQUEST: handle_request(ctx, &msg); break;
  192. case CS_REPLY: handle_reply(ctx, &msg); break;
  193. case DONE: ctx->done_count++; break;
  194. default: break;
  195. }
  196. }
  197.  
  198. if (enable_mutexl) {
  199. if (!ctx->requesting && !ctx->in_cs) {
  200. request_cs(ctx);
  201. }
  202. if (ctx->requesting && all_replied(ctx)) {
  203. ctx->in_cs = true;
  204. char logmsg[128];
  205. snprintf(logmsg, sizeof(logmsg),
  206. "Process %d (Lamport=%d) entered critical section\n",
  207. ctx->id, ctx->lamport_time);
  208. print(logmsg);
  209. release_cs(ctx);
  210. done_iterations++;
  211. }
  212. } else {
  213. char logmsg[128];
  214. snprintf(logmsg, sizeof(logmsg),
  215. "Process %d (Lamport=%d) entered critical section\n",
  216. ctx->id, ctx->lamport_time);
  217. print(logmsg);
  218. done_iterations++;
  219. }
  220. }
  221.  
  222. sync_done(ctx);
  223. ctx->state = 3;
  224. receive_log(ctx->state, ctx->id, ctx->parent_id);
  225. return 0;
  226. }
  227.  
  228. int ipc_init(int proc_count, bool enable_mutexl) {
  229. if (proc_count > MAX_PROCESS_ID) {
  230. return -1;
  231. }
  232.  
  233. // создаем пайпы между всеми парами (0-based id)
  234. for (int i = 0; i < proc_count; i++) {
  235. for (int j = 0; j < proc_count; j++) {
  236. if (i == j) continue;
  237. if (pipe(pipes[i][j]) == -1) {
  238. return -1;
  239. }
  240. fcntl(pipes[i][j][0], F_SETFL, O_NONBLOCK);
  241. fcntl(pipes[i][j][1], F_SETFL, O_NONBLOCK);
  242. }
  243. }
  244.  
  245. for (int i = 0; i < proc_count; i++) {
  246. pid_t new_process = fork();
  247. if (new_process == 0) {
  248. local_id self_id = i; // фикс: 0-based ID
  249.  
  250. // закрыть лишние концы пайпов
  251. for (int from = 0; from < proc_count; from++) {
  252. for (int to = 0; to < proc_count; to++) {
  253. if (from == to) continue;
  254.  
  255. if (from == self_id) {
  256. close(pipes[from][to][0]); // оставляем write-end
  257. } else if (to == self_id) {
  258. close(pipes[from][to][1]); // оставляем read-end
  259. } else {
  260. close(pipes[from][to][0]);
  261. close(pipes[from][to][1]);
  262. }
  263. }
  264. }
  265.  
  266. IPCContext ctx;
  267. ctx.state = 0;
  268. ctx.proc_count = proc_count; // только дети 0..proc_count-1
  269. ctx.id = self_id;
  270. ctx.parent_id = 0;
  271. ctx.lamport_time = 0;
  272. ctx.started_count = 0;
  273. ctx.done_count = 0;
  274. ctx.requesting = false;
  275. ctx.in_cs = false;
  276. ctx.request_time = 0;
  277. memset(ctx.replied, 0, sizeof(ctx.replied));
  278. memset(ctx.deferred, 0, sizeof(ctx.deferred));
  279.  
  280. child_work(&ctx, enable_mutexl);
  281. _exit(0);
  282. }
  283. }
  284. return 0;
  285. }
Advertisement
Add Comment
Please, Sign In to add comment