Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <stdio.h>
- #include <string.h>
- #include <unistd.h>
- #include <stdlib.h>
- #include <stddef.h>
- #include <assert.h>
- #include <poll.h>
- #include <errno.h>
- #include <time.h>
- #include <sys/time.h>
- #include <sys/socket.h>
- #include <sys/uio.h>
- #include <netinet/in.h>
- #include "rlib.h"
- #include "buffer.h"
/*
 * Multiply n by 1,000,000 (the number of nanoseconds in one millisecond).
 *
 * Both the argument and the whole expansion are parenthesized so the macro
 * behaves like a function call inside larger expressions, e.g.
 * miliseconds(a + b) scales the sum and x / miliseconds(y) divides by the
 * scaled value. The argument is evaluated exactly once.
 */
#define miliseconds(n) (1000000 * (n))
- static int checks(rel_t *r, packet_t *pkt, size_t n);
- static void process_ACK_PKG(rel_t *r, packet_t *pkt, size_t n);
- static void process_DATA_PKG(rel_t *r, packet_t *pkt, size_t n);
- static int is_duplicate(rel_t *r, packet_t *pkt);
- static long now_ms();
- static int get_send_window_size(rel_t *r);
- static void send_ACK_PKG(rel_t *r, uint32_t ack);
- static packet_t *create_DATA_PKG(rel_t *r);
- struct reliable_state {
- rel_t *next; /* Linked list for traversing all connections */
- rel_t **prev;
- conn_t *c; /* This is the connection object */
- size_t window_size;
- uint32_t next_seq;
- int timeout;
- uint32_t lowest_ackno;
- uint32_t send_nxt;
- int send_eof;
- int send_eof_acked;
- int rec_eof;
- uint32_t next_received;
- //buffers
- buffer_t* send_buffer;
- buffer_t* rec_buffer;
- };
- rel_t *rel_list;
- // **********************Static helper functions Start***************
- static int get_send_window_size(rel_t *r){
- return r->send_nxt - r->lowest_ackno;
- }
/*
 * Current wall-clock time in milliseconds, derived from gettimeofday():
 * whole seconds scaled to milliseconds plus the microsecond part truncated
 * to milliseconds.
 *
 * Defined with an explicit (void) parameter list so the definition also
 * serves as a prototype.
 */
static long now_ms(void)
{
    struct timeval tv;

    gettimeofday(&tv, NULL);
    return tv.tv_sec * 1000 + tv.tv_usec / 1000;
}
- static int is_duplicate(rel_t *r, packet_t *pkt){
- return ntohl(pkt->seqno) < r->next_received;
- }
- static int checks(rel_t *r, packet_t *pkt, size_t n){
- int rcv_check = pkt->cksum;
- pkt->cksum = 0;
- if(ntohs(pkt->len)!= n){
- return -1;
- }
- if(rcv_check != cksum(pkt,n)){
- return -1;
- }
- pkt->cksum = rcv_check;
- return 0;
- }
- static void send_ACK_PKG(rel_t *r, uint32_t ack){
- struct ack_packet *a = xmalloc(sizeof(struct ack_packet));
- a->cksum = 0;
- a->len = htons(8);
- a->ackno = htonl(ack);
- a->cksum = cksum(a,8);
- conn_sendpkt(r->c, (packet_t *)a, 8);
- free(a);
- }
- static void process_ACK_PKG(rel_t *r, packet_t *pkt, size_t n){
- if(buffer_size(r->send_buffer)==0) return;
- if(ntohl(pkt->ackno)>r->lowest_ackno){ // send una?
- r->lowest_ackno = ntohl(pkt->ackno);
- buffer_remove(r->send_buffer, r->lowest_ackno);
- }
- if(r->send_eof){ // send eof?
- if(buffer_size(r->send_buffer)==0) r->send_eof_acked = 1;
- }
- if (r->send_eof_acked && r->rec_eof && buffer_size(r->send_buffer)==0){
- rel_destroy(r);
- }else if(get_send_window_size(r) < r->window_size){
- rel_read(r);
- }
- return;
- }
- static packet_t *create_DATA_PKG(rel_t *r){ //TODO
- packet_t *pkt;
- pkt = xmalloc(sizeof(packet_t));
- int input = conn_input(r->c, pkt->data, 500);
- if (input == 0) {
- free(pkt);
- return NULL;
- }
- pkt->cksum = 0;
- pkt->len = htons(12 + ((input == -1) ? 0 : input));
- pkt->ackno = htonl(r->next_received);
- pkt->seqno = htonl(r->send_nxt);
- pkt->cksum = cksum(pkt, ntohs(pkt->len));
- return pkt;
- }
- static void process_DATA_PKG(rel_t *r, packet_t *pkt, size_t n){
- if (is_duplicate(r, pkt)) {
- send_ACK_PKG(r, ntohl(pkt->seqno) + 1);
- return;
- }
- if (r->rec_eof) {
- if (r->send_eof_acked) rel_destroy(r);
- return;
- }
- // ignore out-of-window packets
- if (ntohl(pkt->seqno) >= r->next_received + r->window_size) return;
- if (buffer_size(r->rec_buffer) == r->window_size) {
- rel_output(r);
- }
- // store in buffer if not already there
- if (!buffer_contains(r->rec_buffer, ntohl(pkt->seqno))) {
- buffer_insert(r->rec_buffer, pkt, now_ms());
- }
- rel_output(r);
- }
- // **********************Static helper functions END***************
- rel_t *
- rel_create (conn_t *c, const struct sockaddr_storage *ss,
- const struct config_common *cc)
- {
- rel_t *r;
- r = xmalloc (sizeof (*r));
- memset (r, 0, sizeof (*r));
- if (!c) {
- c = conn_create (r, ss);
- if (!c) {
- free (r);
- return NULL;
- }
- }
- r->c = c;
- r->next = rel_list;
- r->prev = &rel_list;
- if (rel_list)
- rel_list->prev = &r->next;
- rel_list = r;
- r->send_buffer = xmalloc(sizeof(buffer_t));
- r->send_buffer->head = NULL;
- r->lowest_ackno = 1;
- r->send_nxt = 1;
- r->send_eof = 0;
- r->send_eof_acked = 0;
- // receive
- r->rec_buffer = xmalloc(sizeof(buffer_t));
- r->rec_buffer->head = NULL;
- r->next_received = 1;
- r->rec_eof = 0;
- r->window_size = cc->window;
- r->timeout = cc->timeout;
- return r;
- }
- void
- rel_destroy (rel_t *r)
- {
- if (r->next) {
- r->next->prev = r->prev;
- }
- *r->prev = r->next;
- conn_destroy (r->c);
- buffer_clear(r->send_buffer);
- free(r->send_buffer);
- buffer_clear(r->rec_buffer);
- free(r->rec_buffer);
- free(r);
- }
- void
- rel_recvpkt (rel_t *r, packet_t *pkt, size_t n)
- {
- if(checks(r,pkt,n)==-1) return;
- // if its only an ACK packet
- if(n==8){
- process_ACK_PKG(r, pkt, n);
- }else{
- process_DATA_PKG(r, pkt, n);
- }
- }
- void
- rel_read (rel_t *s)
- {
- /*
- char buffer[s->window_size];
- int size = conn_input(s->c,buffer,conn_bufspace(s->c));
- if(size==0||size==-1){
- return NULL;
- }
- packet_t *pkt = malloc(sizeof(*pkt));
- // first only process packets which fit into the packet
- int off = 0;
- if(size < 500){
- build_packet(pkt, size + 12);
- strncpy(pkt->data, &buffer[0],size); //TODO change offset here
- }else{
- printf("Too large packet in rel_read");
- }
- //TODO: implement for larger input
- free(pkt);
- */
- {
- // read and send as long as:
- // 1. send window is not full, and
- // 2. no EOF has been read from stdin
- while ((get_send_window_size(s) < s->window_size) && !s->send_eof) {
- packet_t *pkt = create_DATA_PKG(s);
- // if stdin is empty, return
- if (pkt == NULL) return;
- // set flag if EOF has been read from stdin
- if (ntohs(pkt->len) == 12) s->send_eof = 1;
- conn_sendpkt(s->c, pkt, ntohs(pkt->len));
- buffer_insert(s->send_buffer, pkt, now_ms());
- free(pkt);
- s->send_nxt++;
- }
- }
- }
- void
- rel_output (rel_t *r)
- {
- //not empty
- if(buffer_size(r->rec_buffer)!=0){
- buffer_node_t* node = buffer_get_first(r->rec_buffer);
- size_t free_b = conn_bufspace(r->c);
- size_t used = 0;
- uint32_t num_b = 0;
- int out_size;
- while(node&&num_b<(r->window_size)){
- if (!buffer_contains(r->rec_buffer, r->next_received) || r->next_received != ntohl(node->packet.seqno)) break;
- uint16_t payload_size = ntohs(node->packet.len) - 12;
- if (used + payload_size > free_b) break;
- if(payload_size==0) r->rec_eof =1;
- out_size = conn_output(r->c,node->packet.data,payload_size);
- assert(out_size>=0);
- used += payload_size;
- r->next_received++;
- node = node->next;
- buffer_remove_first(r->rec_buffer);
- num_b++;
- }
- //ackno
- send_ACK_PKG(r, r->next_received);
- }else{
- if(r->send_eof_acked){
- rel_destroy(r);
- }
- return;
- }
- }
- void
- rel_timer ()
- {
- rel_t *current = rel_list;
- struct timespec now;
- while (current != NULL) {
- for(buffer_node_t* node = buffer_get_first(current->send_buffer);node;node=node->next){
- if(clock_gettime(CLOCK_MONOTONIC,&now)==-1) return;
- if(miliseconds(now.tv_nsec) - (node->last_retransmit)>current->timeout){
- node->last_retransmit = miliseconds(now.tv_nsec);
- conn_sendpkt(current->c,&node->packet,ntohs(node->packet.len));
- }
- }
- current = current->next;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement