Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <stdio.h>
- #include <stdlib.h>
- #include <sys/types.h>
- #include <sys/wait.h>
- #include <unistd.h>
- #include <ctype.h>
- #include <netdb.h>
- #include <sys/socket.h>
- #include <netinet/in.h>
- #include <time.h>
- #include <string.h>
- #include <fcntl.h>
- #include <sys/stat.h>
- #include <poll.h>
- #include "queue.c"
- #include <errno.h>
- #include <arpa/inet.h>
- #include <signal.h>
- #include <openssl/md5.h>
- #define sizeOfArray 1024*1024*5/4
- #define KB112 112*1024
- #define DataProduced 640
- #define MAX_CLIENTS 100
- /* ------------------DEKLARACJE ZMIENNYCH------------------ */
- //Tablice
- char DataBlock[KB112+1]; /* Blok danych wyslany do klienta */
- //do uzupelniania magazynu
- int hex; /* Znak ASCII */
- int char_number = 0; /* Pomocnicza zmienna */
- //do raportow
- int currentClients=0; /* Aktualna liczba polaczonych klientow */
- int sendSize=0; /* Liczba wyslanego materialu */
- int generatedSize=0; /* Liczba wygenerowanego materialu */
- int blockCounter[MAX_CLIENTS]; /* Ilosc blokow przeslanych do danego klienta */
- char *path = NULL; /* Sciezka do pliku */
- //reszta
- Queue *pHead; /* Kolejeczka */
- Queue *ClientsQueue;
- FILE *raport; /* Plik z raportem */
- fd_set readfds;
- struct sockaddr_in address;
- struct timespec monotonic, realtime; /* Znacznik TS */
- //wiadomosc od konsumera
- char msg[6];
- /* ------------------DEKLARACJE FUNKCJI------------------ */
- void ReadingParameters(int argc, char** argv, float* tempo, char** path, unsigned int* port, char** addressip);
- void ProduceAndStore(int hex, Queue *pHead, fd_set *readfds);
- int PortAndAddress(unsigned int *port, char ** addressip, char *opt);
- void Timers(float tempo);
- void CreatingSocket(int *sock, char** addressip, unsigned int *port, int* addrlen);
- void SelectFunction(int *sock, int *max_socket_descriptor, int* socket_descriptor, int* client_socket, int * clients, fd_set* readfds);
- void Handler(int sig, siginfo_t *info, void *context);
- void RaportAboutConnetions(fd_set* readfds);
- void RaportAboutDisconnetions(int socket_descriptor, fd_set* readfds);
- void SendingBlocks(int* socket_descriptor, int* sock, fd_set* readfds);
- MD5_CTX c;
- unsigned char out[MD5_DIGEST_LENGTH];
- /*------------------------------------------------------------------------------*/
- /*------------------------------------------------------------------------------*/
- int main(int argc, char** argv) {
- /* ------------------DEKLARACJE ZMIENNYCH------------------ */
- //do wczytania parametrow
- float tempo; /* Tempo produkcji danych */
- unsigned int port = 0; /* Port */
- char *addressip = "127.0.0.1"; /* Adress */
- //do socketow
- int sock; /* Glowny socket */
- int clients = MAX_CLIENTS; /* Maksymalna liczba clientow */
- int client_socket[MAX_CLIENTS]; /* Tablica z klientami */
- int addrlen;
- int new_sock;
- //do ustawiania deskryptorow
- int max_socket_descriptor;
- int socket_descriptor;
- //do selecta
- int valread; /* Pomocnicze do obslugi polaczenia */
- /* -------------------------------------------------------- */
- if(argc < 4) {
- printf("Brakuje parametrow!\n");
- printf("Prosze podać parametry w ten sposób: -r<sciezka> -t<val> [<addr>]:port\n");
- exit(1);
- }
- bzero(blockCounter, MAX_CLIENTS);
- pHead = createQueue(sizeOfArray); /* 1) a INICJALIZACJA KOLEJKI Z DANYMI */
- ClientsQueue = createQueueClients(MAX_CLIENTS); /* 1) b INICJALIZACJA KOLEJKI Z KLIENTAMI*/
- ReadingParameters(argc, argv, &tempo, &path, &port, &addressip); /* 2) WCZYTANIE PARAMETROW */
- Timers(tempo); /* 3) TWORZENIE ORAZ OBSLUGA TIMEROW */
- for (int i = 0; i < clients; i++) {
- client_socket[i] = 0;
- }
- /* 4) TWORZENIE ORAZ OBSLUGA SOCKETU */
- CreatingSocket(&sock, &addressip, &port, &addrlen);
- raport = fopen(path, "w+");
- if (!raport)
- perror( "Error open file\n");
- while (1) {
- FD_ZERO(&readfds);
- FD_SET(sock, &readfds);
- /* 5) USTAWIANIE DESKRYPTORA ORAZ SELECTA*/
- SelectFunction(&sock, &max_socket_descriptor, &socket_descriptor, client_socket, &clients, &readfds);
- if (FD_ISSET(sock, &readfds)) {
- if ((new_sock = accept(sock, (struct sockaddr *) &address, (socklen_t * ) & addrlen)) < 0) {
- perror("accept error");
- exit(1);
- }
- RaportAboutConnetions(&readfds); /* 7) - a POLACZENIA */
- /* 8) DODANIE DO LISTY KLIENTOW */
- for (int i = 0; i < clients; i++) {
- if (client_socket[i] == 0) {
- client_socket[i] = new_sock;
- break;
- }
- }
- }
- for (int i = 0; i < clients; i++) {
- socket_descriptor = client_socket[i];
- if (FD_ISSET(socket_descriptor, &readfds)) {
- /* 9) a ROZLACZENIE JEZELI VALREAD ROWNY 0 */
- if ((valread = read(socket_descriptor, msg, 6)) == 0) {
- getpeername(socket_descriptor, (struct sockaddr *) &address, (socklen_t * ) & addrlen);
- RaportAboutDisconnetions(socket_descriptor, &readfds); /* 7) - b ROZLACZENIA */
- blockCounter[socket_descriptor]=0;
- close(socket_descriptor);
- client_socket[i] = 0;
- }
- else {
- printf("Socket descriptor: %d\n", socket_descriptor);
- pushClients(ClientsQueue, socket_descriptor);
- printf("Sizeofclients: %d\n", size(ClientsQueue));
- }
- }
- }
- //printf("Sizeofclients: %d\n", size(ClientsQueue));
- if(size(ClientsQueue) > 0)
- {
- SendingBlocks(&socket_descriptor, &sock, &readfds);
- }
- }
- }
- /*------------------------------------------------------------------------------*/
- void ReadingParameters(int argc, char** argv, float* tempo, char** path, unsigned int* port, char** addressip)
- {
- int c;
- char *ptr;
- char *val = NULL;
- int rflag = 0;
- int tflag = 0;
- for (int i = optind; i < argc; i++) {
- if ((c = getopt(argc, argv, "r:t:")) != -1) {
- switch (c) {
- case 'r':
- rflag=1;
- //printf("option -r with value %s\n", optarg);
- *path = optarg;
- break;
- case 't':
- tflag=1;
- //printf("option -t with value %s\n", optarg);
- val = optarg;
- *tempo = strtof(val, &ptr);
- if (*tempo < 1 || *tempo > 8) {
- printf("Tempo powinno być w przedziale od 1 do 8.\n");
- exit(1);
- }
- *tempo = *tempo * 60 / 96/20;
- //printf("%f\n",*tempo);
- break;
- default:
- printf("ERROR: Parametr needs an argument!\n");
- exit(1);
- break;
- }
- }
- }
- if(PortAndAddress(port, addressip, argv[optind])== 0){
- printf("Nie udalo sie utworzyc portu i adresu");
- exit(1);
- }
- if(!(rflag && tflag)){
- printf("Brakuje jednej z flag!\n");
- exit(EXIT_FAILURE);
- }
- }
- /*------------------------------------------------------------------------------*/
- int PortAndAddress(unsigned int *port, char** addressip, char* opt)
- {
- char *tmp = NULL;
- char *pch;
- char colon = ':';
- struct hostent *host;
- struct in_addr **addr_list;
- if((tmp=opt) == 0)
- {
- printf("Nieprawidłowy parametr\n");
- exit(1);
- }
- *port = strtol(tmp, NULL, 0);
- for(int i = 0; i < strlen(tmp); i++)
- {
- if(tmp[i] == colon)
- {
- pch = strtok(tmp, ":");
- *addressip = pch;
- pch = strtok(NULL, ":");
- *port = strtol(pch, NULL, 0);
- }
- }
- if( *port < 1024) {
- printf("Nieprawidłowy port!\n");
- exit(1);
- }
- if (!(host = gethostbyname(*addressip))){
- perror("Gethostbyname error");
- exit(EXIT_FAILURE);
- }
- addr_list = (struct in_addr **) host->h_addr_list;
- int q = 0;
- if( !addr_list[q] ) {
- printf("Podaj poprawny adres!\n");
- exit(1);
- }
- *addressip = inet_ntoa(*addr_list[q]);
- return 1;
- }
- /*------------------------------------------------------------------------------*/
- void ProduceAndStore(int hex, Queue *pHead, fd_set *readfds) {
- char *tmp = (char *)malloc(sizeof(char)*640);
- if(size(pHead)*DataProduced+DataProduced<=pHead->capacity) {
- memset(tmp, hex, 640);
- push(pHead, tmp);
- }
- generatedSize++;
- //free(tmp);
- //printf("%s\n",pop(pHead));
- }
- /*------------------------------------------------------------------------------*/
- void Timers(float tempo) {
- timer_t timer;
- timer_t timer2;
- struct sigaction sa;
- sigemptyset(&sa.sa_mask);
- sigaddset(&sa.sa_mask, SIGINT);
- struct sigevent sev;
- struct sigevent sev2;
- struct itimerspec its;
- struct itimerspec its2;
- sa.sa_flags = SA_SIGINFO;
- sa.sa_sigaction = Handler;
- if (sigaction(SIGRTMIN, &sa, NULL) == -1) {
- perror("SIGRTMIN error");
- }
- if (sigaction(SIGUSR1, &sa, NULL) == -1) {
- perror("SIGUSR1 error");
- }
- sev.sigev_notify = SIGEV_SIGNAL;
- sev.sigev_signo = SIGRTMIN;
- sev.sigev_value.sival_ptr = &timer;
- sev2.sigev_notify = SIGEV_SIGNAL;
- sev2.sigev_signo = SIGUSR1;
- sev2.sigev_value.sival_ptr = &timer2;
- if (timer_create(CLOCK_REALTIME, &sev, &timer) == -1) {
- printf("create error\n");
- exit(EXIT_FAILURE);
- }
- if (timer_create(CLOCK_REALTIME, &sev2, &timer2) == -1) {
- printf("create error\n");
- exit(EXIT_FAILURE);
- }
- its.it_value.tv_sec = tempo;
- its.it_value.tv_nsec = (tempo-(int)tempo)*1000*(1000000L);
- its.it_interval.tv_sec = its.it_value.tv_sec;
- its.it_interval.tv_nsec = its.it_value.tv_nsec;
- its2.it_value.tv_sec = 5;
- its2.it_value.tv_nsec = 0;
- its2.it_interval.tv_sec = its2.it_value.tv_sec;
- its2.it_interval.tv_nsec = its2.it_value.tv_nsec;
- if (timer_settime(timer, 0, &its, NULL) == -1) {
- printf("settime error\n");
- }
- if (timer_settime(timer2, 0, &its2, NULL) == -1) {
- printf("settime error\n");
- }
- }
- /*------------------------------------------------------------------------------*/
- void CreatingSocket(int *sock, char** addressip, unsigned int *port, int* addrlen)
- {
- int opt = 1;
- if ((*sock = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
- perror("socket failed");
- exit(1);
- }
- if (setsockopt(*sock, SOL_SOCKET, SO_REUSEADDR, (char *) &opt, sizeof(opt)) < 0) {
- perror("setsockopt");
- exit(1);
- }
- address.sin_family = AF_INET;
- address.sin_addr.s_addr = inet_addr(*addressip);
- address.sin_port = htons(*port);
- if (bind(*sock, (struct sockaddr *) &address, sizeof(address)) < 0) {
- perror("bind failed");
- exit(EXIT_FAILURE);
- }
- if (listen(*sock, 3) < 0) {
- perror("listen");
- exit(1);
- }
- *addrlen = sizeof(address);
- puts("Czekam na polaczenia ...");
- }
- void SelectFunction(int *sock, int *max_socket_descriptor, int* socket_descriptor, int * client_socket, int * clients, fd_set* readfds)
- {
- int activity;
- FD_ZERO(readfds);
- FD_SET(*sock, readfds);
- /* 5) USTAWIANIE DESKRYPTORA */
- *max_socket_descriptor = *sock;
- for (int i = 0; i < *clients; i++) {
- *socket_descriptor = client_socket[i];
- if (*socket_descriptor > 0)
- FD_SET(*socket_descriptor, readfds);
- if (*socket_descriptor > *max_socket_descriptor)
- *max_socket_descriptor = *socket_descriptor;
- }
- activity = select(*max_socket_descriptor + 1, readfds, NULL, NULL, NULL); /* 6) OBSLUGA WIELU POLACZEN JEDNOCZESNIE */
- if ((activity < 0) && (errno != EINTR)) {
- printf("select error");
- }
- }
- /*------------------------------------------------------------------------------*/
- void Handler(int sig, siginfo_t *info, void *context) {
- FD_ZERO(&readfds);
- if (sig == SIGUSR1) {
- if (clock_gettime(CLOCK_MONOTONIC, &monotonic))
- perror("clock monotonic error");
- if (clock_gettime(CLOCK_REALTIME, &realtime))
- perror("clock realtime error");
- float cap = pHead->capacity;
- float busy = size(pHead) * DataProduced / cap;
- raport = fopen(path, "a");
- if (!raport)
- perror( "Error open file\n");
- fprintf(raport, "\n------------------------RAPORT------------------------\n");
- fprintf(raport, "Podlaczonych uzytkownikow: %d\n", currentClients);
- fprintf(raport, "Zajetośc magazynu: %d/%d (%f)\n", size(pHead) * DataProduced, (int) cap, busy * 100);
- fprintf(raport, "Przepływ materialu w ciagu ostatnich 5 sekund: %d\n",(generatedSize*DataProduced)-sendSize );
- fprintf(raport, "TS: Monotonic: %lds %ldns\tRealTime: %lds %ldns\n", monotonic.tv_sec, monotonic.tv_nsec,
- realtime.tv_sec, realtime.tv_nsec);
- fprintf(raport, "------------------------------------------------------\n");
- fflush(raport);
- fclose(raport);
- generatedSize=0;
- sendSize=0;
- }
- if (sig == SIGRTMIN) {
- if (char_number > 25)
- hex = 65 - 26 + char_number;
- else
- hex = 97 + char_number;
- ProduceAndStore(hex, pHead, &readfds);
- char_number++;
- if (char_number > 51)
- char_number = 0;
- }
- }
- /*------------------------------------------------------------------------------*/
- void RaportAboutConnetions(fd_set* readfds)
- {
- FD_ZERO(readfds);
- //printf("RaportAboutConetion\n");
- if (clock_gettime(CLOCK_MONOTONIC, &monotonic))
- perror("clock monotonic error");
- if (clock_gettime(CLOCK_REALTIME, &realtime))
- perror("clock realtime error");
- raport = fopen(path, "a");
- if (!raport)
- perror( "Error open file\n");
- fprintf(raport, "\n----------------------POLACZENIA----------------------\n");
- fprintf(raport, "Polaczono z nowym klientem o adresie %s!\n", inet_ntoa(address.sin_addr));
- fprintf(raport, "TS: Monotonic: %lds %ldns\tRealTime: %lds %ldns\n", monotonic.tv_sec, monotonic.tv_nsec,
- realtime.tv_sec, realtime.tv_nsec);
- fprintf(raport, "------------------------------------------------------\n");
- fflush(raport);
- fclose(raport);
- currentClients++;
- }
- /*------------------------------------------------------------------------------*/
- void RaportAboutDisconnetions(int socket_descriptor, fd_set* readfds)
- {
- FD_ZERO(readfds);
- //printf("RaportAboutDisconetion\n");
- if (clock_gettime(CLOCK_MONOTONIC, &monotonic))
- perror("clock monotonic error");
- if (clock_gettime(CLOCK_REALTIME, &realtime))
- perror("clock realtime error");
- raport = fopen(path, "a");
- if (!raport)
- perror( "Error open file\n");
- fprintf(raport, "\n----------------------ROZLACZENIA---------------------\n");
- fprintf(raport, "Rozlaczono z klientem o adresie %s!\n", inet_ntoa(address.sin_addr));
- fprintf(raport, "Ilosc wyslanych blokow z danymi: %d\n", blockCounter[socket_descriptor]);
- fprintf(raport, "TS: Monotonic: %lds %ldns\tRealTime: %lds %ldns\n", monotonic.tv_sec,
- monotonic.tv_nsec, realtime.tv_sec, realtime.tv_nsec);
- fprintf(raport, "-------------------------------------------------------\n");
- fflush(raport);
- fclose(raport);
- currentClients--;
- }
- /*------------------------------------------------------------------------------*/
- void SendingBlocks(int* socket_descriptor, int *sock, fd_set* readfds)
- {
- FD_ZERO(readfds);
- if (size(pHead) * DataProduced > KB112 + 640) {
- char *magazyn[KB112/DataProduced+1];
- bzero(magazyn, KB112/DataProduced+1);
- bzero(DataBlock, KB112 + 1);
- for (int j = 0; j < KB112 / DataProduced; j++) {
- magazyn[j] = pop(pHead);
- strcat(DataBlock, magazyn[j]);
- //zeby wyslac 128 brakujacych bajtow
- if(strlen(DataBlock)==114560){
- char *tmp1 = (char *)malloc(sizeof(char) * 640);
- memset(tmp1, '0', strlen(tmp1));
- tmp1 = pop(pHead);
- char *tmp2 = (char *)malloc(sizeof(char) * 128);
- memcpy(tmp2, tmp1, 128);
- strcat(DataBlock, tmp2);
- free(tmp1);
- free(tmp2);
- }
- }
- int whichClient = popClients(ClientsQueue);
- //printf("%s\n",DataBlock);
- printf("whichClient %d\n",whichClient);
- printf("Sizeofclients: %d\n", size(ClientsQueue));
- if (send(whichClient, DataBlock, strlen(DataBlock), 0) < 0) {
- perror("send");
- close(*sock);
- exit(1);
- }
- MD5_Init(&c);
- MD5_Update(&c, DataBlock, 114688);
- MD5_Final(out, &c);
- printf("%ld\n",strlen(DataBlock));
- for(int i = 0; i < MD5_DIGEST_LENGTH; i++)
- printf("%02x", out[i]);
- printf("\n");
- sendSize = sendSize + KB112;
- blockCounter[whichClient]++;
- free(*magazyn);
- }
- }
- /*------------------------------------------------------------------------------*/
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement