concurrent_queue.hpp
froleyks, Jan 7th, 2019
#pragma once

#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <thread>
#include <vector>

// a bound set to MAX is "disabled": the min-scans below are always
// interested in the smallest bounds, so a disabled bound never wins
static constexpr uint64_t MAX = std::numeric_limits<uint64_t>::max();
static constexpr std::size_t CACHE_LINE_SIZE = 64; // common on x86; the exact value is not the point
template <class T> class ConcurrentQueue {
public:
  ConcurrentQueue(std::size_t capacity, std::size_t num_threads) {
    // ensure that the counters are implemented without locks on this cpu
    assert(std::atomic_is_lock_free(&lead_back));
    assert(std::atomic_is_lock_free(&lead_front));
    assert(capacity > 0);

    // round capacity up to the next power of two: smear the highest set bit
    // of capacity - 1 into every lower position, then add one
    modMask = capacity - 1;
    modMask |= modMask >> 1;
    modMask |= modMask >> 2;
    modMask |= modMask >> 4;
    modMask |= modMask >> 8;
    modMask |= modMask >> 16;
    modMask |= modMask >> 32;
    this->capacity = modMask + 1;

    array = std::make_unique<T[]>(this->capacity);
    local_bounds.resize(num_threads);
  }
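
  // A worked example of the rounding above (hypothetical numbers, not from
  // the original paste): for capacity = 100, modMask starts at 99 = 0b110'0011;
  // the shifts smear the highest set bit downwards, giving 0b111'1111 = 127,
  // so this->capacity becomes 128 and (index & modMask) == index % 128.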

  // fully concurrent: any number of producers may call this
  void push_back(std::size_t id, const T &e) {
    // thread *id* reserves this slot;
    // only the thread itself ever writes to its own local bounds.
    // First publish a conservative lower bound (the current lead_back) so a
    // concurrent min-scan can never miss this reservation, ...
    local_bounds[id].back = lead_back; // only the read side is atomic (this is subtle)
    std::atomic_thread_fence(std::memory_order_seq_cst); // full fence, mfence on x86
    // ... then claim the actual index with an atomic fetch-and-increment
    local_bounds[id].back = lead_back++;

    // the thread can't insert while follow_front doesn't mark the entry as safe;
    // two reasons: 1) queue is actually full -> not much to do
    //              2) follow_front is stale  -> use this thread to update it
    while (local_bounds[id].back >= capacity + follow_front) { // wrapped around
      // case 1) yield so a consumer can make progress
      // case 2) so can producers with a smaller local back
      std::this_thread::yield();

      // update follow_front to the smallest local front;
      // only reads on the local bounds, a single write to follow_front
      uint64_t min = lead_front;
      for (std::size_t i = 0; i < local_bounds.size(); ++i) {
        // copy once so the comparison and the update use the same value
        uint64_t local_front = local_bounds[i].front;
        std::atomic_thread_fence(std::memory_order_seq_cst);
        if (local_front < min) {
          min = local_front;
        }
      }
      follow_front = min;
    }
    array[local_bounds[id].back & modMask] = e;

    // free this index: MAX means "no reservation in flight"
    local_bounds[id].back = MAX;
  }
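
  // Hypothetical trace of the reservation protocol (illustration only):
  // with capacity = 4 and follow_front = 0, thread A reserves back = 3 and
  // writes slot 3 & 3 == 3; thread B then reserves back = 4, sees
  // 4 >= 4 + 0, and spins until pops have advanced follow_front past 0,
  // after which it writes slot 4 & 3 == 0.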

  // fully concurrent: any number of consumers may call this
  T pop_front(std::size_t id) {
    // same reservation protocol as push_back, mirrored
    local_bounds[id].front = lead_front; // conservative value first
    std::atomic_thread_fence(std::memory_order_seq_cst); // mirrors the fence in push_back
    local_bounds[id].front = lead_front++;
    while (local_bounds[id].front >= follow_back) {
      std::this_thread::yield();

      // update follow_back to the smallest local back
      uint64_t min = lead_back;
      for (std::size_t i = 0; i < local_bounds.size(); ++i) {
        uint64_t local_back = local_bounds[i].back;
        std::atomic_thread_fence(std::memory_order_seq_cst);
        if (local_back < min) {
          min = local_back;
        }
      }
      follow_back = min;
    }
    T result = array[local_bounds[id].front & modMask];

    // free this index
    local_bounds[id].front = MAX;
    return result;
  }

private:
  std::unique_ptr<T[]> array;
  // one cache line per thread avoids false sharing between the per-thread
  // bounds (might be overkill)
  struct alignas(CACHE_LINE_SIZE) bound {
    // initially the bounds are disabled
    uint64_t back = MAX, front = MAX;
  };
  uint64_t capacity; // power of 2
  uint64_t modMask;  // used for indexing: i & modMask == i % capacity
  // everything used as an index into array grows monotonically;
  // on 8 cores at 3.4 GHz executing one push and one pop every cycle this
  // becomes a problem in about 21 years
  // Proof: (/ 18446744073709551615 (* 3.4 8 (expt 10 9)) 60 60 24 365)
  //           MAX                                    G   m  h  d  y
  std::vector<bound> local_bounds;
  // atomic increments compile to "lock <x86 instruction, e.g. add>", which
  // makes the CPU lock a cache line; if both counters lived on one line,
  // producers and consumers would contend on it, hence the alignment
  alignas(CACHE_LINE_SIZE) std::atomic<uint64_t> lead_back{0};
  alignas(CACHE_LINE_SIZE) std::atomic<uint64_t> lead_front{0};
  uint64_t follow_back  = 0;
  uint64_t follow_front = 0;
};
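
A minimal usage sketch (not part of the original paste; the thread counts and
the main() driver are illustrative assumptions): every thread passes its own
unique id, which must be smaller than the num_threads given to the constructor.

#include "concurrent_queue.hpp"

#include <iostream>
#include <thread>
#include <vector>

int main() {
  // 4 participating threads in total, each with a unique id in [0, 4)
  ConcurrentQueue<int> queue(1000, 4); // capacity is rounded up to 1024

  std::vector<std::thread> threads;
  // producers use ids 0 and 1
  for (std::size_t id = 0; id < 2; ++id) {
    threads.emplace_back([&queue, id] {
      for (int i = 0; i < 1000; ++i) {
        queue.push_back(id, i);
      }
    });
  }
  // consumers use ids 2 and 3
  for (std::size_t id = 2; id < 4; ++id) {
    threads.emplace_back([&queue, id] {
      long sum = 0;
      for (int i = 0; i < 1000; ++i) {
        sum += queue.pop_front(id);
      }
      std::cout << "consumer " << id << " sum: " << sum << '\n';
    });
  }
  for (auto &t : threads) {
    t.join();
  }
}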