Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <stdlib.h>
- #include <string.h>
- #include <unistd.h>
- #include <fcntl.h>
- #include <errno.h>
- #include "ipc.h"
- #include "utils/logger.c"
- #include "pa2345.h"
/*
 * max(a, b) - yields the larger of the two operands (b when they compare
 * equal). This is a function-like macro: the selected operand is evaluated
 * a second time, so only pass expressions without side effects.
 */
#define max(a, b) \
    (((a) > (b)) ? (a) : (b))
- Message build_message(int msg_type, char *payload, timestamp_t lamport_time) {
- MessageHeader msg_h;
- msg_h.s_magic = MESSAGE_MAGIC;
- size_t L = strlen(payload);
- if (L > MAX_PAYLOAD_LEN) L = MAX_PAYLOAD_LEN;
- msg_h.s_payload_len = (uint16_t)L;
- msg_h.s_type = msg_type;
- msg_h.s_local_time = lamport_time;
- Message msg;
- msg.s_header = msg_h;
- if (L) memcpy(msg.s_payload, payload, L);
- if (L < MAX_PAYLOAD_LEN) msg.s_payload[L] = '\0';
- return msg;
- }
- int send(void * self, local_id dst, const Message * msg) {
- struct IPCContext *ctx = self;
- ctx->lamport_time++;
- Message msg_to_send = *msg;
- msg_to_send.s_header.s_local_time = ctx->lamport_time;
- msg_to_send.s_header.s_sender = ctx->id;
- size_t len = sizeof(MessageHeader) + msg_to_send.s_header.s_payload_len;
- ssize_t res = write(pipes[ctx->id][dst][1], &msg_to_send, len);
- if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- return -1;
- }
- return (res == -1) ? -1 : 0;
- }
- int receive(void * self, local_id from, Message * msg) {
- struct IPCContext *ctx = self;
- ssize_t res = read(pipes[from][ctx->id][0], msg, sizeof(Message));
- if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- return -1;
- }
- if (res <= 0) return -1;
- ctx->lamport_time = max(ctx->lamport_time, msg->s_header.s_local_time) + 1;
- return 0;
- }
- int send_multicast(void * self, const Message * msg) {
- struct IPCContext *ctx = self;
- int result = 0;
- for (int i = 0; i < ctx->proc_count; i++) {
- if (i == ctx->id) continue;
- if (send(self, i, msg) != 0) result = -1;
- }
- return result;
- }
- int receive_any(void * self, Message * msg) {
- struct IPCContext *ctx = self;
- for (int i = 0; i < ctx->proc_count; i++) {
- if (i == ctx->id) continue;
- if (receive(self, i, msg) == 0) {
- return 0;
- }
- }
- return -1;
- }
- int handle_request(void * self, Message * msg) {
- struct IPCContext *ctx = self;
- int sender = msg->s_header.s_sender;
- timestamp_t their_time = msg->s_header.s_local_time;
- timestamp_t our_time = ctx->request_time;
- if (ctx->in_cs ||
- (ctx->requesting &&
- ((our_time < their_time) ||
- (our_time == their_time && ctx->id < sender)))) {
- ctx->deferred[sender] = true;
- } else {
- Message msg_reply_cs = build_message(CS_REPLY, "", ctx->lamport_time);
- send(ctx, sender, &msg_reply_cs);
- }
- return 0;
- }
- int handle_reply(void * self, Message * msg) {
- struct IPCContext *ctx = self;
- int sender = msg->s_header.s_sender;
- ctx->replied[sender] = true;
- return 0;
- }
- bool all_replied(void * self) {
- struct IPCContext *ctx = self;
- for (int i = 0; i < ctx->proc_count; i++) {
- if (i == ctx->id) continue;
- if (!ctx->replied[i]) return false;
- }
- return true;
- }
- int request_cs(const void * self) {
- struct IPCContext *ctx = (struct IPCContext *)self;
- ctx->lamport_time++;
- ctx->requesting = true;
- for (int i = 0; i < ctx->proc_count; i++) {
- ctx->replied[i] = (i == ctx->id);
- }
- ctx->request_time = ctx->lamport_time;
- Message msg_request_cs = build_message(CS_REQUEST, "", ctx->lamport_time);
- send_multicast(ctx, &msg_request_cs);
- return 0;
- }
- int release_cs(const void * self) {
- struct IPCContext *ctx = (struct IPCContext *)self;
- ctx->lamport_time++;
- ctx->in_cs = false;
- ctx->requesting = false;
- for (int i = 0; i < ctx->proc_count; i++) {
- if (ctx->deferred[i]) {
- Message msg_reply_cs = build_message(CS_REPLY, "", ctx->lamport_time);
- send(ctx, i, &msg_reply_cs);
- ctx->deferred[i] = false;
- }
- ctx->replied[i] = false;
- }
- return 0;
- }
- void sync_started(IPCContext *ctx) {
- Message msg_started = build_message(STARTED, "", ctx->lamport_time);
- send_multicast(ctx, &msg_started);
- while (ctx->started_count < ctx->proc_count - 1) {
- Message msg;
- if (receive_any(ctx, &msg) == 0 && msg.s_header.s_type == STARTED) {
- ctx->started_count++;
- }
- }
- ctx->state = 0;
- receive_log(ctx->state, ctx->id, ctx->parent_id);
- }
- void sync_done(IPCContext *ctx) {
- Message msg_done = build_message(DONE, "", ctx->lamport_time);
- send_multicast(ctx, &msg_done);
- while (ctx->done_count < ctx->proc_count - 1) {
- Message msg;
- if (receive_any(ctx, &msg) == 0 && msg.s_header.s_type == DONE) {
- ctx->done_count++;
- }
- }
- ctx->state = 1;
- receive_log(ctx->state, ctx->id, ctx->parent_id);
- }
- int child_work(void * self, bool enable_mutexl) {
- struct IPCContext *ctx = self;
- receive_log(ctx->state, ctx->id, ctx->parent_id);
- Message msg;
- sync_started(ctx);
- int N = (ctx->id + 1) * 5; // фикс, чтобы при id=0 было >0 итераций
- int done_iterations = 0;
- while (done_iterations < N) {
- if (receive_any(ctx, &msg) == 0) {
- switch (msg.s_header.s_type) {
- case CS_REQUEST: handle_request(ctx, &msg); break;
- case CS_REPLY: handle_reply(ctx, &msg); break;
- case DONE: ctx->done_count++; break;
- default: break;
- }
- }
- if (enable_mutexl) {
- if (!ctx->requesting && !ctx->in_cs) {
- request_cs(ctx);
- }
- if (ctx->requesting && all_replied(ctx)) {
- ctx->in_cs = true;
- char logmsg[128];
- snprintf(logmsg, sizeof(logmsg),
- "Process %d (Lamport=%d) entered critical section\n",
- ctx->id, ctx->lamport_time);
- print(logmsg);
- release_cs(ctx);
- done_iterations++;
- }
- } else {
- char logmsg[128];
- snprintf(logmsg, sizeof(logmsg),
- "Process %d (Lamport=%d) entered critical section\n",
- ctx->id, ctx->lamport_time);
- print(logmsg);
- done_iterations++;
- }
- }
- sync_done(ctx);
- ctx->state = 3;
- receive_log(ctx->state, ctx->id, ctx->parent_id);
- return 0;
- }
- int ipc_init(int proc_count, bool enable_mutexl) {
- if (proc_count > MAX_PROCESS_ID) {
- return -1;
- }
- // создаем пайпы между всеми парами (0-based id)
- for (int i = 0; i < proc_count; i++) {
- for (int j = 0; j < proc_count; j++) {
- if (i == j) continue;
- if (pipe(pipes[i][j]) == -1) {
- return -1;
- }
- fcntl(pipes[i][j][0], F_SETFL, O_NONBLOCK);
- fcntl(pipes[i][j][1], F_SETFL, O_NONBLOCK);
- }
- }
- for (int i = 0; i < proc_count; i++) {
- pid_t new_process = fork();
- if (new_process == 0) {
- local_id self_id = i; // фикс: 0-based ID
- // закрыть лишние концы пайпов
- for (int from = 0; from < proc_count; from++) {
- for (int to = 0; to < proc_count; to++) {
- if (from == to) continue;
- if (from == self_id) {
- close(pipes[from][to][0]); // оставляем write-end
- } else if (to == self_id) {
- close(pipes[from][to][1]); // оставляем read-end
- } else {
- close(pipes[from][to][0]);
- close(pipes[from][to][1]);
- }
- }
- }
- IPCContext ctx;
- ctx.state = 0;
- ctx.proc_count = proc_count; // только дети 0..proc_count-1
- ctx.id = self_id;
- ctx.parent_id = 0;
- ctx.lamport_time = 0;
- ctx.started_count = 0;
- ctx.done_count = 0;
- ctx.requesting = false;
- ctx.in_cs = false;
- ctx.request_time = 0;
- memset(ctx.replied, 0, sizeof(ctx.replied));
- memset(ctx.deferred, 0, sizeof(ctx.deferred));
- child_work(&ctx, enable_mutexl);
- _exit(0);
- }
- }
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment