Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #pragma once
- #include "Types.h"
- #include "Config.h"
- #include <iostream>
- #include <thread>
- #include <atomic>
- #include <utility>
- #include <mutex>
- #include <condition_variable>
#ifndef BARRIER_SYNC
/**
 * Busy-waiting barrier for Config::NUM_WORKERS threads.
 *
 * Requirements at the expansion site: a `std::atomic<int>` named
 * `sync_barrier` must be in scope and must start at 0.
 *
 * The counter runs through one full cycle per barrier round:
 *   0 .. N      arrivals   (each thread adds 1, then spins until N is reached)
 *   N .. 2N     departures (each thread adds 1 on its way out)
 *   2N -> 0     the last thread to depart resets the counter
 *
 * Why two phases: a single "increment, spin until == N, decrement" scheme
 * strands waiters, because the first thread to leave lowers the counter
 * below N before the remaining threads have re-read it. Here the counter
 * never drops below N until every thread has observed it, and a thread
 * that re-enters early waits for the reset before it registers again.
 *
 * The expansion is a single compound statement, so it is safe as the body
 * of an unbraced `if`/loop, and it may be written with or without a
 * trailing semicolon.
 */
#define BARRIER_SYNC()                                                    \
    {                                                                     \
        while (sync_barrier.load() >= Config::NUM_WORKERS)                \
        {                                                                 \
        }                                                                 \
        sync_barrier.fetch_add(1);                                        \
        while (sync_barrier.load() < Config::NUM_WORKERS)                 \
        {                                                                 \
        }                                                                 \
        if (sync_barrier.fetch_add(1) + 1 == 2 * Config::NUM_WORKERS)     \
        {                                                                 \
            sync_barrier.store(0);                                        \
        }                                                                 \
    }
#endif
// Adapted from:
// https://stackoverflow.com/questions/24465533/implementing-boostbarrier-in-c11
/**
 * Blocking, reusable thread barrier.
 *
 * Each call to Wait() counts as one arrival. The call that brings the
 * number of outstanding arrivals to zero starts a new generation, re-arms
 * the counter with the original threshold and wakes every waiter; all other
 * callers sleep on the condition variable until the generation changes.
 */
class Barrier {
public:
    /** @param iCount number of Wait() calls that make up one barrier round. */
    explicit Barrier(std::size_t iCount)
        : mThreshold(iCount), mCount(iCount), mGeneration(0) {}

    /** Block until the current round has received all of its arrivals. */
    void Wait() {
        std::unique_lock<std::mutex> guard{mMutex};
        const std::size_t arrivalGeneration = mGeneration;

        --mCount;
        if (mCount == 0) {
            // Last arrival: open the next generation and release everyone.
            ++mGeneration;
            mCount = mThreshold;
            mCond.notify_all();
            return;
        }

        // Compare against the generation seen on entry so that spurious
        // wake-ups do not release the caller early.
        while (arrivalGeneration == mGeneration) {
            mCond.wait(guard);
        }
    }

private:
    std::mutex mMutex;              // guards mCount and mGeneration
    std::condition_variable mCond;  // signalled when a generation completes
    std::size_t mThreshold;         // arrivals required per round
    std::size_t mCount;             // arrivals still missing in this round
    std::size_t mGeneration;        // incremented once per completed round
};
- class RadixJoin
- {
- public:
- RadixJoin(relation_t *R, relation_t *S);
- RadixJoin(RadixJoin &&) = default;
- RadixJoin(const RadixJoin &) = default;
- RadixJoin &operator=(RadixJoin &&) = default;
- RadixJoin &operator=(const RadixJoin &) = default;
- ~RadixJoin();
- result_relation_t *join();
- result_relation_t *nlj();
- private:
- inline uint32_t get_hist_size(uint32_t relSize)
- {
- NEXT_POW_2(relSize);
- relSize >>= 2;
- if (relSize < 4)
- relSize = 4;
- return relSize;
- }
- void parallel_probe(tuple_t * tuples_r, tuple_t * tuples_s, int tuple_count_s, int32_t *hist, uint32_t MASK, int nbits, std::vector<std::pair<uint32_t, uint32_t>> *return_ptr);
- bool parallel_partitioning(int tid, int nbits, relation_t *rel, tuple_t *tmp, int **hist, int tuples_per_thread, int histSize, Barrier& barier);
- int* partitioning(int tid, int nbits, relation_t *rel, tuple_t *tmp);
- std::vector<std::pair<uint32_t,uint32_t>>* build_probe(relation_t* partition_r, relation_t* partition_s, int nbits);
- relation_t *R_ = nullptr;
- relation_t *S_ = nullptr;
- std::atomic<int> sync_barrier{0};
- };
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement