Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#include "../include/RadixJoin.h"

#include <math.h>

#include <cassert>
#include <chrono>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <thread>
#include <utility>
#include <vector>
- RadixJoin::RadixJoin(relation_t *R, relation_t *S)
- {
- if (R->number_tuples <= S->number_tuples)
- {
- R_ = R;
- S_ = S;
- }
- else
- {
- R_ = S;
- S_ = R;
- }
- //
- }
- RadixJoin::~RadixJoin()
- {
- }
/*
 * RADIX JOIN - Implement a radix join with the following requirements:
 * 1. Multithreaded.
 * 2. Use a histogram to compute the memory offsets.
 * 3. Use radix partitioning to create chunks that fit into the cache, with only one pass, i.e., figure out how
 *    many bits are needed to create N partitions.
 *
 * Input: the member variables R_ and S_
 *
 * Return: result_relation_t - defined in Types.h
 */
- // // create Worker Threads
- // std::thread t1(&RadixJoin::parallel_partitioning,this);
- // std::this_thread::sleep_for(std::chrono::seconds(5));
- // std::thread t2(&RadixJoin::parallel_partitioning,this);
- // t1.join();
- // t2.join();
- result_relation_t *RadixJoin::nlj()
- {
- //
- result_relation_t *result_rel = new result_relation_t;
- std::vector<std::pair<uint32_t, uint32_t>> *result_data = new std::vector<std::pair<uint32_t, uint32_t>>;
- result_data->resize(S_->number_tuples);
- int cnt = 0;
- for (size_t i = 0; i < S_->number_tuples; i++)
- {
- for (size_t j = 0; j < R_->number_tuples; j++)
- {
- if (R_->data[j].key == S_->data[i].key)
- {
- result_data->operator[](cnt) = std::make_pair(R_->data[j].rid, S_->data[i].rid);
- cnt++;
- }
- }
- }
- //
- result_rel->data = result_data;
- result_rel->number_tuples = result_data->size();
- return result_rel;
- }
- // #define SERIAL
- result_relation_t *RadixJoin::join()
- {
- // parallel partition R
- int npart = ((R_->number_tuples * sizeof(tuple_t)) / Config::CACHE_SIZE) + 1;
- // int npart = 4;
- NEXT_POW_2(npart);
- std::cout << "npart: " << npart << '\n';
- int nbits = log2(npart);
- std::cout << "Number of bits needed: " << nbits << '\n';
- // malloc tmp output relation
- // R
- relation_t *reordered_R = new relation_t;
- reordered_R->data = new tuple_t[R_->number_tuples];
- reordered_R->number_tuples = R_->number_tuples;
- #ifdef SERIAL
- int *hist_R = partitioning(0, nbits, R_, reordered_R->data);
- #else
- int histSize = pow(2, nbits) + 1;
- std::cout << "HISTSIZE" << histSize << '\n';
- // int** hist = new int[histSize]();
- //pointer to 2D array
- int **hist_R = new int *[Config::NUM_WORKERS];
- //pointer initialization
- for (int i = 0; i < Config::NUM_WORKERS; i++)
- {
- hist_R[i] = new int[histSize]();
- }
- int tup_p_thread = R_->number_tuples / Config::NUM_WORKERS;
- std::vector<std::thread> threads;
- //ToDo call parallel part with assigned thread id
- // Split input partition an calculate private memory area
- Barrier barrier_r(Config::NUM_WORKERS);
- for (size_t i = 0; i < Config::NUM_WORKERS; i++)
- {
- /* code */
- threads.emplace_back(&RadixJoin::parallel_partitioning, this, i, nbits, R_, reordered_R->data, hist_R, tup_p_thread, histSize, std::ref(barrier_r));
- // threads[i].join();
- }
- for(auto& th : threads)
- {
- /* code */
- th.join();
- }
- std::cout << "Computing global histogram for R..." << '\n';
- //Do prefix sum
- for (size_t j = 0; j < Config::NUM_WORKERS; j++)
- {
- for (int i=0, sum = 0;i<histSize;i++)
- {
- sum += hist_R[j][i];
- hist_R[j][i] = sum;
- }
- }
- int *glb_hist_r = new int[histSize]();
- for (size_t i = 0; i < histSize; i++)
- {
- for (size_t j = 0; j < Config::NUM_WORKERS; j++)
- {
- glb_hist_r[i] += hist_R[j][i];
- }
- // std::cout << "glb_hist_r: " << glb_hist_r[i] << '\n';
- }
- // for (size_t i = 0; i < reordered_R->number_tuples; i++)
- // {
- // /* code */
- // std::cout << reordered_R->data[i].key << ',';
- // }
- std::cout << "" << '\n';
- #endif
- // S
- relation_t *reordered_S = new relation_t;
- reordered_S->data = new tuple_t[S_->number_tuples];
- reordered_S->number_tuples = S_->number_tuples;
- #ifdef SERIAL
- int *hist_S = partitioning(0, nbits, S_, reordered_S->data);
- #else
- int **hist_S = new int *[Config::NUM_WORKERS];
- //pointer initialization
- for (int i = 0; i < Config::NUM_WORKERS; i++)
- {
- hist_S[i] = new int[histSize]();
- }
- int tup_p_thread_s = S_->number_tuples / Config::NUM_WORKERS;
- std::vector<std::thread> threads_s;
- //ToDo call parallel part with assigned thread id
- // Split input partition an calculate private memory area
- Barrier barrier_s(Config::NUM_WORKERS);
- for (size_t i = 0; i < Config::NUM_WORKERS; i++)
- {
- /* code */
- threads_s.emplace_back(&RadixJoin::parallel_partitioning, this, i, nbits, S_, reordered_S->data, hist_S, tup_p_thread_s, histSize, std::ref(barrier_s));
- // threads_s[i].join();
- }
- for(auto& th : threads_s)
- {
- /* code */
- th.join();
- }
- std::cout << "Computing global histogram for S..." << '\n';
- int *glb_hist_s = new int[histSize]();
- //Do prefix sum
- for (size_t j = 0; j < Config::NUM_WORKERS; j++)
- {
- for (int i=0, sum = 0;i<histSize;i++)
- {
- sum += hist_S[j][i];
- hist_S[j][i] = sum;
- }
- }
- for (size_t i = 0; i < histSize; i++)
- {
- for (size_t j = 0; j < Config::NUM_WORKERS; j++)
- {
- glb_hist_s[i] += hist_S[j][i];
- }
- }
- // for (size_t i = 0; i < reordered_S->number_tuples; i++)
- // {
- // /* code */
- // std::cout << reordered_S->data[i].key << ',';
- // }
- // std::cout << "" << '\n';
- // for(auto& th : threads_s)
- // {
- // /* code */
- // th.join();
- // }
- //ToDo call parallel part with assigned thread id
- // Split input partition an calculate private memory area
- #endif
- relation_t *partition_r = new relation_t;
- relation_t *partition_s = new relation_t;
- std::vector<std::pair<uint32_t, uint32_t>> *result_ptr[npart];
- int result_size = 0;
- for (size_t i = 0; i < npart; i++)
- {
- #ifdef SERIAL
- int hist_r = (i == 0) ? 0 : hist_R[0][i - 1];
- partition_r->data = &reordered_R->data[hist_r];
- partition_r->number_tuples = hist_R[0][i] - hist_r;
- int hist_s = (i == 0) ? 0 : hist_S[0][i - 1];
- partition_s->data = &reordered_S->data[hist_s];
- partition_s->number_tuples = hist_S[0][i] - hist_s;
- result_ptr[i] = build_probe(partition_r, partition_s, nbits);
- result_size += result_ptr[i]->size();
- #else
- int hist_r = glb_hist_r[i];
- // std::cout << "hist_r[" << i << "] " << hist_r << '\n';
- partition_r->data = &reordered_R->data[hist_r];
- // std::cout << "glb_hist_r[" << i + 1 << "] " << glb_hist_r[i + 1] << '\n';
- partition_r->number_tuples = glb_hist_r[i + 1] - hist_r;
- int hist_s = glb_hist_s[i];
- // std::cout << "hist_s[" << i << "] " << hist_s << '\n';
- partition_s->data = &reordered_S->data[hist_s];
- // std::cout << "hist_S[0][" << i + 1 << "] " << glb_hist_s[i + 1] << '\n';
- partition_s->number_tuples = glb_hist_s[i + 1] - hist_s;
- result_ptr[i] = build_probe(partition_r, partition_s, nbits);
- //Do only build, extract histogram, mask, nbits
- result_size += result_ptr[i]->size();
- #endif
- }
- //After build,
- // std::cout << "Finished building and probing" << '\n';
- result_relation_t *join_result = new result_relation_t;
- join_result->data = new std::vector<std::pair<uint32_t, uint32_t>>;
- join_result->data->resize(result_size);
- join_result->number_tuples = result_size;
- int cnt = 0;
- for (size_t i = 0; i < npart; i++)
- {
- for (size_t j = 0; j < result_ptr[i]->size(); j++)
- {
- // std::cout << "result_ptr[i]" << result_ptr[i]->operator[](j).first << '\n';
- join_result->data->operator[](cnt) = result_ptr[i]->operator[](j);
- cnt++;
- }
- delete result_ptr[i];
- }
- std::cout << "count: " << cnt << '\n';
- delete[] glb_hist_r;
- delete[] glb_hist_s;
- delete[] hist_R;
- delete[] hist_S;
- return join_result;
- // return nullptr;
- }
- std::vector<std::pair<uint32_t, uint32_t>> *RadixJoin::build_probe(relation_t *partition_r, relation_t *partition_s, int nbits)
- {
- tuple_t *Rtuples = partition_r->data;
- const uint32_t numR = partition_r->number_tuples;
- uint32_t Nhist = get_hist_size(numR);
- const uint32_t MASK = (Nhist - 1) << nbits;
- std::vector<std::pair<uint32_t, uint32_t>> *return_ptr = new std::vector<std::pair<uint32_t, uint32_t>>;
- // return_ptr->resize(partition_s->number_tuples);
- int32_t *hist = (int32_t *)calloc(Nhist + 2, sizeof(int32_t));
- for (uint32_t i = 0; i < numR; i++)
- {
- uint32_t idx = HASH_BIT_MODULO(Rtuples[i].key, MASK, nbits);
- hist[idx + 2]++;
- }
- /* prefix sum on histogram */
- for (uint32_t i = 2, sum = 0; i <= Nhist + 1; i++)
- {
- sum += hist[i];
- hist[i] = sum;
- }
- tuple_t *const tmpRtuples = new tuple_t[partition_r->number_tuples];
- /* reorder tuples according to the prefix sum */
- for (uint32_t i = 0; i < numR; i++)
- {
- uint32_t idx = HASH_BIT_MODULO(Rtuples[i].key, MASK, nbits) + 1;
- tmpRtuples[hist[idx]] = Rtuples[i];
- hist[idx]++;
- }
- //Parallel probing
- uint32_t nrThreads = Config::NUM_WORKERS; // TODO: decrease nrThreads if there is not a lot of tuples in S
- std::vector<std::thread> threads;
- std::vector<std::pair<uint32_t, uint32_t>> *div_result_ptrs[nrThreads];
- const uint32_t numS = partition_s->number_tuples;
- tuple_t * Stuples = partition_s->data;
- int tuple_count_s;
- // std::cout << "S tuples: " << numS << '\n';
- int tuple_sum;
- for(size_t i = 0; i < nrThreads; i++)
- {
- if (i == nrThreads - 1)
- tuple_count_s = numS - i * (numS/nrThreads);
- else
- tuple_count_s = numS/nrThreads;
- tuple_sum += tuple_count_s;
- div_result_ptrs[i] = new std::vector<std::pair<uint32_t, uint32_t>>;
- // std::cout << "thread: " << i << " offset: " << i*tuple_count_s << '\n';
- threads.emplace_back(&RadixJoin::parallel_probe, this, tmpRtuples, Stuples+i*tuple_count_s, tuple_count_s, hist, MASK, nbits, div_result_ptrs[i]);
- }
- assert(tuple_sum == numS);
- for(auto& th : threads)
- {
- th.join();
- }
- // std::cout << "Joined threads" << std::endl;
- int cnt = 0;
- for (size_t i = 0; i < nrThreads; i++)
- {
- // std::cout << "div_result_ptrs size: " << div_result_ptrs[i]->size() << '\n';
- for (size_t j = 0; j < div_result_ptrs[i]->size(); j++)
- {
- // std::cout << "div_result_ptrs[i]" << div_result_ptrs[i]->operator[](j).first << '\n';
- return_ptr->push_back(div_result_ptrs[i]->operator[](j));
- cnt++;
- }
- delete div_result_ptrs[i];
- }
- delete[] tmpRtuples;
- return return_ptr;
- }
- void RadixJoin::parallel_probe(tuple_t * tuples_r, tuple_t * tuples_s, int tuple_count_s, int32_t *hist, uint32_t MASK, int nbits, std::vector<std::pair<uint32_t, uint32_t>> *return_ptr)
- {
- // int64_t match = 0;
- // std::cout << "tuple_count: " << tuple_count_s << '\n';
- // std::cout << "first rid in S: " << tuples_s[0].rid << '\n';
- for (uint32_t i = 0; i < tuple_count_s; i++)
- {
- uint32_t idx = HASH_BIT_MODULO(tuples_s[i].key, MASK, nbits);
- int j = hist[idx], end = hist[idx + 1];
- // std::cout << "tuples_s[i].key: " << tuples_s[i].key << '\n';
- /* Scalar comparisons */
- for (; j < end; j++)
- {
- // std::cout << "idx: " << j << " end: " << end << " tuples_r[j].key: " << tuples_r[j].key << " tuples_s[i].key: " << tuples_s[i].key << '\n';
- if (tuples_r[j].key == tuples_s[i].key)
- {
- // std::cout << "RID " << tuples_r[i].rid << " " << tuples_s[j].rid << '\n';
- assert(tuples_r[j].key != tuples_s[i].key);
- return_ptr->push_back(std::make_pair(tuples_r[j].rid, tuples_s[i].rid));
- }
- }
- }
- }
- //
- // ToDo Pass multidimensional array for saving histogram hist[num_threads][hist_size]
- //
- //
- bool RadixJoin::parallel_partitioning(int tid, int nbits, relation_t *rel, tuple_t *tmp, int **hist, int tuples_per_thread, int histSize, Barrier& barrier)
- {
- // std::cout << "First phase" << '\n';
- int offset = tuples_per_thread * tid;
- //Check if thread is the last
- if (tid == Config::NUM_WORKERS - 1)
- tuples_per_thread = rel->number_tuples - offset;
- // std::cout << "Threadid: " << tid << " #tuples: " << tuples_per_thread << " offset: " << offset << std::endl;
- for (size_t i = offset; i < tuples_per_thread + offset; i++)
- {
- // int idx = PART_HASH(rel->data[i].key, nbits) + 1; //+1 because we want to store a 0 as the first element
- int idx = PART_HASH(rel->data[i].key, nbits) + 1; //+1 because we want to store a 0 as the first element
- hist[tid][idx]++;
- }
- // // Step P3: Reorder the tuples of the table by iterating over them,
- // // and scattering a tuple to the address stored at the corresponding hash location.
- // for (size_t i = offset; i < tuples_per_thread + offset; i++)
- // {
- // int idx = PART_HASH(rel->data[i].key, nbits);
- // tmp[hist[tid][idx]] = rel->data[i];
- // // std::cout << "map: " << rel->data[i].key << " to index: " << hist[idx] << " idx: " << idx << std::endl;
- // ++hist[tid][idx];
- // }
- // std::cout << "waiting for barrier" << std::endl;
- barrier.Wait();
- int *output = new int[histSize]();
- bool breakFlag = false;
- for (size_t j = 1; j < histSize; j++)
- {
- int sum = 0;
- for (size_t k = 1; k <= j; k++)
- {
- for (size_t i = 0; i < Config::NUM_WORKERS; i++)
- {
- if (k == j && i == tid)
- break;
- // std::cout << "hist[i][k] -> " << hist[i][k] << " i: " << i << " k: " << k << " tid: " << tid << std::endl;
- sum += hist[i][k];
- }
- }
- output[j-1] = sum;
- }
- // for(size_t i = 0; i <= tid; i++)
- // {
- // for(size_t j = 0; j < histSize; j++)
- // {
- // output[j] += hist[i][j];
- // }
- // }
- // for(int i = tid; i < Config::NUM_WORKERS; i++) {
- // for(int j = 1; j < histSize; j++)
- // output[j] += hist[i][j-1];
- // }
- // for (size_t j = 0; j < histSize; j++)
- // {
- // std::cout << "hist[j] " << hist[tid][j] << '\n';
- // std::cout << "output[j] " << output[j] << '\n';
- // }
- for (size_t i = offset; i < tuples_per_thread + offset; i++)
- {
- int idx = PART_HASH(rel->data[i].key, nbits);
- tmp[output[idx]] = rel->data[i];
- // std::cout << "map: " << rel->data[i].key << " to index: " << output[idx] << " idx: " << idx << std::endl;
- ++output[idx];
- }
- // Build Global Histogram
- std::cout << "Returning from parallel partitioning, threadid: " << tid << '\n';
- delete[] output;
- return true;
- }
- int *RadixJoin::partitioning(int tid, int nbits, relation_t *rel, tuple_t *tmp)
- {
- // Build Local Histogram
- std::cout << "First phase" << '\n';
- // Step P1: Iterate over the tuples of the table and build a histogram (Hmist),
- // with the j'th entry storing the number of input keys that hash to index j.
- //We have 2^nbits partitions, where the partitions can each fit into the cache.
- int histSize = pow(2, nbits) + 1;
- std::cout << "HISTSIZE" << histSize << '\n';
- int *hist = new int[histSize]();
- for (size_t i = 0; i < rel->number_tuples; i++)
- {
- int idx = PART_HASH(rel->data[i].key, nbits) + 1; //+1 because we want to store a 0 as the first element
- hist[idx]++;
- }
- // Step P2: Perform the prefix sum of the histogram (Hist) to compute the starting addresses
- // of the elements mapping to the respective indices of the histogram.
- for (int i = 0, sum = 0; i < histSize; i++)
- {
- sum += hist[i];
- hist[i] = sum;
- // std::cout << hist[i] << std::endl;
- }
- // Step P3: Reorder the tuples of the table by iterating over them,
- // and scattering a tuple to the address stored at the corresponding hash location.
- for (size_t i = 0; i < rel->number_tuples; i++)
- {
- int idx = PART_HASH(rel->data[i].key, nbits);
- tmp[hist[idx]] = rel->data[i];
- // std::cout << "map: " << rel->data[i].key << " to index: " << hist[idx] << " idx: " << idx << std::endl;
- ++hist[idx];
- }
- // Build Global Histogram
- std::cout << "second phase" << '\n';
- return hist;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement