Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include "util.h"
- #include <pthread.h>
- #include <string.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <dirent.h>
- #include <stdbool.h>
- int dnslookup(const char* hostname, char* firstIPstr, int maxSize){
- /* Local vars */
- struct addrinfo* headresult = NULL;
- struct addrinfo* result = NULL;
- struct sockaddr_in* ipv4sock = NULL;
- struct in_addr* ipv4addr = NULL;
- char ipv4str[INET_ADDRSTRLEN];
- char ipstr[INET6_ADDRSTRLEN];
- int addrError = 0;
- /* DEBUG: Print Hostname*/
- #ifdef UTIL_DEBUG
- fprintf(stderr, "%s\n", hostname);
- #endif
- /* Lookup Hostname */
- addrError = getaddrinfo(hostname, NULL, NULL, &headresult);
- if(addrError){
- fprintf(stderr, "Error looking up Address: %s\n",
- gai_strerror(addrError));
- return UTIL_FAILURE;
- }
- /* Loop Through result Linked List */
- for(result=headresult; result != NULL; result = result->ai_next){
- /* Extract IP Address and Convert to String */
- if(result->ai_addr->sa_family == AF_INET){
- /* IPv4 Address Handling */
- ipv4sock = (struct sockaddr_in*)(result->ai_addr);
- ipv4addr = &(ipv4sock->sin_addr);
- if(!inet_ntop(result->ai_family, ipv4addr,
- ipv4str, sizeof(ipv4str))){
- perror("Error Converting IP to String");
- return UTIL_FAILURE;
- }
- #ifdef UTIL_DEBUG
- fprintf(stdout, "%s\n", ipv4str);
- #endif
- strncpy(ipstr, ipv4str, sizeof(ipstr));
- ipstr[sizeof(ipstr)-1] = '\0';
- }
- else if(result->ai_addr->sa_family == AF_INET6){
- /* IPv6 Handling */
- #ifdef UTIL_DEBUG
- fprintf(stdout, "IPv6 Address: Not Handled\n");
- #endif
- strncpy(ipstr, "UNHANDELED", sizeof(ipstr));
- ipstr[sizeof(ipstr)-1] = '\0';
- }
- else{
- /* Unhandlded Protocol Handling */
- #ifdef UTIL_DEBUG
- fprintf(stdout, "Unknown Protocol: Not Handled\n");
- #endif
- strncpy(ipstr, "UNHANDELED", sizeof(ipstr));
- ipstr[sizeof(ipstr)-1] = '\0';
- }
- /* Save First IP Address */
- if(result==headresult){
- strncpy(firstIPstr, ipstr, maxSize);
- firstIPstr[maxSize-1] = '\0';
- }
- }
- /* Cleanup */
- freeaddrinfo(headresult);
- return UTIL_SUCCESS;
- }
- //Global mutex declaration and initialization
- pthread_mutex_t inputLock;
- pthread_mutex_t writeToBuffer;
- pthread_mutex_t writeCountLock;
- pthread_mutex_t writetoFile;
- pthread_mutex_t chooseFromBuff;
- pthread_mutex_t reqLogblock;
- typedef struct eti {
- //Input stuff
- char inputFiles[10][25];
- int inputfileNum;
- int inputcounter;
- //Reading
- char sharedArray[10][1025];
- int buffCount;
- bool buffOccupied[10];
- //Important stuff
- bool writeFinished;
- bool flushed;
- int readCount;
- //Files
- FILE *ofp;
- FILE *reqLog;
- } EssentialThreadInformation;
- void reqDestruction(int serviced, void *tstruct){
- EssentialThreadInformation *godstruct = (struct EssentialThreadInformation*) tstruct;
- char output[100];
- snprintf(output, 100, "Thread %d serviced %d files\n", pthread_self(), serviced);
- fputs(output, godstruct->reqLog);
- }
- void *requesterThread(void *tstruct){
- EssentialThreadInformation *godstruct = (struct EssentialThreadInformation*) tstruct;
- char data[1025];
- FILE *fd;
- int j;
- int serviced = 0;
- while (godstruct->inputcounter < godstruct->inputfileNum) {
- //Requester choosing its input file
- pthread_mutex_lock(&inputLock);
- if (godstruct->inputcounter >= godstruct->inputfileNum) {
- printf("Spare thread aborted.\n");
- pthread_mutex_unlock(&inputLock);
- pthread_mutex_lock(&reqLogblock);
- reqDestruction(serviced, godstruct);
- pthread_mutex_unlock(&reqLogblock);
- return 0;
- }
- fd = fopen(godstruct->inputFiles[godstruct->inputcounter], "r");
- serviced++;
- godstruct->inputcounter++;
- pthread_mutex_unlock(&inputLock);
- while(fgets(data, 1025, fd)){
- //Do nothing when the buffer is full
- while (godstruct->buffCount == 10) {
- //nothing
- }
- //Only one thread can write to the buffer at the same time
- pthread_mutex_lock(&writeToBuffer);
- if (godstruct->buffCount < 10) {
- for (j = 0; j < 10; j++) {
- if (godstruct->buffOccupied[j] == false) {
- break;
- }
- }
- strcpy(godstruct->sharedArray[j],data);
- godstruct->buffOccupied[j] = true;
- godstruct->buffCount++;
- }
- //Unlock writing
- pthread_mutex_unlock(&writeToBuffer);
- }
- fclose(fd);
- }
- pthread_mutex_lock(&reqLogblock);
- reqDestruction(serviced, godstruct);
- pthread_mutex_unlock(&reqLogblock);
- return 0;
- }
- void *resolverThread(void *tstruct){
- EssentialThreadInformation *godstruct = (struct EssentialThreadInformation*) tstruct;
- char data[1025];
- char ip[1025];
- int j;
- int len;
- while (!godstruct->writeFinished) {
- //Do nothing while there isn't anything in the buffer
- while(godstruct->buffCount == 0) {
- //Do nothing
- if (godstruct->writeFinished) {
- break;
- }
- }
- if (godstruct->writeFinished) {
- break;
- }
- pthread_mutex_lock(&writeCountLock);
- godstruct->readCount++;
- if (godstruct->readCount == 1) {
- pthread_mutex_lock(&writeToBuffer);
- }
- pthread_mutex_unlock(&writeCountLock);
- //Only one thread can choose a buffer location at once
- pthread_mutex_lock(&chooseFromBuff);
- if (godstruct->buffCount > 0) {
- for (j = 0; j < 10; j++) {
- if (godstruct->buffOccupied[j] == true) {
- break;
- }
- }
- godstruct->buffOccupied[j] = false;
- godstruct->buffCount--;
- strcpy(data, godstruct->sharedArray[j]);
- }
- pthread_mutex_unlock(&chooseFromBuff);
- //DNS LOOKUP
- len = strlen(data);
- data[len - 1] = '\0';
- if(dnslookup(data, ip, 1025) == 0) {
- strcat(data, ",");
- strcat(data, ip);
- strcat(data, "\n");
- }
- else {
- strcat(data, ",\n");
- }
- //Only one thread can write to the output file at once.
- pthread_mutex_lock(&writetoFile);
- fputs(data, godstruct->ofp);
- pthread_mutex_unlock(&writetoFile);
- pthread_mutex_lock(&writeCountLock);
- godstruct->readCount--;
- if (godstruct->readCount == 0) {
- pthread_mutex_unlock(&writeToBuffer);
- }
- pthread_mutex_unlock(&writeCountLock);
- }
- //Final flush
- pthread_mutex_lock(&chooseFromBuff);
- if (!godstruct->flushed) {
- for (j = 0; j < 10; j++) {
- if (godstruct->buffOccupied[j] == true) {
- strcpy(data, godstruct->sharedArray[j]);
- len = strlen(data);
- data[len - 1] = '\0';
- if(dnslookup(data, ip, 1025) == 0) {
- strcat(data, ",");
- strcat(data, ip);
- strcat(data, "\n");
- }
- else {
- strcat(data, ",\n");
- }
- fputs(data, godstruct->ofp);
- }
- }
- godstruct->flushed = true;
- }
- pthread_mutex_unlock(&chooseFromBuff);
- return 0;
- }
- int main(int argc, char * argv[]) {
- EssentialThreadInformation *godstruct = malloc(sizeof(EssentialThreadInformation));
- int i = 0;
- //THE GREAT SETTING OF THE VARIABLES
- godstruct->inputfileNum = 0;
- godstruct->inputcounter = 0;
- godstruct->buffCount = 0;
- for (i = 0; i < 10; i++) {
- godstruct->buffOccupied[i] = false;
- }
- godstruct->writeFinished = false;
- godstruct->flushed = false;
- godstruct->readCount = 0;
- godstruct->ofp = fopen(argv[4], "w");
- godstruct->reqLog = fopen(argv[3], "w");
- int numRequesterThreads = atoi(argv[1]);
- int numResolverThreads = atoi(argv[2]);
- //Get input folder directory
- i = 0;
- char fileNames[25];
- DIR *dir;
- struct dirent *dirstruct;
- dir = opendir(argv[5]);
- if (dir) {
- while ((dirstruct = readdir(dir)) != NULL) {
- if(strcmp(dirstruct->d_name, ".") && strcmp(dirstruct->d_name, "..")) {
- strcpy(godstruct->inputFiles[i], argv[5]);
- strcat(godstruct->inputFiles[i], "/");
- strcat(godstruct->inputFiles[i], dirstruct->d_name);
- i++;
- }
- }
- godstruct->inputfileNum = i;
- closedir(dir);
- }
- else {
- printf("Input directory doesn't exist\n");
- return 0;
- }
- //Writers
- pthread_t writethread[10];
- pthread_mutex_init(&inputLock, NULL);
- pthread_mutex_init(&writeToBuffer, NULL);
- pthread_mutex_init(&reqLogblock, NULL);
- //Create threads
- for(i = 0; i < numRequesterThreads; i++) {
- pthread_create(&writethread[i], NULL, requesterThread, (void *)godstruct);
- }
- pthread_mutex_init(&writeCountLock, NULL);
- pthread_mutex_init(&writeToBuffer, NULL);
- pthread_mutex_init(&chooseFromBuff, NULL);
- //Readers
- pthread_t readthread[10];
- //Create threads
- for(i = 0; i < numResolverThreads; i++) {
- pthread_create(&readthread[i], NULL, resolverThread, (void *)godstruct);
- }
- //Wait for threads
- for (i = 0; i < numRequesterThreads; i++) {
- pthread_join(writethread[i], NULL);
- }
- godstruct->writeFinished = true;
- for (i = 0; i < numResolverThreads; i++) {
- pthread_join(readthread[i], NULL);
- }
- fclose(godstruct->ofp);
- fclose(godstruct->reqLog);
- pthread_mutex_destroy(&inputLock);
- pthread_mutex_destroy(&writeToBuffer);
- pthread_mutex_destroy(&writeCountLock);
- pthread_mutex_destroy(&writetoFile);
- pthread_mutex_destroy(&chooseFromBuff);
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement