#pragma once
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <limits>
#include <memory>
#include <thread>
#include <vector>

// we are always interested in the smallest bounds
#define MAX std::numeric_limits<uint64_t>::max()
#define CACHE_LINE_SIZE 64 // typical x86-64 value; the exact size is not the point here
template <class T> class ConcurrentQueue {
public:
  ConcurrentQueue(std::size_t capacity, std::size_t num_threads) {
    // ensure that the atomics are implemented without locks on this CPU
    assert(std::atomic_is_lock_free(&lead_back));
    assert(std::atomic_is_lock_free(&lead_front));
    // round capacity up to a power of two (strictly greater than the request)
    modMask = capacity;
    modMask |= modMask >> 1;
    modMask |= modMask >> 2;
    modMask |= modMask >> 4;
    modMask |= modMask >> 8;
    modMask |= modMask >> 16;
    modMask |= modMask >> 32;
    this->capacity = modMask + 1;
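    // e.g. a requested capacity of 100 smears to modMask = 127 and an internal
    // capacity of 128; a power-of-two request like 8 is bumped to 16, because
    // the smear starts from capacity rather than capacity - 1
    // (illustrative note, not part of the original paste)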
    // array = std::make_unique<T[]>(capacity, T());
    array = std::make_unique<T[]>(this->capacity);
    local_bounds.resize(num_threads);
  }
  // fully concurrent
  void push_back(size_t id, const T &e) {
    // thread *id* reserves this slot;
    // only the thread itself ever writes to its own local bounds
    local_bounds[id].back = lead_back; // only the read side is atomic (this is subtle)
    asm volatile("mfence" ::: "memory"); // mfence?
    local_bounds[id].back = lead_back++; // atomic
    // the thread can't insert yet: follow_front doesn't mark the entry as safe.
    // two reasons: 1) the queue is actually full -> not much to do
    //              2) follow_front is stale -> use this thread to update it
    while (local_bounds[id].back >= capacity + follow_front) { // wrapped around
      // case 1) yield so a consumer can make progress
      // case 2) and so threads with a smaller local back can still do work
      std::this_thread::yield();
      // update follow_front to the smallest local front;
      // only reads of the local bounds, only one writer of follow_front
      uint64_t min = lead_front;
      for (size_t i = 0; i < local_bounds.size(); ++i) {
        // need a copy
        uint64_t local_front = local_bounds[i].front;
        asm volatile("mfence" ::: "memory");
        if (local_front < min) {
          min = local_front;
        }
      }
      follow_front = min;
    }
    array[local_bounds[id].back & modMask] = e;
    // free this index again
    local_bounds[id].back = MAX;
  }
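  // Illustrative trace of the wait condition above (not from the original
  // paste), assuming an internal capacity of 4 and a single consumer:
  //   lead_back == 4, follow_front == 0 -> a producer reserves back = 4;
  //   4 >= 4 + 0 holds, so it yields and rescans the local fronts;
  //   once the consumer has finished reading slot 0 (its local front is back
  //   to MAX and lead_front is 1), follow_front advances to 1, the condition
  //   fails, and the producer safely overwrites array[4 & modMask] == array[0]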
  // fully concurrent
  T pop_front(size_t id) {
    local_bounds[id].front = lead_front; // not totally sure if needed
    local_bounds[id].front = lead_front++;
    while (local_bounds[id].front >= follow_back) {
      std::this_thread::yield();
      // update follow_back to the smallest local back (mirror of push_back)
      uint64_t min = lead_back;
      for (size_t i = 0; i < local_bounds.size(); ++i) {
        uint64_t local_back = local_bounds[i].back;
        asm volatile("mfence" ::: "memory");
        if (local_back < min) {
          min = local_back;
        }
      }
      follow_back = min;
    }
    T result = array[local_bounds[id].front & modMask];
    local_bounds[id].front = MAX;
    return result;
  }
private:
  std::unique_ptr<T[]> array;
  struct bound {
    // initially the bounds are disabled
    uint64_t back = MAX, front = MAX;
  };
  uint64_t capacity; // power of two
  uint64_t modMask;  // used to wrap indices into the array
  // everything used as an index into the array grows monotonically;
  // on 8 cores at 3.4 GHz executing one push and one pop every cycle this
  // only becomes a problem after about 21 years
  // Proof: (/ 18446744073709551615 (* 3.4 8 (expt 10 9)) 60 60 24 365)
  //           MAX                   ops per second (G)    m  h  d  y
  std::vector<bound> local_bounds; // padding between entries? might be overkill
  // atomics generate a "lock"-prefixed x86 instruction (e.g. lock add), which
  // makes the CPU lock a cache line; if both counters sat on the same line,
  // producers and consumers would constantly invalidate it for each other,
  // so keep them on separate cache lines
  alignas(CACHE_LINE_SIZE) std::atomic<uint64_t> lead_back = 0;
  alignas(CACHE_LINE_SIZE) std::atomic<uint64_t> lead_front = 0;
  uint64_t follow_back = 0;
  uint64_t follow_front = 0;
};
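
// ---------------------------------------------------------------------------
// Minimal usage sketch, not part of the original paste: every thread passes a
// unique id in [0, num_threads), because local_bounds is indexed by it and ids
// must never be shared between threads. The guard macro CONCURRENT_QUEUE_DEMO
// is made up for this sketch; compile the file on its own with something like
//   g++ -std=c++17 -pthread -DCONCURRENT_QUEUE_DEMO -x c++ concurrent_queue.hpp
// ---------------------------------------------------------------------------
#ifdef CONCURRENT_QUEUE_DEMO
#include <cstdio>

int main() {
  constexpr size_t kThreads = 4; // two producers and two consumers
  constexpr int kItems = 10000;  // items pushed by each producer
  ConcurrentQueue<int> queue(1024, kThreads);
  std::vector<std::thread> workers;
  // producers use ids 0 and 1
  for (size_t id = 0; id < 2; ++id)
    workers.emplace_back([&queue, id] {
      for (int i = 0; i < kItems; ++i)
        queue.push_back(id, i);
    });
  // consumers use ids 2 and 3
  for (size_t id = 2; id < 4; ++id)
    workers.emplace_back([&queue, id] {
      long long sum = 0;
      for (int i = 0; i < kItems; ++i)
        sum += queue.pop_front(id);
      std::printf("consumer %zu drained %d items, sum = %lld\n", id, kItems, sum);
    });
  for (auto &w : workers)
    w.join();
  return 0;
}
#endif // CONCURRENT_QUEUE_DEMO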