Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/list.h>
#include <sys/avl.h>
#include <fcntl.h>
#include <inttypes.h>
#include <math.h>
#include <pthread.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <jansson.h>
/* Upper bound on the number of namespaces; sizes the per-owner counters. */
#define MAXNS 6
/* Size, including the terminating NUL, of a namespace name buffer. */
#define MAXNSLEN 16
#define MAXPATHLEN 1024
/* Size, including the terminating NUL, of a uuid buffer. */
#define UUIDLEN 64

/* Values stored in record_t.r_type. */
#define RTYPE_OBJECT 0
#define RTYPE_DIRECTORY 1

/* Smallest length accounted for a single copy of an object, in bytes. */
#define MIN_SIZE 131072

/* Larger of the two arguments; each argument may be evaluated twice. */
#define MAX(a,b) (((a) > (b)) ? (a) : (b))
- typedef struct record_s {
- char r_own_uuid[UUIDLEN];
- char r_obj_uuid[UUIDLEN];
- char r_ns[MAXNSLEN];
- int r_type;
- int r_sharks_cnt;
- uint64_t r_len;
- } record_t;
- typedef struct obj_s {
- char o_uuid[UUIDLEN];
- uint64_t o_size;
- int o_nsid;
- avl_node_t o_avlnode;
- list_node_t o_lnode;
- } obj_t;
- typedef struct owner_s {
- char o_uuid[UUIDLEN];
- avl_tree_t o_objtree;
- list_t o_objlist;
- uint64_t o_dirs[MAXNS];
- uint64_t o_objs[MAXNS];
- uint64_t o_keys[MAXNS];
- uint64_t o_bytes[MAXNS];
- avl_node_t o_avlnode;
- list_node_t o_lnode;
- } owner_t;
/*
 * Comparator used for both the owner tree and the object trees.
 *
 * Each argument points at a node whose first bytes are a NUL-terminated
 * uuid, so both are compared as C strings.  The strcmp() result is
 * reduced to its sign: -1 if l sorts first, 1 if r sorts first, 0 if
 * the two are equal.
 */
static int
avlnode_comparator(const void *l, const void *r)
{
	const int order = strcmp((char *) l, (char *) r);

	return ((order > 0) - (order < 0));
}
- typedef struct buff_s {
- char *b_buff;
- size_t b_len;
- list_node_t b_lnode;
- } buff_t;
- typedef struct rjob_s {
- int rj_idx;
- pthread_t rj_thread;
- struct rjob_s *rj_next;
- avl_tree_t rj_towners;
- list_t rj_lowners;
- list_t *rj_abufslst;
- list_t *rj_fbufslst;
- char **rj_ns;
- int rj_nscnt;
- pthread_mutex_t *rj_abuflk;
- pthread_mutex_t *rj_fbuflk;
- pthread_cond_t *rj_abufcnd;
- pthread_cond_t *rj_fbufcnd;
- } rjob_t;
/*
 * malloc() wrapper that never returns NULL: if the allocation fails a
 * message is written to stderr and the process exits with status 2.
 */
static void *
safe_malloc(size_t sz)
{
	void *mem = malloc(sz);

	if (mem == NULL) {
		fprintf(stderr, "failed to allocate memory\n");
		exit(2);
	}

	return (mem);
}
- /*
- * Debugging functions
- */
- /*
- static void
- print_record(record_t *rec, int indent)
- {
- char *s1, *s2;
- s1 = safe_malloc(indent + 1); s2 = safe_malloc(indent + 2);
- memset(s1, '\t', indent); memset(s2, '\t', indent + 1);
- s1[indent] = '\0'; s2[indent + 1] = '\0';
- printf("%s{\n"
- "%sown = \"%s\"\n"
- "%suuid = \"%s\"\n"
- "%sns = \"%s\"\n"
- "%stype = \"%s\"\n"
- "%sshark_cnt = %d\n"
- "%slen = %"PRIu64"\n"
- "%s}\n",
- s1,
- s2, rec->r_own_uuid,
- s2, rec->r_obj_uuid,
- s2, rec->r_ns,
- s2, rec->r_type == RTYPE_OBJECT ? "object" : "directory",
- s2, rec->r_sharks_cnt,
- s2, rec->r_len,
- s1);
- free(s1);
- free(s2);
- }
- static void
- print_object(obj_t *obj, int indent)
- {
- char *s1, *s2;
- s1 = safe_malloc(indent + 1); s2 = safe_malloc(indent + 2);
- memset(s1, '\t', indent); memset(s2, '\t', indent + 1);
- s1[indent] = '\0'; s2[indent + 1] = '\0';
- printf("%s{\n"
- "%suuid = \"%s\"\n"
- "%ssize = %"PRIu64"\n"
- "%snsid = %d\n"
- "%s}\n",
- s1,
- s2, obj->o_uuid,
- s2, obj->o_size,
- s2, obj->o_nsid,
- s1);
- free(s1); free(s2);
- }
- static void
- print_owner(owner_t *own, int indent)
- {
- obj_t *obj;
- char *s1, *s2;
- s1 = safe_malloc(indent + 1); s2 = safe_malloc(indent + 2);
- memset(s1, '\t', indent); memset(s2, '\t', indent + 1);
- s1[indent] = '\0'; s2[indent + 1] = '\0';
- printf("%s{\n"
- "%suuid = \"%s\"\n"
- "%sdirs[0] = %"PRIu64" dirs[1] = %"PRIu64" dirs[2] = %"PRIu64" dirs[3] = %"PRIu64" dirs[4] = %"PRIu64" dirs[5] = %"PRIu64"\n"
- "%sobjs[0] = %"PRIu64" objs[1] = %"PRIu64" objs[2] = %"PRIu64" objs[3] = %"PRIu64" objs[4] = %"PRIu64" objs[5] = %"PRIu64"\n"
- "%skeys[0] = %"PRIu64" keys[1] = %"PRIu64" keys[2] = %"PRIu64" keys[3] = %"PRIu64" keys[4] = %"PRIu64" keys[5] = %"PRIu64"\n"
- "%sbytes[0] = %"PRIu64" bytes[1] = %"PRIu64" bytes[2] = %"PRIu64" bytes[3] = %"PRIu64" bytes[4] = %"PRIu64" bytes[5] = %"PRIu64"\n",
- s1,
- s2, own->o_uuid,
- s2, own->o_dirs[0], own->o_dirs[1], own->o_dirs[2], own->o_dirs[3], own->o_dirs[4], own->o_dirs[5],
- s2, own->o_objs[0], own->o_objs[1], own->o_objs[2], own->o_objs[3], own->o_objs[4], own->o_objs[5],
- s2, own->o_keys[0], own->o_keys[1], own->o_keys[2], own->o_keys[3], own->o_keys[4], own->o_keys[5],
- s2, own->o_bytes[0], own->o_bytes[1], own->o_bytes[2], own->o_bytes[3], own->o_bytes[4], own->o_bytes[5]);
- printf("%sobjs = [\n", s2);
- for (obj = list_head(&own->o_objlist); obj != NULL;
- obj = list_next(&own->o_objlist, obj)) {
- print_object(obj, indent + 1);
- }
- printf("%s]\n"
- "%s}\n",
- s2,
- s1);
- free(s1); free(s2);
- }
- static void
- print_job(rjob_t *rj, int indent)
- {
- owner_t *own;
- char *s1, *s2;
- s1 = safe_malloc(indent + 1); s2 = safe_malloc(indent + 2);
- memset(s1, '\t', indent); memset(s2, '\t', indent + 1);
- s1[indent] = '\0'; s2[indent + 1] = '\0';
- printf("%s{\n"
- "%sidx = %d\n"
- "%sowners = [\n",
- s1,
- s2, rj->rj_idx,
- s2);
- for (own = list_head(&rj->rj_lowners); own != NULL;
- own = list_next(&rj->rj_lowners, own)) {
- print_owner(own, indent + 1);
- }
- printf("%s]\n"
- "%s}\n",
- s2,
- s1);
- free(s1); free(s2);
- }
- */
/*
 * Like safe_malloc(), but the returned memory is zero-filled.
 */
static void *
safe_zmalloc(size_t sz)
{
	void *mem = safe_malloc(sz);

	memset(mem, 0, sz);
	return (mem);
}
- static int
- nsidx(rjob_t *rj, char *ns)
- {
- char **jns;
- jns = rj->rj_ns;
- while(*jns != NULL) {
- if (strcmp(*jns, ns) == 0)
- return jns - rj->rj_ns;
- jns++;
- }
- fprintf(stderr, "%s: filed to find namespace = %s\n",
- __func__, ns);
- exit(2);
- return (0);
- }
- static char *
- nsname(rjob_t *rj, int idx)
- {
- if (idx >= 0 && idx < rj->rj_nscnt)
- return (rj->rj_ns[idx]);
- fprintf(stderr, "%s: filed to find namespace = %d\n",
- __func__, idx);
- exit(2);
- return (NULL);
- }
- static buff_t *
- get_buff(list_t *bufflst, pthread_mutex_t *lstlk, pthread_cond_t *lstcnd)
- {
- buff_t *buff;
- pthread_mutex_lock(lstlk);
- while (list_is_empty(bufflst)) {
- pthread_cond_wait(lstcnd, lstlk);
- }
- buff = list_remove_tail(bufflst);
- pthread_mutex_unlock(lstlk);
- return (buff);
- }
- static void
- put_buff(buff_t *buff, list_t *bufflst, pthread_mutex_t *lstlk,
- pthread_cond_t *lstcnd)
- {
- pthread_mutex_lock(lstlk);
- list_insert_head(bufflst, buff);
- pthread_cond_signal(lstcnd);
- pthread_mutex_unlock(lstlk);
- }
- int
- json_to_record(char *json, record_t *r)
- {
- const char *str, *p;
- json_t *jobj, *jprop;
- jobj = NULL;
- if ((jobj = json_loads(json, JSON_REJECT_DUPLICATES, NULL)) == NULL)
- goto error;
- if ((jprop = json_object_get(jobj, "key")) == NULL ||
- (str = json_string_value(jprop)) == NULL ||
- strlen(str) < 39) {
- goto error;
- }
- if ((p = strchr(str + 38, '/')) == NULL) {
- if (strlen(str + 38) > MAXNSLEN -1)
- goto error;
- strcpy(r->r_ns, str + 38);
- } else {
- strncpy(r->r_ns, str + 38, p - str - 38);
- r->r_ns[p - str - 38] = '\0';
- }
- if ((jprop = json_object_get(jobj, "type")) == NULL ||
- (str = json_string_value(jprop)) == NULL) {
- goto error;
- }
- if (strcmp(str, "directory") == 0)
- r->r_type = RTYPE_DIRECTORY;
- else if (strcmp(str, "object") == 0)
- r->r_type = RTYPE_OBJECT;
- else
- goto error;
- r->r_sharks_cnt = 0;
- r->r_len = 0;
- r->r_obj_uuid[0] = '\0';
- if (r->r_type == RTYPE_OBJECT) {
- if ((jprop = json_object_get(jobj, "sharks")) == NULL)
- goto error;
- r->r_sharks_cnt = json_array_size(jprop);
- if ((jprop = json_object_get(jobj, "contentLength")) == NULL)
- goto error;
- r->r_len = json_integer_value(jprop);
- if ((jprop = json_object_get(jobj, "objectId")) == NULL ||
- (str =json_string_value(jprop)) == NULL) {
- goto error;
- }
- strcpy(r->r_obj_uuid, str);
- }
- if ((jprop = json_object_get(jobj, "owner")) == NULL ||
- (str = json_string_value(jprop)) == NULL) {
- goto error;
- }
- strcpy(r->r_own_uuid, str);
- json_decref(jobj);
- return (0);
- error:
- fprintf(stderr, "Failed to parse json object `%s`\n", json);
- json_decref(jobj);
- return (1);
- }
- static void
- rjob_process_record(rjob_t *rj, record_t *rec)
- {
- owner_t own, *pown;
- obj_t obj, *pobj;
- int nsid;
- strcpy(own.o_uuid, rec->r_own_uuid);
- if ((pown = avl_find(&rj->rj_towners, &own, NULL)) == NULL) {
- pown = safe_zmalloc(sizeof(owner_t));
- strcpy(pown->o_uuid, rec->r_own_uuid);
- avl_add(&rj->rj_towners, pown);
- avl_create(&pown->o_objtree, avlnode_comparator,
- sizeof (obj_t), offsetof (obj_t, o_avlnode));
- list_create(&pown->o_objlist, sizeof (obj_t),
- offsetof (obj_t, o_lnode));
- }
- nsid = nsidx(rj, rec->r_ns);
- if (rec->r_type == RTYPE_DIRECTORY) {
- pown->o_dirs[nsid]++;
- return;
- }
- strcpy(obj.o_uuid, rec->r_obj_uuid);
- if ((pobj = avl_find(&pown->o_objtree, &obj, NULL)) == NULL) {
- pobj = safe_zmalloc(sizeof (obj_t));
- strcpy(pobj->o_uuid, rec->r_obj_uuid);
- pobj->o_nsid = nsid;
- pobj->o_size = MAX(rec->r_len, MIN_SIZE) * rec->r_sharks_cnt;
- avl_add(&pown->o_objtree, pobj);
- pown->o_objs[nsid]++;
- pown->o_bytes[nsid] += pobj->o_size;
- }
- pown->o_keys[nsid]++;
- }
- static void
- rjob_avltree_to_list(rjob_t *rj)
- {
- obj_t *pobj;
- owner_t *pown;
- for (pown = avl_first(&rj->rj_towners); pown != NULL;
- pown = AVL_NEXT(&rj->rj_towners, pown)) {
- list_insert_tail(&rj->rj_lowners, pown);
- for (pobj = avl_first(&pown->o_objtree); pobj != NULL;
- pobj = AVL_NEXT(&pown->o_objtree, pobj)) {
- list_insert_tail(&pown->o_objlist, pobj);
- }
- }
- }
- static void *
- rjob_scan(void *arg)
- {
- rjob_t *rj;
- buff_t *buff;
- record_t rec;
- rj = (rjob_t *) arg;
- while (1) {
- buff = get_buff(rj->rj_abufslst, rj->rj_abuflk, rj->rj_abufcnd);
- if (buff->b_buff == NULL) {
- put_buff(buff, rj->rj_fbufslst,
- rj->rj_fbuflk, rj->rj_fbufcnd);
- break;
- }
- if (json_to_record(buff->b_buff, &rec) != 0) {
- fprintf(stderr, "Invalid input record \"%s\"\n",
- buff->b_buff);
- exit(2);
- }
- put_buff(buff, rj->rj_fbufslst, rj->rj_fbuflk, rj->rj_fbufcnd);
- rjob_process_record(rj, &rec);
- }
- rjob_avltree_to_list(rj);
- return (NULL);
- }
- static void
- rjob_merge_owners(owner_t *own1,owner_t *own2)
- {
- int i, cmp, nsid;
- list_t olist, *list1, *list2;
- obj_t *obj, *obj1, *obj2;
- list1 = &own1->o_objlist;
- list2 = &own2->o_objlist;
- obj1 = list_head(list1);
- obj2 = list_head(list2);
- list_create(&olist, sizeof (obj_t), offsetof (obj_t, o_lnode));
- while (obj1 != NULL || obj2 != NULL) {
- if (obj1 != NULL && obj2 != NULL) {
- if ((cmp = strcmp(obj1->o_uuid, obj2->o_uuid)) == 0) {
- nsid = obj2->o_nsid;
- own2->o_keys[nsid]--;
- own2->o_objs[nsid]--;
- own2->o_bytes[nsid] -= obj2->o_size;
- obj = list_next(list2, obj2);
- list_remove(list2, obj2);
- free(obj2);
- obj2 = obj;
- own1->o_keys[nsid]++;
- obj = list_next(list1, obj1);
- list_remove(list1, obj1);
- list_insert_tail(&olist, obj1);
- obj1 = obj;
- } else if (cmp < 0) {
- obj = list_next(list1, obj1);
- list_remove(list1, obj1);
- list_insert_tail(&olist, obj1);
- obj1 = obj;
- } else { /* (cmp > 0) */
- nsid = obj2->o_nsid;
- own2->o_keys[nsid]--;
- own2->o_objs[nsid]--;
- own2->o_bytes[nsid] -= obj2->o_size;
- own1->o_objs[nsid]++;
- own1->o_keys[nsid]++;
- own1->o_bytes[nsid] += obj2->o_size;
- obj = list_next(list2, obj2);
- list_remove(list2, obj2);
- list_insert_tail(&olist, obj2);
- obj2 = obj;
- }
- } else {
- if (obj1 == NULL && obj2 != NULL) {
- list_move_tail(&olist, list2);
- } else { /* (obj1 != NULL && obj2 == NULL) */
- list_move_tail(&olist, list1);
- }
- break;
- }
- }
- if (!list_is_empty(list1) || !list_is_empty(list2)) {
- fprintf(stderr, "One of the lists is not empty\n");
- exit(2);
- }
- for (i = 0; i < MAXNS; i++) {
- own1->o_dirs[i] += own2->o_dirs[i];
- own1->o_objs[i] += own2->o_objs[i];
- own1->o_keys[i] += own2->o_keys[i];
- own1->o_bytes[i] += own2->o_bytes[i];
- own2->o_dirs[i] = 0;
- own2->o_objs[i] = 0;
- own2->o_keys[i] = 0;
- own2->o_bytes[i] = 0;
- }
- /* Put the final result in own1 */
- list_move_tail(&own1->o_objlist, &olist);
- }
- static void *
- rjob_merge(void *arg)
- {
- int cmp;
- rjob_t *rj1, *rj2;
- owner_t *own, *own1, *own2;
- list_t olist, *list1, *list2;
- rj1 = (rjob_t *) arg;
- rj2 = rj1->rj_next;
- list1 = &rj1->rj_lowners;
- list2 = &rj2->rj_lowners;
- own1 = list_head(list1);
- own2 = list_head(list2);
- list_create(&olist, sizeof (owner_t), offsetof(owner_t, o_lnode));
- while (own1 != NULL || own2 != NULL) {
- if (own1 != NULL && own2 != NULL) {
- if ((cmp = strcmp(own1->o_uuid, own2->o_uuid)) == 0) {
- rjob_merge_owners(own1, own2);
- own = list_next(list2, own2);
- list_remove(list2, own2);
- free(own2);
- own2 = own;
- own = list_next(list1, own1);
- list_remove(list1, own1);
- list_insert_tail(&olist, own1);
- own1 = own;
- } else if (cmp < 0) {
- own = list_next(list1, own1);
- list_remove(list1, own1);
- list_insert_tail(&olist, own1);
- own1 = own;
- } else { /* (cmp > 0) */
- own = list_next(list2, own2);
- list_remove(list2, own2);
- list_insert_tail(&olist, own2);
- own2 = own;
- }
- } else {
- if (own1 == NULL && own2 != NULL) {
- list_move_tail(&olist, list2);
- } else { /* (own1 != NULL && own2 == NULL) */
- list_move_tail(&olist, list1);
- }
- break;
- }
- }
- if (!list_is_empty(list1) || !list_is_empty(list2)) {
- fprintf(stderr, "Something is going wrong, one of the lists "
- "is not empty\n");
- exit(2);
- }
- /* Put the ending result in job1's list */
- list_move_tail(&rj1->rj_lowners, &olist);
- return (NULL);
- }
- static void
- scan_input_stdin(rjob_t *rjobs, int nthreads)
- {
- int i;
- ssize_t len;
- void *status;
- buff_t *buff, *buffs;
- list_t abuflst, fbuflst;
- pthread_mutex_t abuflk;
- pthread_mutex_t fbuflk;
- pthread_cond_t abufcnd;
- pthread_cond_t fbufcnd;
- pthread_mutex_init(&abuflk, NULL);
- pthread_mutex_init(&fbuflk, NULL);
- pthread_cond_init(&abufcnd, NULL);
- pthread_cond_init(&fbufcnd, NULL);
- list_create(&abuflst, sizeof (buff_t), offsetof(buff_t, b_lnode));
- list_create(&fbuflst, sizeof (buff_t), offsetof(buff_t, b_lnode));
- buffs = safe_zmalloc(sizeof (buff_t) * nthreads * 2);
- for (i = 0; i < 2 * nthreads; i ++)
- list_insert_tail(&fbuflst, &buffs[i]);
- for (i = 0; i < nthreads; i++) {
- rjobs[i].rj_abufslst = &abuflst;
- rjobs[i].rj_fbufslst = &fbuflst;
- rjobs[i].rj_abuflk = &abuflk;
- rjobs[i].rj_fbuflk = &fbuflk;
- rjobs[i].rj_abufcnd = &abufcnd;
- rjobs[i].rj_fbufcnd = &fbufcnd;
- if (pthread_create(&rjobs[i].rj_thread, NULL,
- rjob_scan, &rjobs[i]) != 0) {
- fprintf(stderr, "Failed to create scanner thread\n");
- exit(1);
- }
- }
- buff = get_buff(&fbuflst, &fbuflk, &fbufcnd);
- while ((len = getline(&buff->b_buff, &buff->b_len, stdin)) != -1) {
- if (len > 0) {
- if (buff->b_buff[len - 1] == '\n') {
- buff->b_buff[len - 1] = 0;
- }
- put_buff(buff, &abuflst, &abuflk, &abufcnd);
- buff = get_buff(&fbuflst, &fbuflk, &fbufcnd);
- continue;
- }
- /* empty input */
- break;
- }
- put_buff(buff, &fbuflst, &fbuflk, &fbufcnd);
- for (i = 0; i < nthreads; i++) {
- buff = get_buff(&fbuflst, &fbuflk, &fbufcnd);
- free(buff->b_buff);
- buff->b_buff = NULL;
- buff->b_len = 0;
- put_buff(buff, &abuflst, &abuflk, &abufcnd);
- }
- for (i = 0; i < nthreads; i++) {
- pthread_join(rjobs[i].rj_thread, &status);
- }
- while (nthreads != 1) {
- for (i = 0; i < nthreads / 2; i++) {
- rjobs[i].rj_next = &rjobs[i + nthreads / 2];
- if (pthread_create(&rjobs[i].rj_thread, NULL,
- rjob_merge, &rjobs[i]) != 0) {
- fprintf(stderr, "Failed to create merger "
- "thread\n");
- exit(1);
- }
- }
- for (i = 0; i < nthreads / 2; i++) {
- pthread_join(rjobs[i].rj_thread, &status);
- }
- nthreads /= 2;
- }
- }
- static void
- output(rjob_t *rj)
- {
- int n;
- owner_t *own;
- for (own = list_head(&rj->rj_lowners); own != NULL;
- own = list_next(&rj->rj_lowners, own)) {
- for (n = 0; n < rj->rj_nscnt; n++) {
- fprintf(stdout, "{"
- "\"owner\":\"%s\","
- "\"namespace\":\"%s\","
- "\"directories\":%" PRIu64 ","
- "\"keys\":%" PRIu64 ","
- "\"objects\":%" PRIu64 ","
- "\"bytes\":\"%" PRIu64 "\"}\n",
- own->o_uuid, nsname(rj, n), own->o_dirs[n],
- own->o_keys[n], own->o_objs[n],
- own->o_bytes[n]);
- }
- }
- }
- static char **
- parse_namespaces(int *cnt)
- {
- char *n, *sns, *ns, *ens, **nspaces;
- if ((ens = getenv("NAMESPACES")) == NULL)
- ens = "stor public jobs reports";
- *cnt = 0;
- sns = ns = strdup(ens);
- nspaces = safe_malloc(sizeof (char *) * strlen(ens) + 1);
- while ((n = strsep(&ns, " ")) != NULL) {
- if (*n == '\0')
- continue;
- nspaces[(*cnt)++] = strdup(n);
- }
- free(sns);
- nspaces[*cnt] = NULL;
- if (*cnt <= MAXNS)
- return (nspaces);
- while (*nspaces != NULL)
- free(*nspaces++);
- return (NULL);
- }
- #define NTHREADS 16
- int
- main()
- {
- int i, nscnt , nthreads = NTHREADS;
- rjob_t *rjobs;
- char **nspaces;
- if ((nspaces = parse_namespaces(&nscnt)) == NULL) {
- fprintf(stderr, "Failed to parse namespaces\n");
- exit(1);
- }
- rjobs = safe_zmalloc(sizeof (rjob_t) * nthreads);
- for (i = 0; i < nthreads; i++) {
- rjobs[i].rj_idx = i;
- rjobs[i].rj_ns = nspaces;
- rjobs[i].rj_nscnt = nscnt;
- avl_create(&rjobs[i].rj_towners, avlnode_comparator,
- sizeof (owner_t), offsetof (owner_t, o_avlnode));
- list_create(&rjobs[i].rj_lowners,
- sizeof (owner_t), offsetof (owner_t, o_lnode));
- }
- scan_input_stdin(rjobs, nthreads);
- output(rjobs);
- return (0);
- }
Add Comment
Please, Sign In to add comment