Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- void send_chars_to_reducers(char * line) {
- printf("SEND_CHARS_TO_REDUCERS read: %snn", line);
- int i;
- int ob_size = 1;
- int wlen = 0;
- for (i = 0; i < strlen(line); i++) {
- if (line[i] >= ALPHA_OFFSET && line[i] < ALPHA_OFFSET + LETTERS) {
- int pipe_num = line[i] - ALPHA_OFFSET;
- printf("SENDING %c TO REDUCER PIPE %dn", line[i], pipe_num);
- wlen = print_if_err(write(reducer_pipes[pipe_num][PIPE_WRITE_END], &line[i], ob_size), "write");
- printf("WROTE %s to REDUCER %dn", line[i], i);
- }
- }
- close_reducer_pipes();
- }
- void close_reducer_pipes(void) {
- int i;
- for (i = 0; i < NUM_OF_REDUCERS; i++) {
- close(reducer_pipes[i][PIPE_WRITE_END]);
- close(reducer_pipes[i][PIPE_READ_END]);
- }
- }
- void fork_mappers(void) {
- /* Constants useful to all children */
- char ibuf[PIPE_BUFFER_SIZE]; // input pipe buffer
- int rlen = 0;
- int i;
- for (i=0; i<NUM_OF_MAPPERS; i++) {
- pid_t mapper_pid = print_if_err(fork(), "fork");
- if (mapper_pid == 0) {
- int j;
- for (j=0; j < NUM_OF_MAPPERS; j++) {
- close(mapper_pipes[i][PIPE_WRITE_END]);
- if (j != i) {
- close(mapper_pipes[j][PIPE_READ_END]);
- }
- }
- rlen = print_if_err(read(mapper_pipes[i][PIPE_READ_END], ibuf, 1000), "read");
- send_chars_to_reducers(ibuf);
- close_reducer_pipes();
- //printf("forked mapper%d read: %snn", i, ibuf);
- close(mapper_pipes[i][PIPE_READ_END]);
- _exit(0);
- }
- }
- }
- void fork_reducers(void) {
- printf("HELLLOOOO FROM REDUCERn");
- char ibuf[PIPE_BUFFER_SIZE]; // input pipe buffer
- int rlen = 0;
- int i;
- for (i = 0; i < NUM_OF_REDUCERS; i++) {
- pid_t reducer_pid = print_if_err(fork(), "fork");
- if (reducer_pid == 0) {
- while (1) {
- rlen = print_if_err(read(reducer_pipes[i][PIPE_READ_END], ibuf, 1), "read");
- if (rlen > 0) {
- printf("REDUCER #%d, read %sn", i, ibuf);
- } else {
- break;
- }
- }
- printf("exiting reducern");
- _exit(0);
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement