Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #ifndef _GNU_SOURCE
- # define _GNU_SOURCE
- #endif
- #include <stdlib.h>
- #include <stdio.h>
- #include <errno.h>
- #include <sys/types.h>
- #include <sys/stat.h>
- #include <unistd.h>
- #include <getopt.h>
- #include <string.h>
- #include <pthread.h>
- #include <sys/mman.h>
- #ifdef STATS
- #include <sys/time.h>
- #endif
/* main() falls back to a single-threaded sort when the input has fewer lines
 * than this. */
#define MIN_LINES_FOR_THREADS ((size_t) 5000)
/* Number of leading lines find_pivots() samples to choose its pivots. */
#define FIND_PIVOTS_ELEMS ((size_t) 1000)

/* Non-zero: progress messages are printed to stderr */
static int verbose = 0;
/* Requested number of sorting threads (0 or 1: sort in the calling thread) */
static size_t threads = 0;
/* Non-zero: records end with a 0 byte instead of a newline */
static int zero_terminated = 0;

/* Read methods understood by read_file() and free_read() */
/* - record by record through getdelim (used for stdin) */
#define READ_BY_LINE 0
/* - whole file copied into a malloc'd buffer */
#define READ_BY_BLOCK 1
/* - whole file mmap'ed */
#define READ_BY_MMAPED_BLOCK 2

/* The buffer (READ_BY_BLOCK) or mapping (READ_BY_MMAPED_BLOCK) */
static void *read_buf;
/* Number of bytes in read_buf */
static size_t read_buf_length;
/* 1 when every line is its own malloc'd string rather than a pointer into
 * read_buf */
static int lines_are_copied = 0;
/* When lines_are_copied == 0: the last line, if that one had to be malloc'd */
static char *last_line = NULL;
#ifdef STATS
/* Each thread counts its own string comparisons here */
static __thread unsigned long cmps = 0;

/* Timer helpers. They all operate on ITIMER_REAL through a caller-supplied
 * struct itimerval and are meant to be used as plain statements. */

/* Arm the timer as a one-shot with an __INT_MAX__ seconds countdown */
# define INIT_TIMER(t) do { \
        struct itimerval *timer_ = (t); \
        timer_->it_value.tv_sec = __INT_MAX__; \
        timer_->it_value.tv_usec = 0; \
        timer_->it_interval.tv_sec = 0; \
        timer_->it_interval.tv_usec = 0; \
        setitimer(ITIMER_REAL, timer_, NULL); \
    } while (0)

/* Store the current timer value into *t */
# define GET_T(t) do { \
        struct itimerval *timer_ = (t); \
        getitimer(ITIMER_REAL, timer_); \
    } while (0)

/* diff = (t0 - t1) in microseconds, t0 being the first of the two readings */
# define DIFF_T0_T1(t0,t1,diff) do { \
        struct itimerval *first_ = (t0); \
        struct itimerval *second_ = (t1); \
        unsigned long usecs_; \
        usecs_ = (first_->it_value.tv_sec - second_->it_value.tv_sec) * 1000000L; \
        usecs_ += (first_->it_value.tv_usec - second_->it_value.tv_usec); \
        diff = usecs_; \
    } while (0)

/* Disarm the timer (all fields set to zero) */
# define RESET_TIMER(t) do { \
        struct itimerval *timer_ = (t); \
        timer_->it_value.tv_sec = 0; \
        timer_->it_value.tv_usec = 0; \
        timer_->it_interval.tv_sec = 0; \
        timer_->it_interval.tv_usec = 0; \
        setitimer(ITIMER_REAL, timer_, NULL); \
    } while (0)
#endif /* STATS */
/* Ordering function applied to every pair of lines; strcmp by default */
int (*str_compare)(const char *lhs, const char *rhs) = strcmp;
/* What a thread_sort() worker receives */
struct thread_params {
    char **start;   /* first element of the slice to sort */
    size_t len;     /* number of elements in the slice */
#ifdef STATS
    unsigned long cmps; /* comparison count reported back by the worker */
#endif
};

/* One worker: its parameters and its thread handle */
struct thread_data {
    struct thread_params params;
    pthread_t thread;
};
/* A sampled line together with its position in the lines array */
struct pivot_data {
    size_t i;   /* index of the line */
    char *s;    /* the line itself */
};
- /* Read the file line by line and copy the lines into a char ** array */
- static char ** read_by_lines(FILE *in, size_t *length) {
- char **lines, **tmp_lines;
- size_t lines_len = 0;
- size_t nlines = 0;
- char *buf = NULL, *buf_cpy;
- size_t buflen = 0;
- size_t i;
- ssize_t r;
- int delim;
- // all the lines are malloc'd
- lines_are_copied = 1;
- lines = NULL;
- if (zero_terminated)
- delim = '\0';
- else
- delim = '\n';
- while (1) {
- r = getdelim(&buf, &buflen, delim, in);
- /* r doesn't count the terminating \0 but the buffer is always
- * 0-terminated (buf[r] == 0) */
- if (r >= 0) {
- // replace the delimiter by \0
- if (r >= 1 && buf[r - 1] == delim) {
- buf[r - 1] = '\0';
- r--;
- }
- if (r == 0) {
- continue;
- }
- if (lines_len == nlines) {
- // expand the lines buffer
- lines_len += 1 << 13;
- tmp_lines = realloc(lines, lines_len * sizeof(char *));
- if (tmp_lines == NULL) {
- perror("realloc");
- for (i = 0; i < nlines; i++) {
- free(lines[i]);
- }
- free(lines);
- free(buf);
- return NULL;
- }
- lines = tmp_lines;
- }
- // copy the buffer from getline
- buf_cpy = malloc(r + 1);
- if (buf_cpy == NULL) {
- perror("malloc");
- for (i = 0; i < nlines; i++) {
- free(lines[i]);
- }
- free(lines);
- free(buf);
- return NULL;
- }
- memcpy(buf_cpy, buf, r + 1);
- lines[nlines] = buf_cpy;
- nlines++;
- }
- else {
- break;
- }
- }
- *length = nlines;
- free(buf);
- return lines;
- }
- /* Read the file from the buffer pmap and build a char ** array with the lines.
- * is_ro: the buffer is read only (mmap)
- *
- * If the buffer is read only and the delimiter is \n, the lines are copied and
- * lines_are_copied is set. Otherwise the lines point to pmap and the delimiters
- * are replaced by \0.
- *
- * When the lines are not copied, the last line of the file may need to be
- * allocated regardless to add a trailing 0 byte. In this case last_line is set
- * to this buffer.
- */
- static char ** read_block(void *pmap, size_t file_length, size_t *length,
- int is_ro) {
- char **lines, **tmp_lines;
- size_t lines_len = 0;
- size_t nlines = 0;
- size_t i, r;
- char *buf;
- char delim;
- char *b = pmap, *e = pmap;
- char *map_end = ((char *) pmap) + file_length - 1;
- if (zero_terminated || !is_ro)
- lines_are_copied = 0;
- else
- lines_are_copied = 1;
- lines = NULL;
- if (zero_terminated)
- delim = '\0';
- else
- delim = '\n';
- while (e <= map_end) {
- if (*e == delim || e == map_end) {
- // line length from b to e included
- r = e - b + 1;
- if (r == 1) {
- b = e + 1;
- continue;
- }
- if (lines_len == nlines) {
- // expand the buffer
- lines_len += 1 << 13;
- tmp_lines = realloc(lines, lines_len * sizeof(char *));
- if (tmp_lines == NULL) {
- perror("realloc");
- if (lines_are_copied)
- for (i = 0; i < nlines; i++) {
- free(lines[i]);
- }
- free(lines);
- return NULL;
- }
- lines = tmp_lines;
- }
- if (lines_are_copied) {
- // missing delimiter at the end of the file
- if (*e != delim)
- r++;
- buf = malloc(r);
- if (buf == NULL) {
- perror("malloc");
- for (i = 0; i < nlines; i++) {
- free(lines[i]);
- }
- free(lines);
- return NULL;
- }
- memcpy(buf, b, r - 1);
- buf[r - 1] = '\0';
- lines[nlines] = buf;
- }
- else { // zero_terminated || !is_ro
- if (e == map_end && *e != delim) {
- buf = malloc(r + 1);
- if (buf == NULL) {
- perror("malloc");
- free(lines);
- return NULL;
- }
- memcpy(buf, b, r);
- buf[r] = '\0';
- lines[nlines] = buf;
- last_line = buf;
- }
- else if (zero_terminated) {
- lines[nlines] = b;
- }
- else {
- *e = '\0';
- lines[nlines] = b;
- }
- }
- nlines++;
- b = e + 1;
- }
- e++;
- }
- *length = nlines;
- return lines;
- }
- /* Read the file with the given read method */
- static char ** read_file(FILE *in, size_t *length, int read_method) {
- int fd;
- struct stat statbuf;
- int i;
- size_t r;
- if (read_method == READ_BY_LINE) {
- return read_by_lines(in, length);
- }
- else {
- fd = fileno(in);
- i = fstat(fd, &statbuf);
- if (i == -1) {
- perror("fstat");
- return NULL;
- }
- read_buf_length = statbuf.st_size;
- if (read_method == READ_BY_BLOCK) {
- read_buf = malloc(read_buf_length);
- if (read_buf == NULL) {
- perror("malloc");
- return NULL;
- }
- r = fread(read_buf, 1, read_buf_length, in);
- if (r != read_buf_length) {
- perror("fread");
- free(read_buf);
- return NULL;
- }
- return read_block(read_buf, read_buf_length, length, 0);
- }
- else if (read_method == READ_BY_MMAPED_BLOCK) {
- read_buf = mmap(NULL, read_buf_length, PROT_READ, MAP_PRIVATE
- | MAP_POPULATE, fd, 0);
- if (read_buf == MAP_FAILED) {
- perror("mmap");
- return NULL;
- }
- return read_block(read_buf, read_buf_length, length, 1);
- }
- }
- return NULL;
- }
- /* Free a lines buffer and the resources used by the read method */
- static void free_read(char ** lines, size_t length, int read_method) {
- size_t i;
- if (lines_are_copied) {
- for (i = 0; i < length; i++)
- free(lines[i]);
- }
- else if (last_line != NULL)
- free(last_line);
- free(lines);
- if (read_method == READ_BY_BLOCK)
- free(read_buf);
- else if (read_method == READ_BY_MMAPED_BLOCK)
- munmap(read_buf, read_buf_length);
- }
- /* Write the line buffer into 'out' */
- static int write_lines(FILE *out, char **lines, size_t len) {
- size_t i;
- int r;
- char delim;
- if (zero_terminated)
- delim = '\0';
- else
- delim = '\n';
- for (i = 0; i < len; i++) {
- r = fprintf(out, "%s%c", lines[i], delim);
- if (r < 0) {
- perror("fprintf");
- return -1;
- }
- }
- return 0;
- }
/* qsort callback for arrays of char *: orders two elements with str_compare
 * (and counts the comparison when built with STATS) */
static int cmp_lines(const void *p1, const void *p2) {
    const char *const lhs = *(char * const *) p1;
    const char *const rhs = *(char * const *) p2;
#ifdef STATS
    cmps++;
#endif
    return str_compare(lhs, rhs);
}
- /* A worker thread sorting a part of the line buffer */
- static void * thread_sort(void *args) {
- struct thread_params * params = (struct thread_params *) args;
- qsort(params->start, params->len, sizeof(char *), cmp_lines);
- #ifdef STATS
- params->cmps = cmps;
- #endif
- return NULL;
- }
- /* This comparison function operates over struct pivot_data elements */
- static int cmp_pivot(const void *p1, const void *p2) {
- const struct pivot_data *d1 = (const struct pivot_data *) p1;
- const struct pivot_data *d2 = (const struct pivot_data *) p2;
- #ifdef STATS
- cmps++;
- #endif
- return str_compare(d1->s, d2->s);
- }
- /* This function sort the FIND_PIVOTS_ELEMS first elements of lines in a
- * temporary array and take n regularly spaced values that can be used as pivots
- * to build a partition.
- */
- static int find_pivots(char **lines, size_t *indexes, size_t n_pivots) {
- size_t i, s, n;
- struct pivot_data *tmp = malloc(FIND_PIVOTS_ELEMS
- * sizeof(struct pivot_data));
- if (tmp == NULL) {
- perror("malloc");
- return -1;
- }
- for (i = 0; i < FIND_PIVOTS_ELEMS; i++) {
- tmp[i].i = i;
- tmp[i].s = lines[i];
- }
- qsort(tmp, FIND_PIVOTS_ELEMS, sizeof(struct pivot_data), cmp_pivot);
- s = FIND_PIVOTS_ELEMS / (n_pivots + 1);
- n = 0;
- for (i = 0; i < n_pivots; i++) {
- n += s;
- indexes[i] = tmp[n].i;
- }
- free(tmp);
- return 0;
- }
/* Compare two lines with str_compare (counting the call when built with
 * STATS) */
static int partition_cmp(const char *a, const char *b) {
#ifdef STATS
    cmps++;
#endif
    return str_compare(a, b);
}

/* Swap lines[a] and lines[b] (indexes relative to `lines`) and update the
 * pivot indexes pivots[first_k:pivots_n-1], which are stored with `offset`
 * added, so that they keep designating the same strings. */
static void partition_swap(char **lines, size_t a, size_t b, size_t *pivots,
        size_t first_k, size_t pivots_n, size_t offset) {
    char *tmp;
    size_t k;

    if (a == b)
        return;
    tmp = lines[a];
    lines[a] = lines[b];
    lines[b] = tmp;
    for (k = first_k; k < pivots_n; k++) {
        if (pivots[k] == a + offset)
            pivots[k] = b + offset;
        else if (pivots[k] == b + offset)
            pivots[k] = a + offset;
    }
}

/* Adapted quicksort's partition function.
 * The function works on lines[offset:n-1]
 * The pivot is the line at index pivots[pivots_i], which must lie in that
 * range (so the range is never empty)
 * The next pivots' indexes in pivots[pivots_i+1:pivots_n-1] are updated
 * whenever the line they designate is moved
 *
 * Returns the final index of the pivot: every line of the range before it
 * compares lower than the pivot, every line after it compares greater or
 * equal.
 */
static size_t partition(char **lines, size_t *pivots, size_t pivots_i,
        size_t pivots_n, size_t n, size_t offset) {
    const size_t next_k = pivots_i + 1;
    size_t lo, hi, last;
    const char *pivot;

    lines += offset;
    n -= offset;
    last = n - 1;

    // park the pivot at the end of the range
    partition_swap(lines, pivots[pivots_i] - offset, last, pivots, next_k,
            pivots_n, offset);
    pivot = lines[last];

    /* Invariant: lines[0:lo-1] < pivot and lines[hi:last-1] >= pivot.
     * Both scans are bounded by lo < hi, so neither index can leave the range
     * even when no line is lower than the pivot (e.g. many equal lines). */
    lo = 0;
    hi = last;
    for (;;) {
        while (lo < hi && partition_cmp(lines[lo], pivot) < 0)
            lo++;
        while (lo < hi && partition_cmp(lines[hi - 1], pivot) >= 0)
            hi--;
        if (lo == hi)
            break;
        // lines[lo] >= pivot and lines[hi - 1] < pivot: exchange them
        partition_swap(lines, lo, hi - 1, pivots, next_k, pivots_n, offset);
        lo++;
        hi--;
    }

    // put the pivot between the two parts
    partition_swap(lines, lo, last, pivots, next_k, pivots_n, offset);
    return lo + offset;
}
- static int do_sort(char **lines, size_t lines_length) {
- struct thread_data *th_data = NULL;
- size_t offset, i;
- int r;
- if (!threads || threads == 1) {
- qsort(lines, lines_length, sizeof(char *), cmp_lines);
- }
- else {
- th_data = malloc(threads * sizeof(struct thread_data));
- if (th_data == NULL) {
- perror("malloc");
- return -1;
- }
- size_t * pivots = malloc((threads - 1) * sizeof(size_t));
- if (pivots == NULL) {
- perror("malloc");
- free(th_data);
- return -1;
- }
- /* Find (threads - 1) pivots */
- if (find_pivots(lines, pivots, threads - 1) != 0) {
- free(th_data);
- free(pivots);
- return -1;
- }
- /* Build a first partition with the pivots
- * ......p1...p2.... ....pn-1.......
- */
- for (i = 0; i < threads - 1; i++) {
- if (i == 0) {
- offset = 0;
- }
- else {
- offset = pivots[i - 1];
- }
- pivots[i] = partition(lines, pivots, i, threads - 1, lines_length,
- offset);
- }
- /* Start a thread on each sub part */
- for (i = 0; i < threads; i++) {
- if (i == 0) {
- th_data[i].params.start = &(lines[0]);
- th_data[i].params.len = pivots[0];
- }
- else if (i == threads - 1) {
- th_data[i].params.start = &(lines[pivots[threads - 2]]);
- th_data[i].params.len = lines_length - pivots[threads - 2];
- }
- else {
- th_data[i].params.start = &(lines[pivots[i - 1]]);
- th_data[i].params.len = pivots[i] - pivots[i - 1];
- }
- r = pthread_create(&th_data[i].thread, NULL, thread_sort,
- &th_data[i].params);
- if (r > 0)
- abort();
- }
- /* Wait for completion */
- for (i = 0; i < threads; i++) {
- pthread_join(th_data[i].thread, NULL);
- #ifdef STATS
- cmps += th_data[i].params.cmps;
- #endif
- }
- free(th_data);
- free(pivots);
- }
- return 0;
- }
/* Print the command line help to stderr. The long option names listed here
 * are the ones registered in main()'s long_options table. */
static void usage(void) {
    fprintf(stderr, "Usage: sortfile [OPTION]... input_file [output_file]\n\n");
    fprintf(stderr, "Options:\n");
    fprintf(stderr, " -c, --coll "
            "use 'strcoll' to order the lines\n");
    fprintf(stderr, " -h, --help "
            "this help message\n");
    fprintf(stderr, " -m, --mmap "
            "Use mmap to read the input file (if not stdin)\n");
    // the option is registered as "threads", not "threaded"
    fprintf(stderr, " -t, --threads=N "
            "number of threads to use\n");
    fprintf(stderr, " -v, --verbose "
            "print progress messages to stderr\n");
    fprintf(stderr, " -V, --vers "
            "use 'strverscmp' to order the lines\n");
    fprintf(stderr, " -z, --zero "
            "end lines with a 0 byte, not with a newline\n");
    fprintf(stderr, "\n");
    fprintf(stderr, "Use \"-\" instead of the input or output file to use "
            "respectively stdin or stdout. If the output file is omitted, the "
            "output is written to stdout.\n");
    fprintf(stderr, "The default comparison function is 'strcmp'.\n");
    fprintf(stderr, "Only when using -m on a file with 0-terminated lines, the"
            " data are not copied into the process memory. Otherwise the file "
            "has to be entirely buffered.\n");
}
- int main(int argc, char *argv[]) {
- int exit_status = EXIT_FAILURE;
- int opt;
- int r;
- const char *in_filename, *out_filename;
- FILE *in_file, *out_file;
- #ifdef STATS
- struct itimerval t0, t1;
- unsigned long int diff_read = 0;
- unsigned long int diff_sort = 0;
- unsigned long int diff_write = 0;
- #endif
- long tmp_long;
- char ** lines;
- size_t lines_length = 0;
- int read_method = READ_BY_BLOCK;
- struct option long_options[] = { { "help", 0, NULL, 'h' }, { "verbose", 0,
- NULL, 'v' }, { "threads", 1, NULL, 't' }, { "coll", 0, NULL, 'c' },
- { "vers", 0, NULL, 'V' }, { "mmap", 0, NULL, 'm' }, { "zero", 0,
- NULL, 'z' }, { 0, 0, 0, 0 } };
- while ((opt = getopt_long(argc, argv, "hcVvt:mz", long_options, NULL)) >= 0) {
- switch (opt) {
- case 'v':
- verbose = 1;
- break;
- case 't':
- errno = 0;
- tmp_long = strtol(optarg, (char **) NULL, 10);
- if (errno != 0) {
- perror("strtol");
- exit(EXIT_FAILURE);
- }
- if (tmp_long < 1) {
- fprintf(stderr, "Number of threads must be >= 1\n");
- exit(EXIT_FAILURE);
- }
- threads = tmp_long;
- break;
- case 'c':
- str_compare = strcoll;
- break;
- case 'V':
- str_compare = strverscmp;
- break;
- case 'm':
- read_method = READ_BY_MMAPED_BLOCK;
- break;
- case 'z':
- zero_terminated = 1;
- break;
- case 'h':
- usage();
- exit(EXIT_SUCCESS);
- break;
- default:
- usage();
- exit(EXIT_FAILURE);
- break;
- }
- }
- argv += optind;
- argc -= optind;
- if (argc != 1 && argc != 2) {
- usage();
- exit(EXIT_FAILURE);
- }
- in_filename = argv[0];
- if (argc == 2)
- out_filename = argv[1];
- else
- out_filename = "-";
- /* Open the input and output files */
- if (strcmp(in_filename, "-") != 0) {
- in_file = fopen(in_filename, "r");
- if (in_file == NULL) {
- perror("fopen");
- exit(EXIT_FAILURE);
- }
- }
- else {
- in_file = stdin;
- in_filename = "stdin";
- read_method = READ_BY_LINE;
- }
- if (strcmp(out_filename, "-") != 0) {
- out_file = fopen(out_filename, "w");
- if (out_file == NULL) {
- perror("fopen");
- exit(EXIT_FAILURE);
- }
- }
- else {
- out_file = stdout;
- out_filename = "stdout";
- }
- /* Read input */
- if (verbose)
- fprintf(stderr, "Reading from '%s'...\n", in_filename);
- #ifdef STATS
- INIT_TIMER(&t0);
- GET_T(&t0);
- #endif
- lines = read_file(in_file, &lines_length, read_method);
- #ifdef STATS
- GET_T(&t1);
- DIFF_T0_T1(&t0,&t1,diff_read);
- #endif
- if (lines == NULL) {
- goto close_end;
- }
- if (verbose) {
- fprintf(stderr, " Read %zd lines.\n", lines_length);
- }
- /* The WTF cases... */
- if (lines_length < threads)
- threads = lines_length;
- if (lines_length < FIND_PIVOTS_ELEMS)
- threads = 0;
- if (lines_length < MIN_LINES_FOR_THREADS)
- threads = 0;
- /* Sorting */
- if (verbose) {
- if (!threads || threads == 1)
- fprintf(stderr, "Sorting...\n");
- else
- fprintf(stderr, "Sorting with %zd threads...\n", threads);
- }
- #ifdef STATS
- INIT_TIMER(&t0);
- GET_T(&t0);
- #endif
- r = do_sort(lines, lines_length);
- #ifdef STATS
- GET_T(&t1);
- DIFF_T0_T1(&t0,&t1,diff_sort);
- #endif
- if (r == -1)
- goto free_end;
- /* Write output */
- if (verbose)
- fprintf(stderr, "Writting output to '%s'...\n", out_filename);
- #ifdef STATS
- INIT_TIMER(&t0);
- GET_T(&t0);
- #endif
- r = write_lines(out_file, lines, lines_length);
- #ifdef STATS
- GET_T(&t1);
- DIFF_T0_T1(&t0,&t1,diff_write);
- #endif
- if (r < 0) {
- goto free_end;
- }
- exit_status = EXIT_SUCCESS;
- #ifdef STATS
- RESET_TIMER(&t0);
- fprintf(stderr, "Stats:\n");
- fprintf(stderr, " comparisons: %zd\n", cmps);
- fprintf(stderr, " read time: %.03fs\n", diff_read / 1000000.);
- fprintf(stderr, " sort time: %.03fs\n", diff_sort / 1000000.);
- fprintf(stderr, " write time: %.03fs\n", diff_write / 1000000.);
- #endif
- free_end:
- if (verbose)
- fprintf(stderr, "Cleaning...\n");
- free_read(lines, lines_length, read_method);
- close_end:
- fclose(in_file);
- fclose(out_file);
- return exit_status;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement