Advertisement
Guest User

Untitled

a guest
Dec 7th, 2019
130
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.52 KB | None | 0 0
  1.  
  2. #include <stdio.h>
  3. #include <stdlib.h>
  4. #include <string.h>
  5. #include <omp.h>
  6. #include <unistd.h>
  7. #include <dirent.h>
  8.  
  9. // A linked list (LL) node to store a queue entry
  10. struct QNode {
  11. char* line;//char line[100];
  12. struct QNode* next;
  13. };
  14.  
  15. // The queue, front stores the front node of LL and rear stores the
  16. // last node of LL
  17. struct Queue {
  18. struct QNode *front, *rear;
  19. omp_lock_t lock;
  20. int enqueued;
  21. int dequeued;
  22. };
  23.  
  24. // A utility function to create a new linked list node.
  25. struct QNode* newNode(char* newLine) {
  26. struct QNode* temp = malloc(sizeof(struct QNode));
  27. temp->line = malloc(strlen (newLine) + 1);
  28. strcpy(temp->line, newLine);
  29. temp -> next = NULL;
  30. return temp;
  31. }
  32.  
  33. // A utility function to create an empty queue
  34. struct Queue* createQueue() {
  35. struct Queue* q = malloc(sizeof(struct Queue));
  36. q -> enqueued = q -> dequeued = 0;
  37. omp_init_lock(&q->lock);
  38. q -> front = q -> rear = NULL;
  39. return q;
  40. }
  41.  
  42. //enQueue to add a new line into the queue
  43. void enQueue(struct Queue* q, char newString[]) {
  44. // Create a new LL node
  45. struct QNode* temp = newNode(newString);
  46.  
  47. // If queue is empty, then new node is front and rear both
  48. if (q->rear == NULL) {
  49. q->front = q->rear = temp;
  50. return;
  51. }
  52.  
  53. // Add the new node at the end of queue and change rear
  54. q->rear->next = temp;
  55. q->rear = temp;
  56. }
  57.  
  58. // Function to remove a key from given queue q
  59. struct QNode* deQueue(struct Queue* q) {
  60. // If queue is empty, return NULL.
  61. if (q->front == NULL)
  62. return NULL;
  63.  
  64. // Store previous front and move front one node ahead
  65. struct QNode* temp = q->front;
  66. //free(temp);
  67.  
  68. q->front = q->front->next;
  69.  
  70. // If front becomes NULL, then change rear also as NULL
  71. if (q->front == NULL)
  72. q->rear = NULL;
  73. return temp;
  74. }
  75.  
  76. void tryReceive(struct Queue* queue, int myRank) {
  77.  
  78. int queueSize = queue->enqueued - queue->dequeued;
  79. struct QNode* deQueuedNode;
  80.  
  81. printf("Try Receive: %d\n", myRank);
  82.  
  83. if (queueSize == 0)
  84. return;
  85.  
  86. else if (queueSize == 1) {
  87. omp_set_lock(&queue->lock);
  88. deQueuedNode = deQueue(queue);
  89. omp_unset_lock(&queue->lock);
  90. } else
  91. deQueuedNode = deQueue(queue);
  92. printf("String: %s", deQueuedNode->line);
  93.  
  94. }//tryReceive
  95.  
  96. //function will send all lines to other queues
  97. void sendMessages(struct Queue* arrayOfQueues[], int numThreads, char* fileName, int myRank) {
  98. //open file and read a line at a time.
  99.  
  100. FILE* file = fopen(fileName, "r");
  101.  
  102. if (file == NULL){
  103. fprintf(stderr, "Could not open current directory");
  104. exit(EXIT_FAILURE);
  105. }
  106.  
  107. char lineToSend[500];
  108.  
  109. while (fgets(lineToSend, sizeof(lineToSend), file)) {
  110. printf("%s", lineToSend);
  111. int destinationThread = random() % numThreads;
  112. printf("Destination %d\n", destinationThread);
  113. struct Queue* destinationQueue = arrayOfQueues[destinationThread];
  114. omp_set_lock(&destinationQueue->lock);
  115. enQueue(destinationQueue, lineToSend);
  116. omp_unset_lock(&destinationQueue->lock);
  117. tryReceive(arrayOfQueues[myRank], myRank);
  118. }
  119. }//sendMessages
  120.  
  121.  
  122.  
  123. int main(int argc, char* argv[]){
  124. int done;
  125. int fileCount = 0;
  126. if (argc != 2){
  127. fprintf(stderr, "Incorrect Number Of Arguments. Only Argument Should Be Number of Threads!");
  128. exit(EXIT_FAILURE);
  129. }
  130.  
  131. int numThreads = strtol(argv[1], NULL, 10);
  132. if (numThreads < 0){
  133. fprintf(stderr, "Threads Must Be Greater Than 0!");
  134. exit(EXIT_FAILURE);
  135. }
  136.  
  137. //==================Code to count number of files in folder
  138. struct dirent *de;
  139. DIR *dr = opendir(".");
  140. // opendir returns NULL if couldn't open directory
  141. if (dr == NULL){
  142. fprintf(stderr, "Could not open current directory");
  143. return 0;
  144. }
  145. while ((de = readdir(dr)) != NULL){
  146. fileCount++;
  147. }
  148. closedir(dr);
  149. printf("File Count : %d\n", fileCount);
  150.  
  151. struct dirent *de2;
  152. DIR *dr2 = opendir(".");
  153. if (dr2 == NULL) // opendir returns NULL if couldn't open directory
  154. {
  155. fprintf(stderr, "Could not open current directory");
  156. return 0;
  157. }
  158.  
  159. printf("File Count : %d\n", fileCount);
  160.  
  161. //allocate space for arrayOfQueues
  162. struct Queue** arrayOfQueues = malloc(numThreads * sizeof (struct Queue) );
  163.  
  164. # pragma omp parallel num_threads(numThreads)
  165. {
  166. int myRank = omp_get_thread_num();
  167. arrayOfQueues[myRank] = createQueue();
  168.  
  169. # pragma omp barrier //we need to make sure all threads have message queues before we continue
  170.  
  171. //threads will open all
  172. # pragma omp for schedule(static, 1)
  173. for(int i = 0; i < fileCount; i++){
  174. if ((de2 = readdir(dr2)) != NULL){
  175. char* dot = strrchr(de2->d_name, '.');
  176. if (dot && !strcmp(dot, ".txt")){
  177. printf("Right File Name %s\n", de2->d_name);
  178. sendMessages(arrayOfQueues, numThreads, de2->d_name, myRank);
  179.  
  180. }else{
  181. printf("Wrong File Name %s\n", de2->d_name);
  182. }
  183.  
  184.  
  185. }//if
  186. }//for
  187.  
  188.  
  189. #pragma omp atomic
  190. done++;
  191.  
  192.  
  193. struct QNode* n = deQueue(arrayOfQueues[myRank]);
  194.  
  195.  
  196. if (n != NULL)
  197. printf("Dequeued item is %s %d\n", n->line, myRank);
  198. }//parallel
  199. return 0;
  200. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement