Advertisement
Guest User

Untitled

a guest
Jun 24th, 2017
62
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 3.85 KB | None | 0 0
  1. #ifndef SPINLOCKQUEUE_HPP
  2. #define SPINLOCKQUEUE_HPP 1
  3. #include <cstddef>
  4. #include <algorithm>
  5. #include <atomic>
  6. #include <thread>
  7. #include <mutex>
  8. #include <condition_variable>
  9. #include <utility>
  10. #include <cassert>
  11. template<typename T>
  12. bool operator==(std::atomic<T*> const & a, std::atomic<T*> const & b)
  13. {
  14.     return a.load() == b.load();
  15. }
  16.  
  17. template <typename T>
  18. bool operator!=(std::atomic<T*> const & a, std::atomic<T*> const & b)
  19. {
  20.     return a.load() != b.load();
  21. }
  22.  
  23. template <typename T>
  24. class SpinLockQueue {
  25. private:
  26.   struct Node {
  27.     Node( T *val ) : value(val), next(0) { }
  28.     Node( Node && rhs)
  29.     {
  30.     *this = std::move(rhs);
  31.     }
  32.     Node( Node const &) = delete;
  33.     Node& operator=(Node const &) = delete;
  34.     Node& operator=(Node && rhs) {
  35.     if (this != &rhs)
  36.     {
  37.         value = rhs.value;
  38.         rhs.value = 0;
  39.         next = rhs.next;
  40.         rhs.next = 0;
  41.     }
  42.  
  43.     return *this;
  44.     }    
  45.  
  46.     ~Node() {
  47.     delete value;
  48.     }
  49.     T* value;
  50.     Node* next;
  51.   } __attribute__((aligned(64)));
  52.  
  53.  
  54.  
  55.   std::atomic<std::size_t> mWaiters;
  56.   std::mutex mMutex;
  57.   std::condition_variable mCondition;
  58.  
  59.  
  60.   Node* first __attribute__((aligned(64)));
  61.  
  62.   // shared among consumers
  63.   atomic<bool> consumerLock __attribute__((aligned(64)));
  64.  
  65.   // for one producer at a time
  66.   Node* last __attribute__((aligned(64)));
  67.  
  68.   // shared among producers
  69.   atomic<bool> producerLock __attribute__((aligned(64)));
  70.  
  71.  
  72.   atomic<Node*> nodeList __attribute__((aligned(64)));
  73.  
  74.  
  75.   Node* getNewNode(T const & t) {
  76.     Node* n = nodeList;
  77.     if (n)
  78.     {
  79.         while (n && !nodeList.compare_exchange_weak(n,n->next))
  80.         {
  81.             n = nodeList;
  82.         }
  83.  
  84.         if (n)
  85.         {
  86.             n->value = new T(t);
  87.             n->next = 0;
  88.             return n;
  89.         } else {
  90.             return new Node(new T(t));
  91.         }
  92.        
  93.     } else {
  94.         return new Node(new T(t));
  95.     }
  96.   }
  97. public:
  98.   SpinLockQueue() : mWaiters(0) {
  99.     first  = last =
  100.       new Node( T() );           // add dummy separator
  101.  
  102.     producerLock = false;
  103.     consumerLock = false;
  104.     nodeList = 0;
  105.   }
  106.   ~SpinLockQueue() {
  107.     Node* tmp;
  108.     while( (tmp = first) != 0 ) {   // release the list
  109.       first = tmp->next;
  110.       delete tmp;
  111.     }
  112.  
  113.     while ( (tmp = nodeList) != 0) {
  114.     nodeList = tmp->next;
  115.     delete tmp;
  116.     }
  117.   }
  118.  
  119.  
  120.   void push( const T& t ) {
  121.     Node* newlast = getNewNode(t);
  122.     assert(newlast);
  123.     bool oldvalue = false;
  124.     while (!producerLock.compare_exchange_weak(oldvalue,true)) {}
  125. // own producer lock
  126.  
  127.     last->next = newlast;
  128.     last = newlast;
  129.  
  130.     producerLock = false;
  131.  
  132.     if (mWaiters)
  133.     {
  134.     mCondition.notify_one();
  135.     }
  136.   }
  137.  
  138.   bool try_pop( T& result ) {
  139.     // No nodes point to front
  140.     bool oldvalue = false;
  141.     while (!consumerLock.compare_exchange_weak(oldvalue,true)) {}
  142.     Node * next = first->next;
  143.     Node * theFirst = first;
  144.     if (next)
  145.     {
  146.     T* val = next->value;
  147.     next->value = 0;
  148.     first = next;
  149.     consumerLock = false;
  150.     result = std::move(*val);
  151.     delete val;
  152.     theFirst->value = 0;
  153.     Node * n(0);
  154.     do {
  155.         n = nodeList;
  156.         theFirst->next = n;
  157.  
  158.     } while (!nodeList.compare_exchange_weak(n,theFirst));     
  159.  
  160.     return true;
  161.     }
  162.    
  163.     consumerLock = false;
  164.     return false;
  165.   }
  166.  
  167.  
  168.  void pop(T& result) {
  169.     ++mWaiters;
  170.  
  171.     std::unique_lock<std::mutex> lk(mMutex);
  172.     while (!try_pop(result))
  173.     {
  174.         mCondition.wait(lk);
  175.     }
  176.  
  177.     --mWaiters;
  178.     }
  179.  
  180. };
  181.  
  182. #if 1
  183.  
  184. void process(SpinLockQueue<int>* queue)
  185. {
  186. #pragma omp parallel for shared(queue)
  187.         for (int i = 0; i < 100000; ++i)
  188.         {
  189.                 queue->push(i);
  190.                 if (i % 3 == 0)
  191.                 {
  192.                         int a;
  193.                         queue->try_pop(a);
  194.                 }
  195.         }
  196. #pragma omp barrier
  197. }
  198.  
  199.  
  200.  
  201. int main(int argc, char * argv[])
  202. {
  203.     SpinLockQueue<int> *queue = new SpinLockQueue<int>;
  204.     process(queue);
  205.     delete queue;
  206.     return 0;
  207. }
  208.  
  209.  
  210. #endif
  211.  
  212. #endif
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement