Guest User

Untitled

a guest
May 24th, 2018
69
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 16.62 KB | None | 0 0
  1. #include <stdio.h>
  2. #include <sys/types.h>
  3. #include <sys/stat.h>
  4. #include <fcntl.h>
  5. #include <pthread.h>
  6. #include <stdlib.h>
  7. #include <unistd.h>
  8. #include <sys/list.h>
  9. #include <sys/avl.h>
  10. #include <string.h>
  11. #include <math.h>
  12. #include <stddef.h>
  13.  
  14. #include <jansson.h>
  15.  
  16. #define MAXNS 6
  17. #define MAXNSLEN 16
  18. #define MAXPATHLEN 1024
  19. #define UUIDLEN 64
  20.  
  21. #define RTYPE_OBJECT 0
  22. #define RTYPE_DIRECTORY 1
  23.  
  24. #define MIN_SIZE 131072
  25.  
  26. #define MAX(a,b) (((a) > (b)) ? (a) : (b))
  27.  
  28. typedef struct record_s {
  29. char r_own_uuid[UUIDLEN];
  30. char r_obj_uuid[UUIDLEN];
  31. char r_ns[MAXNSLEN];
  32. int r_type;
  33. int r_sharks_cnt;
  34. uint64_t r_len;
  35. } record_t;
  36.  
  37. typedef struct obj_s {
  38. char o_uuid[UUIDLEN];
  39. uint64_t o_size;
  40. int o_nsid;
  41. avl_node_t o_avlnode;
  42. list_node_t o_lnode;
  43. } obj_t;
  44.  
  45. typedef struct owner_s {
  46. char o_uuid[UUIDLEN];
  47. avl_tree_t o_objtree;
  48. list_t o_objlist;
  49. uint64_t o_dirs[MAXNS];
  50. uint64_t o_objs[MAXNS];
  51. uint64_t o_keys[MAXNS];
  52. uint64_t o_bytes[MAXNS];
  53. avl_node_t o_avlnode;
  54. list_node_t o_lnode;
  55. } owner_t;
  56.  
  57. static int
  58. avlnode_comparator(const void *l, const void *r)
  59. {
  60. int cmp;
  61. if ((cmp = strcmp((char *) l, (char *) r)) != 0)
  62. return (cmp/abs(cmp));
  63. return (0);
  64. }
  65.  
  66. typedef struct buff_s {
  67. char *b_buff;
  68. size_t b_len;
  69. list_node_t b_lnode;
  70. } buff_t;
  71.  
  72. typedef struct rjob_s {
  73. int rj_idx;
  74. pthread_t rj_thread;
  75. struct rjob_s *rj_next;
  76. avl_tree_t rj_towners;
  77. list_t rj_lowners;
  78. list_t *rj_abufslst;
  79. list_t *rj_fbufslst;
  80. char **rj_ns;
  81. int rj_nscnt;
  82. pthread_mutex_t *rj_abuflk;
  83. pthread_mutex_t *rj_fbuflk;
  84. pthread_cond_t *rj_abufcnd;
  85. pthread_cond_t *rj_fbufcnd;
  86. } rjob_t;
  87.  
  88. static void *
  89. safe_malloc(size_t sz)
  90. {
  91. char *ret;
  92. if ((ret = malloc(sz)) == NULL) {
  93. fprintf(stderr, "failed to allocate memory\n");
  94. exit(2);
  95. }
  96. return (ret);
  97. }
  98.  
  99. /*
  100. * Debugging functions
  101. */
  102. /*
  103. static void
  104. print_record(record_t *rec, int indent)
  105. {
  106. char *s1, *s2;
  107.  
  108. s1 = safe_malloc(indent + 1); s2 = safe_malloc(indent + 2);
  109. memset(s1, '\t', indent); memset(s2, '\t', indent + 1);
  110. s1[indent] = '\0'; s2[indent + 1] = '\0';
  111.  
  112. printf("%s{\n"
  113. "%sown = \"%s\"\n"
  114. "%suuid = \"%s\"\n"
  115. "%sns = \"%s\"\n"
  116. "%stype = \"%s\"\n"
  117. "%sshark_cnt = %d\n"
  118. "%slen = %"PRIu64"\n"
  119. "%s}\n",
  120. s1,
  121. s2, rec->r_own_uuid,
  122. s2, rec->r_obj_uuid,
  123. s2, rec->r_ns,
  124. s2, rec->r_type == RTYPE_OBJECT ? "object" : "directory",
  125. s2, rec->r_sharks_cnt,
  126. s2, rec->r_len,
  127. s1);
  128.  
  129. free(s1);
  130. free(s2);
  131. }
  132.  
  133. static void
  134. print_object(obj_t *obj, int indent)
  135. {
  136. char *s1, *s2;
  137.  
  138. s1 = safe_malloc(indent + 1); s2 = safe_malloc(indent + 2);
  139. memset(s1, '\t', indent); memset(s2, '\t', indent + 1);
  140. s1[indent] = '\0'; s2[indent + 1] = '\0';
  141. printf("%s{\n"
  142. "%suuid = \"%s\"\n"
  143. "%ssize = %"PRIu64"\n"
  144. "%snsid = %d\n"
  145. "%s}\n",
  146. s1,
  147. s2, obj->o_uuid,
  148. s2, obj->o_size,
  149. s2, obj->o_nsid,
  150. s1);
  151.  
  152. free(s1); free(s2);
  153. }
  154.  
  155. static void
  156. print_owner(owner_t *own, int indent)
  157. {
  158. obj_t *obj;
  159. char *s1, *s2;
  160.  
  161. s1 = safe_malloc(indent + 1); s2 = safe_malloc(indent + 2);
  162. memset(s1, '\t', indent); memset(s2, '\t', indent + 1);
  163. s1[indent] = '\0'; s2[indent + 1] = '\0';
  164. printf("%s{\n"
  165. "%suuid = \"%s\"\n"
  166. "%sdirs[0] = %"PRIu64" dirs[1] = %"PRIu64" dirs[2] = %"PRIu64" dirs[3] = %"PRIu64" dirs[4] = %"PRIu64" dirs[5] = %"PRIu64"\n"
  167. "%sobjs[0] = %"PRIu64" objs[1] = %"PRIu64" objs[2] = %"PRIu64" objs[3] = %"PRIu64" objs[4] = %"PRIu64" objs[5] = %"PRIu64"\n"
  168. "%skeys[0] = %"PRIu64" keys[1] = %"PRIu64" keys[2] = %"PRIu64" keys[3] = %"PRIu64" keys[4] = %"PRIu64" keys[5] = %"PRIu64"\n"
  169. "%sbytes[0] = %"PRIu64" bytes[1] = %"PRIu64" bytes[2] = %"PRIu64" bytes[3] = %"PRIu64" bytes[4] = %"PRIu64" bytes[5] = %"PRIu64"\n",
  170. s1,
  171. s2, own->o_uuid,
  172. s2, own->o_dirs[0], own->o_dirs[1], own->o_dirs[2], own->o_dirs[3], own->o_dirs[4], own->o_dirs[5],
  173. s2, own->o_objs[0], own->o_objs[1], own->o_objs[2], own->o_objs[3], own->o_objs[4], own->o_objs[5],
  174. s2, own->o_keys[0], own->o_keys[1], own->o_keys[2], own->o_keys[3], own->o_keys[4], own->o_keys[5],
  175. s2, own->o_bytes[0], own->o_bytes[1], own->o_bytes[2], own->o_bytes[3], own->o_bytes[4], own->o_bytes[5]);
  176.  
  177. printf("%sobjs = [\n", s2);
  178. for (obj = list_head(&own->o_objlist); obj != NULL;
  179. obj = list_next(&own->o_objlist, obj)) {
  180. print_object(obj, indent + 1);
  181. }
  182. printf("%s]\n"
  183. "%s}\n",
  184. s2,
  185. s1);
  186.  
  187. free(s1); free(s2);
  188. }
  189.  
  190. static void
  191. print_job(rjob_t *rj, int indent)
  192. {
  193. owner_t *own;
  194. char *s1, *s2;
  195.  
  196. s1 = safe_malloc(indent + 1); s2 = safe_malloc(indent + 2);
  197. memset(s1, '\t', indent); memset(s2, '\t', indent + 1);
  198. s1[indent] = '\0'; s2[indent + 1] = '\0';
  199.  
  200. printf("%s{\n"
  201. "%sidx = %d\n"
  202. "%sowners = [\n",
  203. s1,
  204. s2, rj->rj_idx,
  205. s2);
  206.  
  207. for (own = list_head(&rj->rj_lowners); own != NULL;
  208. own = list_next(&rj->rj_lowners, own)) {
  209. print_owner(own, indent + 1);
  210. }
  211.  
  212. printf("%s]\n"
  213. "%s}\n",
  214. s2,
  215. s1);
  216.  
  217. free(s1); free(s2);
  218. }
  219. */
  220.  
  221. static void *
  222. safe_zmalloc(size_t sz)
  223. {
  224. return memset(safe_malloc(sz), 0, sz);
  225. }
  226.  
  227. static int
  228. nsidx(rjob_t *rj, char *ns)
  229. {
  230. char **jns;
  231.  
  232. jns = rj->rj_ns;
  233. while(*jns != NULL) {
  234. if (strcmp(*jns, ns) == 0)
  235. return jns - rj->rj_ns;
  236. jns++;
  237. }
  238.  
  239. fprintf(stderr, "%s: filed to find namespace = %s\n",
  240. __func__, ns);
  241. exit(2);
  242. return (0);
  243. }
  244.  
  245. static char *
  246. nsname(rjob_t *rj, int idx)
  247. {
  248. if (idx >= 0 && idx < rj->rj_nscnt)
  249. return (rj->rj_ns[idx]);
  250.  
  251. fprintf(stderr, "%s: filed to find namespace = %d\n",
  252. __func__, idx);
  253. exit(2);
  254. return (NULL);
  255. }
  256.  
  257. static buff_t *
  258. get_buff(list_t *bufflst, pthread_mutex_t *lstlk, pthread_cond_t *lstcnd)
  259. {
  260. buff_t *buff;
  261. pthread_mutex_lock(lstlk);
  262. while (list_is_empty(bufflst)) {
  263. pthread_cond_wait(lstcnd, lstlk);
  264. }
  265. buff = list_remove_tail(bufflst);
  266. pthread_mutex_unlock(lstlk);
  267. return (buff);
  268.  
  269. }
  270.  
  271. static void
  272. put_buff(buff_t *buff, list_t *bufflst, pthread_mutex_t *lstlk,
  273. pthread_cond_t *lstcnd)
  274. {
  275. pthread_mutex_lock(lstlk);
  276. list_insert_head(bufflst, buff);
  277. pthread_cond_signal(lstcnd);
  278. pthread_mutex_unlock(lstlk);
  279. }
  280.  
  281.  
  282. int
  283. json_to_record(char *json, record_t *r)
  284. {
  285. const char *str, *p;
  286. json_t *jobj, *jprop;
  287.  
  288. jobj = NULL;
  289. if ((jobj = json_loads(json, JSON_REJECT_DUPLICATES, NULL)) == NULL)
  290. goto error;
  291.  
  292. if ((jprop = json_object_get(jobj, "key")) == NULL ||
  293. (str = json_string_value(jprop)) == NULL ||
  294. strlen(str) < 39) {
  295. goto error;
  296. }
  297.  
  298.  
  299. if ((p = strchr(str + 38, '/')) == NULL) {
  300. if (strlen(str + 38) > MAXNSLEN -1)
  301. goto error;
  302. strcpy(r->r_ns, str + 38);
  303. } else {
  304. strncpy(r->r_ns, str + 38, p - str - 38);
  305. r->r_ns[p - str - 38] = '\0';
  306. }
  307.  
  308.  
  309. if ((jprop = json_object_get(jobj, "type")) == NULL ||
  310. (str = json_string_value(jprop)) == NULL) {
  311. goto error;
  312. }
  313.  
  314. if (strcmp(str, "directory") == 0)
  315. r->r_type = RTYPE_DIRECTORY;
  316. else if (strcmp(str, "object") == 0)
  317. r->r_type = RTYPE_OBJECT;
  318. else
  319. goto error;
  320.  
  321.  
  322. r->r_sharks_cnt = 0;
  323. r->r_len = 0;
  324. r->r_obj_uuid[0] = '\0';
  325. if (r->r_type == RTYPE_OBJECT) {
  326. if ((jprop = json_object_get(jobj, "sharks")) == NULL)
  327. goto error;
  328.  
  329. r->r_sharks_cnt = json_array_size(jprop);
  330. if ((jprop = json_object_get(jobj, "contentLength")) == NULL)
  331. goto error;
  332. r->r_len = json_integer_value(jprop);
  333.  
  334. if ((jprop = json_object_get(jobj, "objectId")) == NULL ||
  335. (str =json_string_value(jprop)) == NULL) {
  336. goto error;
  337. }
  338. strcpy(r->r_obj_uuid, str);
  339. }
  340.  
  341. if ((jprop = json_object_get(jobj, "owner")) == NULL ||
  342. (str = json_string_value(jprop)) == NULL) {
  343. goto error;
  344. }
  345.  
  346. strcpy(r->r_own_uuid, str);
  347. json_decref(jobj);
  348. return (0);
  349. error:
  350. fprintf(stderr, "Failed to parse json object `%s`\n", json);
  351. json_decref(jobj);
  352. return (1);
  353. }
  354.  
  355. static void
  356. rjob_process_record(rjob_t *rj, record_t *rec)
  357. {
  358. owner_t own, *pown;
  359. obj_t obj, *pobj;
  360. int nsid;
  361.  
  362. strcpy(own.o_uuid, rec->r_own_uuid);
  363. if ((pown = avl_find(&rj->rj_towners, &own, NULL)) == NULL) {
  364. pown = safe_zmalloc(sizeof(owner_t));
  365. strcpy(pown->o_uuid, rec->r_own_uuid);
  366. avl_add(&rj->rj_towners, pown);
  367.  
  368. avl_create(&pown->o_objtree, avlnode_comparator,
  369. sizeof (obj_t), offsetof (obj_t, o_avlnode));
  370. list_create(&pown->o_objlist, sizeof (obj_t),
  371. offsetof (obj_t, o_lnode));
  372. }
  373.  
  374. nsid = nsidx(rj, rec->r_ns);
  375.  
  376. if (rec->r_type == RTYPE_DIRECTORY) {
  377. pown->o_dirs[nsid]++;
  378. return;
  379. }
  380.  
  381. strcpy(obj.o_uuid, rec->r_obj_uuid);
  382. if ((pobj = avl_find(&pown->o_objtree, &obj, NULL)) == NULL) {
  383. pobj = safe_zmalloc(sizeof (obj_t));
  384. strcpy(pobj->o_uuid, rec->r_obj_uuid);
  385. pobj->o_nsid = nsid;
  386. pobj->o_size = MAX(rec->r_len, MIN_SIZE) * rec->r_sharks_cnt;
  387. avl_add(&pown->o_objtree, pobj);
  388.  
  389. pown->o_objs[nsid]++;
  390. pown->o_bytes[nsid] += pobj->o_size;
  391. }
  392.  
  393. pown->o_keys[nsid]++;
  394. }
  395.  
  396. static void
  397. rjob_avltree_to_list(rjob_t *rj)
  398. {
  399. obj_t *pobj;
  400. owner_t *pown;
  401.  
  402. for (pown = avl_first(&rj->rj_towners); pown != NULL;
  403. pown = AVL_NEXT(&rj->rj_towners, pown)) {
  404.  
  405. list_insert_tail(&rj->rj_lowners, pown);
  406. for (pobj = avl_first(&pown->o_objtree); pobj != NULL;
  407. pobj = AVL_NEXT(&pown->o_objtree, pobj)) {
  408. list_insert_tail(&pown->o_objlist, pobj);
  409. }
  410. }
  411. }
  412.  
  413. static void *
  414. rjob_scan(void *arg)
  415. {
  416. rjob_t *rj;
  417. buff_t *buff;
  418. record_t rec;
  419.  
  420. rj = (rjob_t *) arg;
  421.  
  422. while (1) {
  423. buff = get_buff(rj->rj_abufslst, rj->rj_abuflk, rj->rj_abufcnd);
  424. if (buff->b_buff == NULL) {
  425. put_buff(buff, rj->rj_fbufslst,
  426. rj->rj_fbuflk, rj->rj_fbufcnd);
  427. break;
  428. }
  429. if (json_to_record(buff->b_buff, &rec) != 0) {
  430. fprintf(stderr, "Invalid input record \"%s\"\n",
  431. buff->b_buff);
  432. exit(2);
  433. }
  434.  
  435. put_buff(buff, rj->rj_fbufslst, rj->rj_fbuflk, rj->rj_fbufcnd);
  436. rjob_process_record(rj, &rec);
  437. }
  438.  
  439. rjob_avltree_to_list(rj);
  440. return (NULL);
  441. }
  442.  
  443. static void
  444. rjob_merge_owners(owner_t *own1,owner_t *own2)
  445. {
  446. int i, cmp, nsid;
  447. list_t olist, *list1, *list2;
  448. obj_t *obj, *obj1, *obj2;
  449.  
  450. list1 = &own1->o_objlist;
  451. list2 = &own2->o_objlist;
  452. obj1 = list_head(list1);
  453. obj2 = list_head(list2);
  454. list_create(&olist, sizeof (obj_t), offsetof (obj_t, o_lnode));
  455.  
  456. while (obj1 != NULL || obj2 != NULL) {
  457. if (obj1 != NULL && obj2 != NULL) {
  458. if ((cmp = strcmp(obj1->o_uuid, obj2->o_uuid)) == 0) {
  459.  
  460. nsid = obj2->o_nsid;
  461. own2->o_keys[nsid]--;
  462. own2->o_objs[nsid]--;
  463. own2->o_bytes[nsid] -= obj2->o_size;
  464.  
  465. obj = list_next(list2, obj2);
  466. list_remove(list2, obj2);
  467. free(obj2);
  468. obj2 = obj;
  469.  
  470. own1->o_keys[nsid]++;
  471.  
  472. obj = list_next(list1, obj1);
  473. list_remove(list1, obj1);
  474. list_insert_tail(&olist, obj1);
  475. obj1 = obj;
  476.  
  477. } else if (cmp < 0) {
  478. obj = list_next(list1, obj1);
  479. list_remove(list1, obj1);
  480. list_insert_tail(&olist, obj1);
  481. obj1 = obj;
  482. } else { /* (cmp > 0) */
  483.  
  484. nsid = obj2->o_nsid;
  485.  
  486. own2->o_keys[nsid]--;
  487. own2->o_objs[nsid]--;
  488. own2->o_bytes[nsid] -= obj2->o_size;
  489.  
  490. own1->o_objs[nsid]++;
  491. own1->o_keys[nsid]++;
  492. own1->o_bytes[nsid] += obj2->o_size;
  493.  
  494. obj = list_next(list2, obj2);
  495. list_remove(list2, obj2);
  496. list_insert_tail(&olist, obj2);
  497. obj2 = obj;
  498. }
  499. } else {
  500. if (obj1 == NULL && obj2 != NULL) {
  501. list_move_tail(&olist, list2);
  502. } else { /* (obj1 != NULL && obj2 == NULL) */
  503. list_move_tail(&olist, list1);
  504. }
  505. break;
  506. }
  507. }
  508.  
  509.  
  510. if (!list_is_empty(list1) || !list_is_empty(list2)) {
  511. fprintf(stderr, "One of the lists is not empty\n");
  512. exit(2);
  513. }
  514.  
  515. for (i = 0; i < MAXNS; i++) {
  516. own1->o_dirs[i] += own2->o_dirs[i];
  517. own1->o_objs[i] += own2->o_objs[i];
  518. own1->o_keys[i] += own2->o_keys[i];
  519. own1->o_bytes[i] += own2->o_bytes[i];
  520. own2->o_dirs[i] = 0;
  521. own2->o_objs[i] = 0;
  522. own2->o_keys[i] = 0;
  523. own2->o_bytes[i] = 0;
  524. }
  525.  
  526.  
  527. /* Put the final result in own1 */
  528. list_move_tail(&own1->o_objlist, &olist);
  529. }
  530.  
  531. static void *
  532. rjob_merge(void *arg)
  533. {
  534. int cmp;
  535. rjob_t *rj1, *rj2;
  536. owner_t *own, *own1, *own2;
  537. list_t olist, *list1, *list2;
  538.  
  539. rj1 = (rjob_t *) arg;
  540. rj2 = rj1->rj_next;
  541. list1 = &rj1->rj_lowners;
  542. list2 = &rj2->rj_lowners;
  543. own1 = list_head(list1);
  544. own2 = list_head(list2);
  545.  
  546. list_create(&olist, sizeof (owner_t), offsetof(owner_t, o_lnode));
  547.  
  548. while (own1 != NULL || own2 != NULL) {
  549. if (own1 != NULL && own2 != NULL) {
  550.  
  551. if ((cmp = strcmp(own1->o_uuid, own2->o_uuid)) == 0) {
  552.  
  553. rjob_merge_owners(own1, own2);
  554.  
  555. own = list_next(list2, own2);
  556. list_remove(list2, own2);
  557. free(own2);
  558. own2 = own;
  559.  
  560. own = list_next(list1, own1);
  561. list_remove(list1, own1);
  562. list_insert_tail(&olist, own1);
  563. own1 = own;
  564.  
  565. } else if (cmp < 0) {
  566. own = list_next(list1, own1);
  567. list_remove(list1, own1);
  568. list_insert_tail(&olist, own1);
  569. own1 = own;
  570. } else { /* (cmp > 0) */
  571. own = list_next(list2, own2);
  572. list_remove(list2, own2);
  573. list_insert_tail(&olist, own2);
  574. own2 = own;
  575. }
  576.  
  577. } else {
  578.  
  579. if (own1 == NULL && own2 != NULL) {
  580. list_move_tail(&olist, list2);
  581. } else { /* (own1 != NULL && own2 == NULL) */
  582. list_move_tail(&olist, list1);
  583. }
  584. break;
  585. }
  586. }
  587.  
  588. if (!list_is_empty(list1) || !list_is_empty(list2)) {
  589. fprintf(stderr, "Something is going wrong, one of the lists "
  590. "is not empty\n");
  591. exit(2);
  592. }
  593.  
  594. /* Put the ending result in job1's list */
  595. list_move_tail(&rj1->rj_lowners, &olist);
  596. return (NULL);
  597. }
  598.  
  599. static void
  600. scan_input_stdin(rjob_t *rjobs, int nthreads)
  601. {
  602. int i;
  603. ssize_t len;
  604. void *status;
  605. buff_t *buff, *buffs;
  606. list_t abuflst, fbuflst;
  607. pthread_mutex_t abuflk;
  608. pthread_mutex_t fbuflk;
  609. pthread_cond_t abufcnd;
  610. pthread_cond_t fbufcnd;
  611.  
  612. pthread_mutex_init(&abuflk, NULL);
  613. pthread_mutex_init(&fbuflk, NULL);
  614. pthread_cond_init(&abufcnd, NULL);
  615. pthread_cond_init(&fbufcnd, NULL);
  616. list_create(&abuflst, sizeof (buff_t), offsetof(buff_t, b_lnode));
  617. list_create(&fbuflst, sizeof (buff_t), offsetof(buff_t, b_lnode));
  618.  
  619. buffs = safe_zmalloc(sizeof (buff_t) * nthreads * 2);
  620. for (i = 0; i < 2 * nthreads; i ++)
  621. list_insert_tail(&fbuflst, &buffs[i]);
  622.  
  623. for (i = 0; i < nthreads; i++) {
  624. rjobs[i].rj_abufslst = &abuflst;
  625. rjobs[i].rj_fbufslst = &fbuflst;
  626. rjobs[i].rj_abuflk = &abuflk;
  627. rjobs[i].rj_fbuflk = &fbuflk;
  628. rjobs[i].rj_abufcnd = &abufcnd;
  629. rjobs[i].rj_fbufcnd = &fbufcnd;
  630.  
  631. if (pthread_create(&rjobs[i].rj_thread, NULL,
  632. rjob_scan, &rjobs[i]) != 0) {
  633. fprintf(stderr, "Failed to create scanner thread\n");
  634. exit(1);
  635. }
  636. }
  637.  
  638. buff = get_buff(&fbuflst, &fbuflk, &fbufcnd);
  639. while ((len = getline(&buff->b_buff, &buff->b_len, stdin)) != -1) {
  640. if (len > 0) {
  641. if (buff->b_buff[len - 1] == '\n') {
  642. buff->b_buff[len - 1] = 0;
  643. }
  644. put_buff(buff, &abuflst, &abuflk, &abufcnd);
  645. buff = get_buff(&fbuflst, &fbuflk, &fbufcnd);
  646. continue;
  647. }
  648.  
  649. /* empty input */
  650. break;
  651. }
  652. put_buff(buff, &fbuflst, &fbuflk, &fbufcnd);
  653.  
  654. for (i = 0; i < nthreads; i++) {
  655. buff = get_buff(&fbuflst, &fbuflk, &fbufcnd);
  656. free(buff->b_buff);
  657. buff->b_buff = NULL;
  658. buff->b_len = 0;
  659. put_buff(buff, &abuflst, &abuflk, &abufcnd);
  660. }
  661.  
  662. for (i = 0; i < nthreads; i++) {
  663. pthread_join(rjobs[i].rj_thread, &status);
  664. }
  665.  
  666. while (nthreads != 1) {
  667. for (i = 0; i < nthreads / 2; i++) {
  668. rjobs[i].rj_next = &rjobs[i + nthreads / 2];
  669. if (pthread_create(&rjobs[i].rj_thread, NULL,
  670. rjob_merge, &rjobs[i]) != 0) {
  671. fprintf(stderr, "Failed to create merger "
  672. "thread\n");
  673. exit(1);
  674. }
  675. }
  676.  
  677. for (i = 0; i < nthreads / 2; i++) {
  678. pthread_join(rjobs[i].rj_thread, &status);
  679. }
  680. nthreads /= 2;
  681. }
  682.  
  683. }
  684.  
  685. static void
  686. output(rjob_t *rj)
  687. {
  688. int n;
  689. owner_t *own;
  690. for (own = list_head(&rj->rj_lowners); own != NULL;
  691. own = list_next(&rj->rj_lowners, own)) {
  692. for (n = 0; n < rj->rj_nscnt; n++) {
  693. fprintf(stdout, "{"
  694. "\"owner\":\"%s\","
  695. "\"namespace\":\"%s\","
  696. "\"directories\":%" PRIu64 ","
  697. "\"keys\":%" PRIu64 ","
  698. "\"objects\":%" PRIu64 ","
  699. "\"bytes\":\"%" PRIu64 "\"}\n",
  700. own->o_uuid, nsname(rj, n), own->o_dirs[n],
  701. own->o_keys[n], own->o_objs[n],
  702. own->o_bytes[n]);
  703. }
  704. }
  705. }
  706.  
  707. static char **
  708. parse_namespaces(int *cnt)
  709. {
  710. char *n, *sns, *ns, *ens, **nspaces;
  711. if ((ens = getenv("NAMESPACES")) == NULL)
  712. ens = "stor public jobs reports";
  713.  
  714. *cnt = 0;
  715. sns = ns = strdup(ens);
  716. nspaces = safe_malloc(sizeof (char *) * strlen(ens) + 1);
  717. while ((n = strsep(&ns, " ")) != NULL) {
  718. if (*n == '\0')
  719. continue;
  720. nspaces[(*cnt)++] = strdup(n);
  721. }
  722. free(sns);
  723. nspaces[*cnt] = NULL;
  724.  
  725. if (*cnt <= MAXNS)
  726. return (nspaces);
  727.  
  728. while (*nspaces != NULL)
  729. free(*nspaces++);
  730.  
  731. return (NULL);
  732. }
  733.  
  734. #define NTHREADS 16
  735. int
  736. main()
  737. {
  738. int i, nscnt , nthreads = NTHREADS;
  739. rjob_t *rjobs;
  740. char **nspaces;
  741.  
  742. if ((nspaces = parse_namespaces(&nscnt)) == NULL) {
  743. fprintf(stderr, "Failed to parse namespaces\n");
  744. exit(1);
  745. }
  746.  
  747. rjobs = safe_zmalloc(sizeof (rjob_t) * nthreads);
  748.  
  749. for (i = 0; i < nthreads; i++) {
  750. rjobs[i].rj_idx = i;
  751. rjobs[i].rj_ns = nspaces;
  752. rjobs[i].rj_nscnt = nscnt;
  753. avl_create(&rjobs[i].rj_towners, avlnode_comparator,
  754. sizeof (owner_t), offsetof (owner_t, o_avlnode));
  755. list_create(&rjobs[i].rj_lowners,
  756. sizeof (owner_t), offsetof (owner_t, o_lnode));
  757. }
  758.  
  759. scan_input_stdin(rjobs, nthreads);
  760. output(rjobs);
  761.  
  762. return (0);
  763. }
Add Comment
Please, Sign In to add comment