Advertisement
Guest User

Untitled

a guest
Jun 24th, 2017
50
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 3.68 KB | None | 0 0
  1. #ifndef LOCKFREEQUEUE_HPP
  2. #define LOCKFREEQUEUE_HPP 1
  3. #include <cstddef>
  4. #include <algorithm>
  5. #include <atomic>
  6. #include <thread>
  7. #include <mutex>
  8. #include <condition_variable>
  9. #include <utility>
  10.  
  /**
   * Equality for atomic pointers: compares the pointer values currently held
   * by the two atomics. Each side is read with its own load(), so the
   * comparison as a whole is not a single atomic operation.
   */
  template <typename T>
  bool operator==(std::atomic<T*> const & a, std::atomic<T*> const & b)
  {
      T* const lhs = a.load();
      T* const rhs = b.load();
      return lhs == rhs;
  }
  16.  
  /**
   * Inequality for atomic pointers: true when the pointer values currently
   * held by the two atomics differ. Each side is read with its own load(),
   * so the comparison as a whole is not a single atomic operation.
   */
  template <typename T>
  bool operator!=(std::atomic<T*> const & a, std::atomic<T*> const & b)
  {
      T* const lhs = a.load();
      T* const rhs = b.load();
      return !(lhs == rhs);
  }
  22.  
  /**
   * Unbounded multi-producer / multi-consumer FIFO queue.
   *
   * The queue is a singly linked list that always contains one "dummy" node
   * at the front. Producers append behind `last` while holding
   * `producerLock`; consumers advance `first` while holding `consumerLock`.
   * Both locks are simple spin locks, so producers and consumers only contend
   * with their own kind. Unlinked nodes are parked on a spin-lock protected
   * free list (`nodeList`) and reused by later pushes.
   *
   * push() copies the element, try_pop() is non-blocking, and pop() blocks on
   * a condition variable until an element is available.
   */
  template <typename T>
  class SpinLockQueue {
  private:
    /**
     * List node. Owns the heap-allocated payload it points to (if any).
     * `next` is atomic because, when the queue is empty, a producer writes the
     * dummy node's `next` under the producer lock while a consumer reads it
     * under the (different) consumer lock.
     */
    struct alignas(64) Node {
      explicit Node(T* val) : value(val), next(nullptr) { }
      Node(Node const &) = delete;
      Node& operator=(Node const &) = delete;
      ~Node() { delete value; }

      T* value;
      std::atomic<Node*> next;
    };

    /** Scoped owner of a spin lock: busy-waits to acquire, releases on exit. */
    class SpinGuard {
    public:
      explicit SpinGuard(std::atomic<bool>& flag) : mFlag(flag) {
        // exchange() returns the previous value: `true` means somebody else
        // still holds the lock, so keep spinning.
        while (mFlag.exchange(true)) { }
      }
      ~SpinGuard() { mFlag.store(false); }
      SpinGuard(SpinGuard const &) = delete;
      SpinGuard& operator=(SpinGuard const &) = delete;
    private:
      std::atomic<bool>& mFlag;
    };

    // Number of threads currently inside pop(); lets push() skip the
    // notification path when nobody is waiting.
    std::atomic<std::size_t> mWaiters;
    std::mutex mMutex;
    std::condition_variable mCondition;

    // Front dummy node; guarded by consumerLock.
    alignas(64) Node* first;

    // shared among consumers
    alignas(64) std::atomic<bool> consumerLock;

    // Tail node; guarded by producerLock (one producer at a time).
    alignas(64) Node* last;

    // shared among producers
    alignas(64) std::atomic<bool> producerLock;

    // Free list of recycled nodes; guarded by nodeListLock. A lock is used
    // instead of a bare compare-exchange loop because a CAS-only pop from a
    // linked free list is exposed to the ABA problem.
    alignas(64) Node* nodeList;
    std::atomic<bool> nodeListLock;

    /**
     * Returns a node holding a copy of `t`, reusing a recycled node when one
     * is available and allocating a new one otherwise. The returned node's
     * `next` is null. Never returns null; throws whatever `new` or T's copy
     * constructor throws, without leaking.
     */
    Node* getNewNode(T const & t) {
      // Copy the payload first so a throwing copy cannot strand a node that
      // has already been taken off the free list.
      T* const payload = new T(t);

      Node* node = nullptr;
      {
        SpinGuard guard(nodeListLock);
        node = nodeList;
        if (node) {
          nodeList = node->next.load();
        }
      }

      if (!node) {
        try {
          return new Node(payload);
        } catch (...) {
          delete payload;
          throw;
        }
      }

      node->value = payload;
      node->next.store(nullptr);
      return node;
    }

    /** Parks an unlinked node (whose payload was already taken) for reuse. */
    void recycleNode(Node* node) {
      SpinGuard guard(nodeListLock);
      node->next.store(nodeList);
      nodeList = node;
    }

    /** Deletes every node of the chain starting at `head`. */
    static void destroyChain(Node* head) {
      while (head) {
        Node* const following = head->next.load();
        delete head;
        head = following;
      }
    }

  public:
    /** Creates an empty queue containing only the dummy separator node. */
    SpinLockQueue()
      : mWaiters(0),
        first(new Node(nullptr)),
        consumerLock(false),
        last(first),
        producerLock(false),
        nodeList(nullptr),
        nodeListLock(false) {
    }

    /**
     * Releases every queued element and every recycled node. Must not run
     * concurrently with any other member function.
     */
    ~SpinLockQueue() {
      destroyChain(first);
      destroyChain(nodeList);
    }

    /** Appends a copy of `t` to the back of the queue. */
    void push( const T& t ) {
      Node* const newlast = getNewNode(t);
      {
        SpinGuard guard(producerLock);
        last->next.store(newlast);   // publishes the element to consumers
        last = newlast;
      }

      if (mWaiters.load() != 0) {
        // Pass through the mutex before notifying: a consumer in pop() holds
        // it from its failed try_pop() until it is parked in wait(), so once
        // we get the mutex that consumer either sees the new element or is
        // already waiting and will receive the notification.
        { std::lock_guard<std::mutex> sync(mMutex); }
        mCondition.notify_one();
      }
    }

    /**
     * Removes the front element without blocking.
     *
     * @param result receives the element (move-assigned) on success and is
     *               left untouched when the queue is empty.
     * @return true if an element was removed, false if the queue was empty.
     */
    bool try_pop( T& result ) {
      Node* oldDummy = nullptr;
      T* payload = nullptr;
      {
        SpinGuard guard(consumerLock);
        oldDummy = first;
        Node* const next = oldDummy->next.load();
        if (!next) {
          return false;
        }
        // `next` becomes the new dummy; take its payload out of it.
        payload = next->value;
        next->value = nullptr;
        first = next;
      }

      // The old dummy is unreachable from the queue now and carries no
      // payload, so it can be reused by a later push.
      recycleNode(oldDummy);

      // Hand the value over outside the lock so a slow move assignment does
      // not stall other consumers.
      try {
        result = std::move(*payload);
      } catch (...) {
        delete payload;
        throw;
      }
      delete payload;
      return true;
    }

    /**
     * Removes the front element, blocking until one is available.
     *
     * @param result receives the element (move-assigned).
     */
    void pop(T& result) {
      ++mWaiters;

      std::unique_lock<std::mutex> lk(mMutex);
      while (!try_pop(result))
      {
          mCondition.wait(lk);
      }

      --mWaiters;
    }

  };
  173.  
  174. #if 1
  175.  
  176. void process(SpinLockQueue<int>* queue)
  177. {
  178. #pragma omp parallel for shared(queue)
  179.         for (int i = 0; i < 100000; ++i)
  180.         {
  181.                 queue->push(i);
  182.                 if (i % 3 == 0)
  183.                 {
  184.                         int a;
  185.                         queue->try_pop(a);
  186.                 }
  187.         }
  188. #pragma omp barrier
  189. }
  190.  
  191.  
  192.  
  193. int main(int argc, char * argv[])
  194. {
  195.     SpinLockQueue<int> *queue = new SpinLockQueue<int>;
  196.     process(queue);
  197.     delete queue;
  198.     return 0;
  199. }
  200.  
  201.  
  202. #endif
  203.  
  204. #endif
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement