// concurrent_queue.hpp
#pragma once

#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <thread>
#include <vector>

// the min-scans below always look for the smallest bound, so a bound set to
// MAX means "disabled" and is never picked up
#define MAX std::numeric_limits<uint64_t>::max()
#define CACHE_LINE_SIZE 64 // this is not the point
template <class T> class ConcurrentQueue {
public:
  ConcurrentQueue(std::size_t capacity, std::size_t num_threads,
                  std::size_t elements_to_process = MAX)
      : modMask(capacity - 1), local_bounds(num_threads),
        elements_to_process(elements_to_process) {
    assert(capacity > 0); // capacity - 1 above must not underflow
    // ensure that the types are implemented without locks on this cpu
    assert(std::atomic_is_lock_free(&lead_back));
    assert(std::atomic_is_lock_free(&lead_front));

    // round capacity up to the next power of two: smear the highest set bit
    // of capacity - 1 into every lower position, yielding an all-ones mask
    modMask |= modMask >> 1;
    modMask |= modMask >> 2;
    modMask |= modMask >> 4;
    modMask |= modMask >> 8;
    modMask |= modMask >> 16;
    modMask |= modMask >> 32;
    this->capacity = modMask + 1;

    array = std::make_unique<T[]>(this->capacity);
  }
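
  // Worked example of the rounding: capacity = 100 enters the shift cascade
  // as modMask = 99 = 0b1100011; smearing the top bit down gives
  // modMask = 127 = 0b1111111, so the real capacity becomes 128 and the
  // cheap "index & modMask" replaces the more expensive "index % capacity".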

  // fully concurrent
  void push_back(std::size_t id, const T &e) {
    // thread *id* reserves this slot; only the thread itself ever writes to
    // its own local bounds
    // pre-publish a conservative bound so that a concurrent min-scan can
    // never miss this reservation; only the right-hand side of the
    // assignment is atomic (this is subtle)
    local_bounds[id].back = lead_back;
    // asm volatile("mfence" ::: "memory"); // not needed, the atomic
    // fetch-and-add below acts as a full fence anyway
    local_bounds[id].back = lead_back++; // atomic

    // the thread can't insert while follow_front doesn't mark the entry as
    // safe; two reasons: 1) the queue is actually full -> not much to do
    //                    2) follow_front is stale -> use this thread to update
    while (local_bounds[id].back >= capacity + follow_front) { // wrapped around
      // case 1) yield so consumers can work
      // case 2) ... and so can producers with a smaller local back
      std::this_thread::yield();

      // update follow_front to the smallest local front;
      // only reads of the local bounds, only one write to follow_front
      uint64_t min = lead_front;
      for (std::size_t i = 0; i < local_bounds.size(); ++i) {
        // a copy is needed, the value may change between the two uses
        uint64_t local_front = local_bounds[i].front;
        // asm volatile("mfence" ::: "memory"); // could increase performance,
        // but that is unlikely and correctness is guaranteed either way
        if (local_front < min) {
          min = local_front;
        }
      }
      follow_front = min;
    }
    array[local_bounds[id].back & modMask] = e;

    // free this slot's bound
    local_bounds[id].back = MAX;
  }

  // fully concurrent
  T pop_front(std::size_t id) {
    // mirror image of push_back: pre-publish, then reserve atomically
    local_bounds[id].front = lead_front;
    local_bounds[id].front = lead_front++; // atomic
    while (local_bounds[id].front >= follow_back) {
      // the whole workload has been handed out: release the bound and
      // return the end-of-work sentinel
      if (local_bounds[id].front >= elements_to_process) {
        local_bounds[id].front = MAX;
        return EOW;
      }
      std::this_thread::yield();
      // update follow_back to the smallest local back
      uint64_t min = lead_back;
      for (std::size_t i = 0; i < local_bounds.size(); ++i) {
        uint64_t local_back = local_bounds[i].back;
        // asm volatile("mfence" ::: "memory");
        if (local_back < min) {
          min = local_back;
        }
      }
      follow_back = min;
    }
    T result = array[local_bounds[id].front & modMask];

    local_bounds[id].front = MAX;
    return result;
  }

  // both size bounds are snapshots of racing counters, so the unsigned
  // subtractions are clamped instead of being allowed to wrap around
  size_t size_upper_bound() {
    uint64_t l_lead_back    = lead_back;
    uint64_t l_follow_front = follow_front;
    return l_lead_back > l_follow_front ? l_lead_back - l_follow_front : 0;
  }

  size_t size_lower_bound() {
    // lead_front can overtake follow_back while consumers wait
    uint64_t l_follow_back = follow_back;
    uint64_t l_lead_front  = lead_front;
    return l_follow_back > l_lead_front ? l_follow_back - l_lead_front : 0;
  }

  size_t get_work_done() { return lead_front; }

private:
  // T does not need to be movable; elements are copied in and out
  std::unique_ptr<T[]> array;

  // cache the end-of-work sentinel: T() could be more expensive to
  // construct than to copy
  const T EOW = T();

  // padding might be overkill: only one thread writes each bound and all
  // others only (rarely) read it, so with padding those cache lines are
  // never invalidated; but for a large number of threads iterating over the
  // vector becomes inefficient -> benchmark this
  struct alignas(CACHE_LINE_SIZE) bound {
    // initially the bounds are disabled
    uint64_t back = MAX, front = MAX;
  };
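  // cheap guard for the padding assumption above: alignas rounds sizeof up,
  // so each bound fills exactly one cache line and two bounds never share one
  static_assert(sizeof(bound) == CACHE_LINE_SIZE,
                "bound must fill exactly one cache line");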
  uint64_t capacity; // power of 2
  uint64_t modMask;  // indices are reduced with index & modMask
  // everything used as an index into array grows monotonically; on 8 cores
  // at 3.4 GHz executing one push and one pop every cycle, the wrap-around
  // becomes a problem in about 21 years
  // Proof: (/ 18446744073709551615 (* 3.4 8 (expt 10 9)) 60 60 24 365)
  //           MAX                                    G   m  h  d  y

  std::vector<bound> local_bounds;
  // atomic increments compile to "lock"-prefixed x86 instructions, which
  // lock a cache line; if both leads shared one cache line, producers and
  // consumers would interfere unnecessarily (the follows have less potential
  // for slowdown)
  alignas(CACHE_LINE_SIZE) std::atomic<uint64_t> lead_back{0};
  alignas(CACHE_LINE_SIZE) std::atomic<uint64_t> lead_front{0};
  alignas(CACHE_LINE_SIZE) uint64_t follow_back = 0;
  alignas(CACHE_LINE_SIZE) uint64_t follow_front = 0;

  const uint64_t elements_to_process;
};
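
// A minimal usage sketch, assuming one producer (thread id 0) and one
// consumer (thread id 1); ids must be unique and smaller than num_threads,
// since they index local_bounds. With elements_to_process = n, pop_front
// returns the EOW sentinel (T(), i.e. 0 for int) once n pops have been
// handed out, which is how the consumer below terminates.
//
//   #include "concurrent_queue.hpp"
//   #include <iostream>
//   #include <thread>
//
//   int main() {
//     const std::size_t n = 1000;
//     ConcurrentQueue<int> queue(/*capacity=*/128, /*num_threads=*/2,
//                                /*elements_to_process=*/n);
//     std::thread producer([&] {
//       for (int i = 1; i <= static_cast<int>(n); ++i)
//         queue.push_back(/*id=*/0, i);
//     });
//     long long sum = 0;
//     std::thread consumer([&] {
//       for (int v; (v = queue.pop_front(/*id=*/1)) != 0;)
//         sum += v; // 0 is the EOW sentinel here, pushed values are 1..n
//     });
//     producer.join();
//     consumer.join();
//     std::cout << sum << '\n'; // prints 500500
//   }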