Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#pragma once

#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <limits>
#include <memory>
#include <thread>
#include <vector>
- // we are always interested in the smallest bounds
- #define MAX std::numeric_limits<uint64_t>::max()
- #define CACHE_LINE_SIZE 64 // this is not the point
- template <class T> class ConcurrentQueue {
- public:
- ConcurrentQueue(std::size_t capacity, size_t num_threads,
- std::size_t elements_to_process = MAX)
- : modMask(capacity), local_bounds(num_threads),
- elements_to_process(elements_to_process) {
- // ensure that the types are implemented without locks on this cpu
- assert(std::atomic_is_lock_free(&lead_back));
- assert(std::atomic_is_lock_free(&lead_front));
- // round capacity to closest power of two
- modMask |= modMask >> 1;
- modMask |= modMask >> 2;
- modMask |= modMask >> 4;
- modMask |= modMask >> 8;
- modMask |= modMask >> 16;
- modMask |= modMask >> 32;
- this->capacity = modMask + 1;
- array = std::make_unique<T[]>(this->capacity);
- // local_bounds.resize(num_threads);
- }
- // fully concurrent
- void push_back(uint id, const T &e) {
- // thread *id* reserved this space
- // only the thread itself ever writes to his local bounds
- local_bounds[id].back = lead_back; // only r-side is atomic (this is subtle)
- // asm volatile("mfence" ::: "memory"); // mfence? the involvement of atomic
- // should construct a mfence anyway
- local_bounds[id].back = lead_back++; // atomic
- // thread can't insert, follow_fornt doesn't mark the entry as save
- // two reasons: 1) queue is actually full -> not much to do
- // 2) follow_front is not current -> use this thread to update
- while (local_bounds[id].back >= capacity + follow_front) { // wrapped around
- // case 1) yield so consumer can work
- // case 2) and threads with smaller local back that can still do work
- std::this_thread::yield();
- // update follow front to smalles local front
- // only reads on local only one write on follow front
- uint64_t min = lead_front;
- for (size_t i = 0; i < local_bounds.size(); ++i) {
- // need copy
- uint64_t local_front = local_bounds[i].front;
- // asm volatile("mfence" ::: "memory"); // could increase performance
- // but it is unlikely and correctness is guaranteed anyway
- if (local_front < min) {
- min = local_front;
- }
- }
- follow_front = min;
- }
- array[local_bounds[id].back & modMask] = e;
- // free this index
- local_bounds[id].back = MAX;
- }
- // fully concurrent
- T pop_front(uint id) {
- local_bounds[id].front = lead_front;
- local_bounds[id].front = lead_front++;
- while (local_bounds[id].front >= follow_back) {
- if (local_bounds[id].front >= elements_to_process) {
- local_bounds[id].front = MAX;
- return EOW;
- }
- std::this_thread::yield();
- uint64_t min = lead_back;
- for (size_t i = 0; i < local_bounds.size(); ++i) {
- uint64_t local_back = local_bounds[i].back;
- // asm volatile("mfence" ::: "memory");
- if (local_back < min) {
- min = local_back;
- }
- }
- follow_back = min;
- }
- T result = array[local_bounds[id].front & modMask];
- local_bounds[id].front = MAX;
- return result;
- }
- size_t size_upper_bound() {
- uint64_t l_lead_back = lead_back;
- uint64_t l_follow_front = follow_front;
- return l_lead_back - l_follow_front;
- }
- size_t size_lower_bound() {
- uint64_t l_follow_back = follow_back;
- uint64_t l_lead_front = lead_front;
- return l_follow_back - l_lead_front;
- }
- size_t get_work_done() { return lead_front; }
- private:
- // T does not need to be moveable
- std::unique_ptr<T[]> array;
- // T() could be more expensive to build than to copy
- const T EOW = T();
- // padding? might be overkill
- // only one thread writes all other only (rarely) read, so with padding those
- // cachelines are never invalidated, but it could be costly for a high amount
- // of threads since iterating over it will be inefficient -> Test
- struct alignas(CACHE_LINE_SIZE) bound {
- // initially the bounds are disabled
- size_t back = MAX, front = MAX;
- };
- uint64_t capacity; // power of 2
- uint64_t modMask; // used for index
- // everything used as an index for array will grow monotonically
- // on 8 3.4 GHz cores executing one push and pop every cycle this could be a
- // problem in about 21 years
- // Proof: (/ 18446744073709551615 (* 3.4 8 (expt 10 9)) 60 60 24 365)
- // MAX G m h d y
- std::vector<bound> local_bounds;
- // atomics generate "lock <x86 instruction i.e. add>" which causes the CPU to
- // lock a cacheline. If both leads are on one cacheline producers and
- // consumers would interfere unnecessarily, the follows have less potential
- // for speed reduction
- alignas(CACHE_LINE_SIZE) std::atomic<uint64_t> lead_back = 0;
- alignas(CACHE_LINE_SIZE) std::atomic<uint64_t> lead_front = 0;
- alignas(CACHE_LINE_SIZE) uint64_t follow_back = 0;
- alignas(CACHE_LINE_SIZE) uint64_t follow_front = 0;
- const uint64_t elements_to_process;
- };
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement