Advertisement
Guest User

Untitled

a guest
Feb 12th, 2016
55
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.97 KB | None | 0 0
  1. #include <sys/wait.h>
  2. #include <stdbool.h>
  3. #include <stdio.h>
  4. #include <stdlib.h>
  5. #include <unistd.h>
  6. #include <string.h>
  7. #include <time.h>
  8. #include <errno.h>
  9.  
  // Capacity of the line buffer used when reading the input file.
  #define BUFFER_SIZE 1024
  // Character code of the first lowercase letter (97 in ASCII); subtracting
  // it from a lowercase letter gives that letter's reducer index.
  #define ALPHA_OFFSET 'a'
  // Number of lowercase letters that are routed to reducers.
  #define LETTERS 26

  // These are enum members rather than `const int` objects so that they are
  // genuine integer constant expressions: they can size the file-scope pipe
  // tables below and make local buffers ordinary arrays instead of VLAs.
  enum {
      NUM_OF_MAPPERS = 4,
      NUM_OF_REDUCERS = LETTERS,

      // Index of each descriptor inside an int[2] filled in by pipe().
      PIPE_READ_END = 0,
      PIPE_WRITE_END = 1,

      // Size of the buffers used when reading from a pipe.
      PIPE_BUFFER_SIZE = 1000
  };

  // One pipe per mapper and one per reducer; each row holds {read, write}.
  int mapper_pipes[NUM_OF_MAPPERS][2];
  int reducer_pipes[NUM_OF_REDUCERS][2];
  23.  
  // Creates a pipe and stores its two descriptors in pipefd.
  // If pipe() fails, the error is reported through perror and the process
  // terminates with EXIT_FAILURE; otherwise the function just returns.
  void pipe_wrapper(int pipefd[]) {
      if (pipe(pipefd) != -1) {
          return;
      }

      perror("Error. Failed when trying to create pipes.");
      exit(EXIT_FAILURE);
  }
  31.  
  32. void create_mapper_pipes(void) {
  33. int i;
  34. for (i = 0; i < NUM_OF_MAPPERS; i++) {
  35. pipe_wrapper(mapper_pipes[i]);
  36. }
  37. }
  38.  
  39. void create_reducer_pipes(void) {
  40. int i;
  41. for (i=0; i < NUM_OF_REDUCERS; i++) {
  42. pipe_wrapper(reducer_pipes[i]);
  43. }
  44. }
  45.  
  // Checks the result of a system call.
  //
  // syscall_val:  the value the call returned; a negative value means failure.
  // syscall_name: label handed to perror when reporting a failure.
  //
  // Returns syscall_val unchanged when it is non-negative. On failure the
  // error is printed with perror and the process exits, using the errno value
  // that was current when this function was entered as the exit status.
  int print_if_err(int syscall_val, const char* syscall_name) {
      if (syscall_val < 0) {
          // Save errno before calling into stdio: library calls are free to
          // modify it, and the exit status must describe the failed syscall.
          const int saved_errno = errno;
          perror(syscall_name);
          exit(saved_errno);
      }

      // No syscall error, so the value can be handed back to the caller.
      return syscall_val;
  }
  56.  
  57. void send_chars_to_reducers(char * line) {
  58. printf("SEND_CHARS_TO_REDUCERS read: %snn", line);
  59. int i;
  60. int ob_size = 1;
  61. int wlen = 0;
  62. for (i = 0; i < strlen(line); i++) {
  63. if (line[i] >= ALPHA_OFFSET && line[i] < ALPHA_OFFSET + LETTERS) {
  64. int pipe_num = line[i] - ALPHA_OFFSET;
  65. printf("SENDING %c TO REDUCER PIPE %dn", line[i], pipe_num);
  66. wlen = print_if_err(write(reducer_pipes[pipe_num][PIPE_WRITE_END], &line[i], ob_size), "write");
  67. printf("WROTE %s to REDUCER %dn", line[i], i);
  68. }
  69. }
  70. }
  71.  
  72. void close_reducer_pipes(void) {
  73. int i;
  74. for (i = 0; i < NUM_OF_REDUCERS; i++) {
  75. close(reducer_pipes[i][PIPE_WRITE_END]);
  76. close(reducer_pipes[i][PIPE_READ_END]);
  77. }
  78. }
  79.  
  80. void fork_mappers(void) {
  81.  
  82.  
  83. /* Constants useful to all children */
  84. char ibuf[PIPE_BUFFER_SIZE]; // input pipe buffer
  85. int rlen = 0;
  86.  
  87. int i;
  88. for (i=0; i<NUM_OF_MAPPERS; i++) {
  89. pid_t mapper_pid = print_if_err(fork(), "fork");
  90. if (mapper_pid == 0) {
  91. int j;
  92. for (j=0; j < NUM_OF_MAPPERS; j++) {
  93. close(mapper_pipes[i][PIPE_WRITE_END]);
  94. if (j != i) {
  95. close(mapper_pipes[j][PIPE_READ_END]);
  96. }
  97. }
  98. rlen = print_if_err(read(mapper_pipes[i][PIPE_READ_END], ibuf, 1000), "read");
  99. send_chars_to_reducers(ibuf);
  100. close_reducer_pipes();
  101. //printf("forked mapper%d read: %snn", i, ibuf);
  102. close(mapper_pipes[i][PIPE_READ_END]);
  103. _exit(0);
  104. }
  105. }
  106. }
  107.  
  108. void fork_reducers(void) {
  109. printf("HELLLOOOO FROM REDUCERn");
  110. char ibuf[PIPE_BUFFER_SIZE]; // input pipe buffer
  111. int rlen = 0;
  112. int i;
  113. for (i = 0; i < NUM_OF_REDUCERS; i++) {
  114. pid_t reducer_pid = print_if_err(fork(), "fork");
  115. if (reducer_pid == 0) {
  116. while (1) {
  117. rlen = print_if_err(read(reducer_pipes[i][PIPE_READ_END], ibuf, 1), "read");
  118. if (rlen > 0) {
  119. printf("REDUCER #%d, read %sn", i, ibuf);
  120. }
  121. }
  122. }
  123. }
  124. }
  125.  
  126. void send_lines_to_mappers(void) {
  127. int wlen = 0;
  128. char obuf[PIPE_BUFFER_SIZE];
  129. int ob_size;
  130. int count = 0;
  131.  
  132. char buff[BUFFER_SIZE]; // a buffer for each line of the file
  133. FILE *input_file = fopen("input.txt", "r");
  134. // read the input file line by line
  135. while(fgets(buff, BUFFER_SIZE, input_file) > 0) {
  136. //printf("send_lines_to_mappers read: %snn", buff);
  137. ob_size = sizeof buff;
  138. switch(count) {
  139. case 0 :
  140. write(mapper_pipes[0][PIPE_WRITE_END], buff, ob_size);
  141. close(mapper_pipes[0][PIPE_WRITE_END]);
  142. close(mapper_pipes[0][PIPE_READ_END]);
  143. break;
  144. case 1 :
  145. write(mapper_pipes[1][PIPE_WRITE_END], buff, ob_size);
  146. close(mapper_pipes[1][PIPE_WRITE_END]);
  147. close(mapper_pipes[1][PIPE_READ_END]);
  148. break;
  149. case 2 :
  150. write(mapper_pipes[2][PIPE_WRITE_END], buff, ob_size);
  151. close(mapper_pipes[2][PIPE_WRITE_END]);
  152. close(mapper_pipes[2][PIPE_READ_END]);
  153. break;
  154. case 3 :
  155. write(mapper_pipes[3][PIPE_WRITE_END], buff, ob_size);
  156. close(mapper_pipes[3][PIPE_WRITE_END]);
  157. close(mapper_pipes[3][PIPE_READ_END]);
  158. break;
  159. default :
  160. printf("you did something wrong in send_lines_to_mappers loop");
  161. }
  162. count++;
  163. }
  164. fclose(input_file);
  165. }
  166.  
  // Program entry point: builds the pipe tables, starts the reducer and
  // mapper processes, then feeds the input lines to the mappers.
  // Always returns 0.
  // NOTE(review): the parent returns without waiting for its children or
  // closing its reducer pipe descriptors - confirm that this is intended.
  int main(void) {
      // Pipes are created before any fork so that every child inherits them.
      create_mapper_pipes();
      create_reducer_pipes();

      // Reducers are started ahead of the mappers that write to them.
      fork_reducers();
      fork_mappers();

      // Only the parent gets here; it distributes the input.
      send_lines_to_mappers();

      return 0;
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement