Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#include <dirent.h>
#include <errno.h>
#include <limits.h>
#include <omp.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
// A singly linked list node that stores one queued line of text.
struct QNode {
    char *line;          // NUL-terminated text; newNode stores a malloc'd copy here
    struct QNode *next;  // following node in the list, or NULL for the last node
};
- // The queue, front stores the front node of LL and rear stores the
- // last node of LL
- struct Queue {
- struct QNode *front, *rear;
- omp_lock_t lock;
- int enqueued;
- int dequeued;
- };
- // A utility function to create a new linked list node.
- struct QNode* newNode(char* newLine) {
- struct QNode* temp = malloc(sizeof(struct QNode));
- temp->line = malloc(strlen (newLine) + 1);
- strcpy(temp->line, newLine);
- temp -> next = NULL;
- return temp;
- }
- // A utility function to create an empty queue
- struct Queue* createQueue() {
- struct Queue* q = malloc(sizeof(struct Queue));
- q -> enqueued = q -> dequeued = 0;
- omp_init_lock(&q->lock);
- q -> front = q -> rear = NULL;
- return q;
- }
- //enQueue to add a new line into the queue
- void enQueue(struct Queue* q, char newString[]) {
- // Create a new LL node
- struct QNode* temp = newNode(newString);
- // If queue is empty, then new node is front and rear both
- if (q->rear == NULL) {
- q->front = q->rear = temp;
- return;
- }
- // Add the new node at the end of queue and change rear
- q->rear->next = temp;
- q->rear = temp;
- }
- // Function to remove a key from given queue q
- struct QNode* deQueue(struct Queue* q) {
- // If queue is empty, return NULL.
- if (q->front == NULL)
- return NULL;
- // Store previous front and move front one node ahead
- struct QNode* temp = q->front;
- //free(temp);
- q->front = q->front->next;
- // If front becomes NULL, then change rear also as NULL
- if (q->front == NULL)
- q->rear = NULL;
- return temp;
- }
- void tryReceive(struct Queue* queue, int myRank) {
- int queueSize = queue->enqueued - queue->dequeued;
- struct QNode* deQueuedNode;
- printf("Try Receive: %d\n", myRank);
- if (queueSize == 0)
- return;
- else if (queueSize == 1) {
- omp_set_lock(&queue->lock);
- deQueuedNode = deQueue(queue);
- omp_unset_lock(&queue->lock);
- } else
- deQueuedNode = deQueue(queue);
- printf("String: %s", deQueuedNode->line);
- }//tryReceive
- //function will send all lines to other queues
- void sendMessages(struct Queue* arrayOfQueues[], int numThreads, char* fileName, int myRank) {
- //open file and read a line at a time.
- FILE* file = fopen(fileName, "r");
- if (file == NULL){
- fprintf(stderr, "Could not open current directory");
- exit(EXIT_FAILURE);
- }
- char lineToSend[500];
- while (fgets(lineToSend, sizeof(lineToSend), file)) {
- printf("%s", lineToSend);
- int destinationThread = random() % numThreads;
- printf("Destination %d\n", destinationThread);
- struct Queue* destinationQueue = arrayOfQueues[destinationThread];
- omp_set_lock(&destinationQueue->lock);
- enQueue(destinationQueue, lineToSend);
- omp_unset_lock(&destinationQueue->lock);
- tryReceive(arrayOfQueues[myRank], myRank);
- }
- }//sendMessages
- int main(int argc, char* argv[]){
- int done;
- int fileCount = 0;
- if (argc != 2){
- fprintf(stderr, "Incorrect Number Of Arguments. Only Argument Should Be Number of Threads!");
- exit(EXIT_FAILURE);
- }
- int numThreads = strtol(argv[1], NULL, 10);
- if (numThreads < 0){
- fprintf(stderr, "Threads Must Be Greater Than 0!");
- exit(EXIT_FAILURE);
- }
- //==================Code to count number of files in folder
- struct dirent *de;
- DIR *dr = opendir(".");
- // opendir returns NULL if couldn't open directory
- if (dr == NULL){
- fprintf(stderr, "Could not open current directory");
- return 0;
- }
- while ((de = readdir(dr)) != NULL){
- fileCount++;
- }
- closedir(dr);
- printf("File Count : %d\n", fileCount);
- struct dirent *de2;
- DIR *dr2 = opendir(".");
- if (dr2 == NULL) // opendir returns NULL if couldn't open directory
- {
- fprintf(stderr, "Could not open current directory");
- return 0;
- }
- printf("File Count : %d\n", fileCount);
- //allocate space for arrayOfQueues
- struct Queue** arrayOfQueues = malloc(numThreads * sizeof (struct Queue) );
- # pragma omp parallel num_threads(numThreads)
- {
- int myRank = omp_get_thread_num();
- arrayOfQueues[myRank] = createQueue();
- # pragma omp barrier //we need to make sure all threads have message queues before we continue
- //threads will open all
- # pragma omp for schedule(static, 1)
- for(int i = 0; i < fileCount; i++){
- if ((de2 = readdir(dr2)) != NULL){
- char* dot = strrchr(de2->d_name, '.');
- if (dot && !strcmp(dot, ".txt")){
- printf("Right File Name %s\n", de2->d_name);
- sendMessages(arrayOfQueues, numThreads, de2->d_name, myRank);
- }else{
- printf("Wrong File Name %s\n", de2->d_name);
- }
- }//if
- }//for
- #pragma omp atomic
- done++;
- struct QNode* n = deQueue(arrayOfQueues[myRank]);
- if (n != NULL)
- printf("Dequeued item is %s %d\n", n->line, myRank);
- }//parallel
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement