#include "../include/RadixJoin.h"
#include <iostream>
#include <chrono>
#include <cmath>
#include <cassert>
#include <cstdlib> // calloc / free
#include <thread>
#include <vector>
#include <utility>

RadixJoin::RadixJoin(relation_t *R, relation_t *S)
{
    // Always keep the smaller relation as R_ (the build side).
    if (R->number_tuples <= S->number_tuples)
    {
        R_ = R;
        S_ = S;
    }
    else
    {
        R_ = S;
        S_ = R;
    }
}

RadixJoin::~RadixJoin()
{
}

/*
* RADIX JOIN - Implement Radix join with the following requirements
* 1. Multithreaded
* 2. Use a histogram to access memory offsets
* 3. Use radix partitioning to create chunks fitting into the cache, but only one pass is needed,
*    i.e., figure out how many bits you need to create N partitions
*
* Input: use member variables R_ and S_
*
* Return: result_relation_t - defined in Types.h
*/
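/*
* Overview of the implementation below (non-SERIAL build):
*  Phase 1: each worker builds a private histogram over its chunk of the input,
*           threads synchronize on a barrier, derive their exclusive write offsets,
*           and scatter their tuples into reordered_R / reordered_S in partition order.
*  Phase 2: the per-thread histograms are prefix-summed and combined into a global
*           histogram that marks the start offset of every radix partition.
*  Phase 3: for each partition, build_probe() builds a bucketed copy of the R chunk
*           and probes it with the matching S chunk using worker threads.
*/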

// // create Worker Threads
// std::thread t1(&RadixJoin::parallel_partitioning,this);
// std::this_thread::sleep_for(std::chrono::seconds(5));
// std::thread t2(&RadixJoin::parallel_partitioning,this);
// t1.join();
// t2.join();

// Nested-loop join, used as a simple reference implementation.
result_relation_t *RadixJoin::nlj()
{
    result_relation_t *result_rel = new result_relation_t;

    std::vector<std::pair<uint32_t, uint32_t>> *result_data = new std::vector<std::pair<uint32_t, uint32_t>>;
    // Reserve for the common 1:1 case; push_back also handles duplicate keys,
    // which could produce more matches than |S|.
    result_data->reserve(S_->number_tuples);
    for (size_t i = 0; i < S_->number_tuples; i++)
    {
        for (size_t j = 0; j < R_->number_tuples; j++)
        {
            if (R_->data[j].key == S_->data[i].key)
            {
                result_data->push_back(std::make_pair(R_->data[j].rid, S_->data[i].rid));
            }
        }
    }

    result_rel->data = result_data;
    result_rel->number_tuples = result_data->size();
    return result_rel;
}

// #define SERIAL
result_relation_t *RadixJoin::join()
{
    // Parallel partitioning of R: choose enough partitions that one partition of R
    // fits into the cache, rounded up to a power of two.
    int npart = ((R_->number_tuples * sizeof(tuple_t)) / Config::CACHE_SIZE) + 1;
    // int npart = 4;
    NEXT_POW_2(npart);
    std::cout << "npart: " << npart << '\n';
    int nbits = static_cast<int>(log2(npart));
    std::cout << "Number of bits needed: " << nbits << '\n';
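    // Illustration of the sizing above (numbers are only an example; the real values
    // depend on Config::CACHE_SIZE and sizeof(tuple_t)): with 1,000,000 tuples of
    // 8 bytes each and a 2 MiB cache, npart = 8,000,000 / 2,097,152 + 1 = 4, which
    // is already a power of two, so nbits = log2(4) = 2 radix bits select a partition.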

    // Allocate the temporary (reordered) output relation for R.
    relation_t *reordered_R = new relation_t;
    reordered_R->data = new tuple_t[R_->number_tuples];
    reordered_R->number_tuples = R_->number_tuples;

#ifdef SERIAL
    int *hist_R = partitioning(0, nbits, R_, reordered_R->data);
#else
    // One histogram slot per partition plus a leading zero for the prefix sum.
    int histSize = (1 << nbits) + 1;
    std::cout << "histSize: " << histSize << '\n';
    // One private histogram per worker: hist_R[thread][partition].
    int **hist_R = new int *[Config::NUM_WORKERS];
    for (int i = 0; i < Config::NUM_WORKERS; i++)
    {
        hist_R[i] = new int[histSize]();
    }

    // Each worker gets a contiguous chunk of R; the last worker also takes the
    // remainder tuples (handled inside parallel_partitioning).
    int tup_p_thread = R_->number_tuples / Config::NUM_WORKERS;
    std::vector<std::thread> threads;

    Barrier barrier_r(Config::NUM_WORKERS);

    for (size_t i = 0; i < Config::NUM_WORKERS; i++)
    {
        threads.emplace_back(&RadixJoin::parallel_partitioning, this, i, nbits, R_, reordered_R->data, hist_R, tup_p_thread, histSize, std::ref(barrier_r));
    }

    for (auto &th : threads)
    {
        th.join();
    }
    std::cout << "Computing global histogram for R..." << '\n';

    // Prefix sum over every per-thread histogram.
    for (size_t j = 0; j < Config::NUM_WORKERS; j++)
    {
        for (int i = 0, sum = 0; i < histSize; i++)
        {
            sum += hist_R[j][i];
            hist_R[j][i] = sum;
        }
    }

    // Combine the per-thread prefix sums into the global histogram.
    int *glb_hist_r = new int[histSize]();
    for (int i = 0; i < histSize; i++)
    {
        for (size_t j = 0; j < Config::NUM_WORKERS; j++)
        {
            glb_hist_r[i] += hist_R[j][i];
        }
        // std::cout << "glb_hist_r: " << glb_hist_r[i] << '\n';
    }
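    // After the per-thread prefix sums above, glb_hist_r[i] holds the start offset
    // of radix partition i inside reordered_R (glb_hist_r[0] == 0), so partition i
    // spans [glb_hist_r[i], glb_hist_r[i + 1]).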

    // for (size_t i = 0; i < reordered_R->number_tuples; i++)
    // {
    //     std::cout << reordered_R->data[i].key << ',';
    // }

    std::cout << "" << '\n';

#endif

    // Same procedure for S, reusing the same number of radix bits.
    relation_t *reordered_S = new relation_t;
    reordered_S->data = new tuple_t[S_->number_tuples];
    reordered_S->number_tuples = S_->number_tuples;
#ifdef SERIAL
    int *hist_S = partitioning(0, nbits, S_, reordered_S->data);
#else
    int **hist_S = new int *[Config::NUM_WORKERS];
    for (int i = 0; i < Config::NUM_WORKERS; i++)
    {
        hist_S[i] = new int[histSize]();
    }

    int tup_p_thread_s = S_->number_tuples / Config::NUM_WORKERS;
    std::vector<std::thread> threads_s;

    Barrier barrier_s(Config::NUM_WORKERS);

    for (size_t i = 0; i < Config::NUM_WORKERS; i++)
    {
        threads_s.emplace_back(&RadixJoin::parallel_partitioning, this, i, nbits, S_, reordered_S->data, hist_S, tup_p_thread_s, histSize, std::ref(barrier_s));
    }

    for (auto &th : threads_s)
    {
        th.join();
    }

    std::cout << "Computing global histogram for S..." << '\n';
    int *glb_hist_s = new int[histSize]();

    // Prefix sum over every per-thread histogram.
    for (size_t j = 0; j < Config::NUM_WORKERS; j++)
    {
        for (int i = 0, sum = 0; i < histSize; i++)
        {
            sum += hist_S[j][i];
            hist_S[j][i] = sum;
        }
    }

    // Combine into the global histogram for S.
    for (int i = 0; i < histSize; i++)
    {
        for (size_t j = 0; j < Config::NUM_WORKERS; j++)
        {
            glb_hist_s[i] += hist_S[j][i];
        }
    }

    // for (size_t i = 0; i < reordered_S->number_tuples; i++)
    // {
    //     std::cout << reordered_S->data[i].key << ',';
    // }

#endif

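    // partition_r / partition_s are reused as lightweight views: for every radix
    // partition they are pointed at the corresponding slice of the reordered
    // relations and handed to build_probe(), which joins that pair of chunks.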
    relation_t *partition_r = new relation_t;
    relation_t *partition_s = new relation_t;

    // std::vector instead of a variable-length array (VLAs are not standard C++).
    std::vector<std::vector<std::pair<uint32_t, uint32_t>> *> result_ptr(npart);
    int result_size = 0;
    for (int i = 0; i < npart; i++)
    {
#ifdef SERIAL
        int hist_r = (i == 0) ? 0 : hist_R[i - 1];
        partition_r->data = &reordered_R->data[hist_r];
        partition_r->number_tuples = hist_R[i] - hist_r;

        int hist_s = (i == 0) ? 0 : hist_S[i - 1];
        partition_s->data = &reordered_S->data[hist_s];
        partition_s->number_tuples = hist_S[i] - hist_s;

        result_ptr[i] = build_probe(partition_r, partition_s, nbits);
        result_size += result_ptr[i]->size();
#else
        int hist_r = glb_hist_r[i];
        partition_r->data = &reordered_R->data[hist_r];
        partition_r->number_tuples = glb_hist_r[i + 1] - hist_r;

        int hist_s = glb_hist_s[i];
        partition_s->data = &reordered_S->data[hist_s];
        partition_s->number_tuples = glb_hist_s[i + 1] - hist_s;

        result_ptr[i] = build_probe(partition_r, partition_s, nbits);
        result_size += result_ptr[i]->size();
#endif
    }

    // Gather the per-partition results into one result relation.
    result_relation_t *join_result = new result_relation_t;
    join_result->data = new std::vector<std::pair<uint32_t, uint32_t>>;
    join_result->data->resize(result_size);
    join_result->number_tuples = result_size;

    int cnt = 0;
    for (int i = 0; i < npart; i++)
    {
        for (size_t j = 0; j < result_ptr[i]->size(); j++)
        {
            join_result->data->operator[](cnt) = result_ptr[i]->operator[](j);
            cnt++;
        }
        delete result_ptr[i];
    }
    std::cout << "count: " << cnt << '\n';

    // Free the temporary histograms.
#ifndef SERIAL
    delete[] glb_hist_r;
    delete[] glb_hist_s;
    for (int i = 0; i < Config::NUM_WORKERS; i++)
    {
        delete[] hist_R[i];
        delete[] hist_S[i];
    }
#endif
    delete[] hist_R;
    delete[] hist_S;
    return join_result;
}

std::vector<std::pair<uint32_t, uint32_t>> *RadixJoin::build_probe(relation_t *partition_r, relation_t *partition_s, int nbits)
{
    tuple_t *Rtuples = partition_r->data;
    const uint32_t numR = partition_r->number_tuples;
    uint32_t Nhist = get_hist_size(numR);
    const uint32_t MASK = (Nhist - 1) << nbits;
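    // The low nbits of the key were already consumed by radix partitioning, so the
    // build-side hash uses the next log2(Nhist) bits (assuming get_hist_size()
    // returns a power of two): MASK selects them and HASH_BIT_MODULO, defined in
    // the headers, presumably shifts them down by nbits.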
    std::vector<std::pair<uint32_t, uint32_t>> *return_ptr = new std::vector<std::pair<uint32_t, uint32_t>>;

    // Build-side histogram: hist[idx + 2] counts the tuples of bucket idx.
    int32_t *hist = (int32_t *)calloc(Nhist + 2, sizeof(int32_t));

    for (uint32_t i = 0; i < numR; i++)
    {
        uint32_t idx = HASH_BIT_MODULO(Rtuples[i].key, MASK, nbits);
        hist[idx + 2]++;
    }

    /* prefix sum on histogram */
    for (uint32_t i = 2, sum = 0; i <= Nhist + 1; i++)
    {
        sum += hist[i];
        hist[i] = sum;
    }

    tuple_t *const tmpRtuples = new tuple_t[partition_r->number_tuples];
    /* reorder tuples according to the prefix sum */
    for (uint32_t i = 0; i < numR; i++)
    {
        uint32_t idx = HASH_BIT_MODULO(Rtuples[i].key, MASK, nbits) + 1;
        tmpRtuples[hist[idx]] = Rtuples[i];
        hist[idx]++;
    }
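    // After this scatter the histogram has effectively shifted by one slot:
    // hist[idx] is now the start and hist[idx + 1] the end of bucket idx in
    // tmpRtuples, which is exactly the range parallel_probe() scans for a key.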

    // Parallel probing: split the S chunk evenly across the workers.
    uint32_t nrThreads = Config::NUM_WORKERS; // TODO: decrease nrThreads if there are only few tuples in S
    std::vector<std::thread> threads;

    std::vector<std::vector<std::pair<uint32_t, uint32_t>> *> div_result_ptrs(nrThreads);
    const uint32_t numS = partition_s->number_tuples;
    tuple_t *Stuples = partition_s->data;
    const uint32_t chunk_s = numS / nrThreads;
    uint32_t tuple_sum = 0;
    for (size_t i = 0; i < nrThreads; i++)
    {
        uint32_t offset = i * chunk_s;
        // The last worker also takes the remainder tuples.
        uint32_t tuple_count_s = (i == nrThreads - 1) ? (numS - offset) : chunk_s;
        tuple_sum += tuple_count_s;
        div_result_ptrs[i] = new std::vector<std::pair<uint32_t, uint32_t>>;
        threads.emplace_back(&RadixJoin::parallel_probe, this, tmpRtuples, Stuples + offset, tuple_count_s, hist, MASK, nbits, div_result_ptrs[i]);
    }
    assert(tuple_sum == numS);
    for (auto &th : threads)
    {
        th.join();
    }

    // Collect the per-thread match lists into the partition's result vector.
    for (size_t i = 0; i < nrThreads; i++)
    {
        for (size_t j = 0; j < div_result_ptrs[i]->size(); j++)
        {
            return_ptr->push_back(div_result_ptrs[i]->operator[](j));
        }
        delete div_result_ptrs[i];
    }
    free(hist);
    delete[] tmpRtuples;
    return return_ptr;
}

void RadixJoin::parallel_probe(tuple_t *tuples_r, tuple_t *tuples_s, int tuple_count_s, int32_t *hist, uint32_t MASK, int nbits, std::vector<std::pair<uint32_t, uint32_t>> *return_ptr)
{
    for (int i = 0; i < tuple_count_s; i++)
    {
        uint32_t idx = HASH_BIT_MODULO(tuples_s[i].key, MASK, nbits);

        // hist[idx] .. hist[idx + 1] delimits the bucket of R tuples this key can match.
        int j = hist[idx], end = hist[idx + 1];
        /* Scalar comparisons */
        for (; j < end; j++)
        {
            if (tuples_r[j].key == tuples_s[i].key)
            {
                return_ptr->push_back(std::make_pair(tuples_r[j].rid, tuples_s[i].rid));
            }
        }
    }
}


//
// Parallel partitioning: each worker fills its private histogram hist[tid][...] over its
// chunk of the input and then scatters its tuples into the shared output array tmp.
//
bool RadixJoin::parallel_partitioning(int tid, int nbits, relation_t *rel, tuple_t *tmp, int **hist, int tuples_per_thread, int histSize, Barrier &barrier)
{
    int offset = tuples_per_thread * tid;

    // The last thread also processes the remainder tuples.
    if (tid == Config::NUM_WORKERS - 1)
        tuples_per_thread = rel->number_tuples - offset;

    // Step 1: build the private histogram over this thread's chunk.
    for (int i = offset; i < tuples_per_thread + offset; i++)
    {
        int idx = PART_HASH(rel->data[i].key, nbits) + 1; // +1 so that a leading 0 is stored as the first element
        hist[tid][idx]++;
    }


    // Step 2: wait until every worker has finished its histogram.
    barrier.Wait();

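    // All per-thread histograms are complete once every worker has passed the barrier.
    // The loop below derives this thread's exclusive write offset per bucket: all
    // tuples of earlier buckets (from every thread) plus the tuples that
    // lower-numbered threads put into the same bucket.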
    int *output = new int[histSize]();
    for (int j = 1; j < histSize; j++)
    {
        int sum = 0;
        for (int k = 1; k <= j; k++)
        {
            for (int i = 0; i < Config::NUM_WORKERS; i++)
            {
                // For the current bucket (k == j) only count lower-numbered threads.
                if (k == j && i == tid)
                    break;
                sum += hist[i][k];
            }
        }
        output[j - 1] = sum;
    }

    // Step 3: scatter this thread's tuples to their partitions using the private offsets.
    for (int i = offset; i < tuples_per_thread + offset; i++)
    {
        int idx = PART_HASH(rel->data[i].key, nbits);
        tmp[output[idx]] = rel->data[i];
        ++output[idx];
    }

    std::cout << "Returning from parallel partitioning, threadid: " << tid << '\n';
    delete[] output;
    return true;
}

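// Single-threaded reference implementation of the radix partitioning pass,
// used only when the SERIAL build flag is defined above.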
int *RadixJoin::partitioning(int tid, int nbits, relation_t *rel, tuple_t *tmp)
{
    // Build local histogram.
    std::cout << "First phase" << '\n';

    // Step P1: Iterate over the tuples of the table and build a histogram (Hist),
    // with the j'th entry storing the number of input keys that hash to index j.

    // We have 2^nbits partitions, each of which fits into the cache.
    int histSize = (1 << nbits) + 1;
    std::cout << "histSize: " << histSize << '\n';
    int *hist = new int[histSize]();

    for (size_t i = 0; i < rel->number_tuples; i++)
    {
        int idx = PART_HASH(rel->data[i].key, nbits) + 1; // +1 so that a leading 0 is stored as the first element
        hist[idx]++;
    }

    // Step P2: Perform the prefix sum of the histogram (Hist) to compute the starting addresses
    // of the elements mapping to the respective indices of the histogram.
    for (int i = 0, sum = 0; i < histSize; i++)
    {
        sum += hist[i];
        hist[i] = sum;
    }

    // Step P3: Reorder the tuples of the table by iterating over them,
    // and scattering each tuple to the address stored at the corresponding hash location.
    for (size_t i = 0; i < rel->number_tuples; i++)
    {
        int idx = PART_HASH(rel->data[i].key, nbits);
        tmp[hist[idx]] = rel->data[i];
        ++hist[idx];
    }

    std::cout << "Second phase" << '\n';

    return hist;
}