Advertisement
Guest User

Untitled

a guest
Jun 24th, 2017
47
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 3.94 KB | None | 0 0
  1. #ifndef SPINLOCKQUEUE_HPP
  2. #define SPINLOCKQUEUE_HPP 1
#include <algorithm>
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
  11. template<typename T>
  12. bool operator==(std::atomic<T*> const & a, std::atomic<T*> const & b)
  13. {
  14.     return a.load() == b.load();
  15. }
  16.  
  17. template <typename T>
  18. bool operator!=(std::atomic<T*> const & a, std::atomic<T*> const & b)
  19. {
  20.     return a.load() != b.load();
  21. }
  22.  
  23. template <typename T>
  24. class SpinLockQueue {
  25. private:
  26.   struct Node {
  27.     Node( T *val ) : value(val), next(0) { }
  28.     Node( Node && rhs)
  29.     {
  30.     *this = std::move(rhs);
  31.     }
  32.     Node( Node const &) = delete;
  33.     Node& operator=(Node const &) = delete;
  34.     Node& operator=(Node && rhs) {
  35.     if (this != &rhs)
  36.     {
  37.         value = rhs.value;
  38.         rhs.value = 0;
  39.         next = rhs.next;
  40.         rhs.next = 0;
  41.     }
  42.  
  43.     return *this;
  44.     }    
  45.  
  46.     ~Node() {
  47.     delete value;
  48.     }
  49.     T* value;
  50.     Node* next;
  51.   } __attribute__((aligned(64)));
  52.  
  53.  
  54.  
  55.   std::atomic<std::size_t> mWaiters;
  56.   std::mutex mMutex;
  57.   std::condition_variable mCondition;
  58.  
  59.  
  60.   Node* first __attribute__((aligned(64)));
  61.  
  62.   // shared among consumers
  63.   atomic<bool> consumerLock __attribute__((aligned(64)));
  64.  
  65.   // for one producer at a time
  66.   Node* last __attribute__((aligned(64)));
  67.  
  68.   // shared among producers
  69.   atomic<bool> producerLock __attribute__((aligned(64)));
  70.  
  71.  
  72.   atomic<Node*> nodeList __attribute__((aligned(64)));
  73.  
  74.  
  75.   Node* getNewNode(T const & t) {
  76.     Node* n = nodeList;
  77.     if (n)
  78.     {
  79.         while (n && !nodeList.compare_exchange_weak(n,n->next))
  80.         {
  81.             n = nodeList;
  82.         }
  83.  
  84.         if (n)
  85.         {
  86.             n->value = new T(t);
  87.             n->next = 0;
  88.             return n;
  89.         } else {
  90.             return new Node(new T(t));
  91.         }
  92.        
  93.     } else {
  94.         return new Node(new T(t));
  95.     }
  96.   }
  97. public:
  98.   SpinLockQueue() : mWaiters(0) {
  99.     first  = last = getNewNode(T());           // add dummy separator
  100.  
  101.     producerLock = false;
  102.     consumerLock = false;
  103.     nodeList = 0;
  104.   }
  105.   ~SpinLockQueue() {
  106.     Node* tmp;
  107.     while( (tmp = first) != 0 ) {   // release the list
  108.       first = tmp->next;
  109.       delete tmp;
  110.     }
  111.  
  112.     while ( (tmp = nodeList) != 0) {
  113.     nodeList = tmp->next;
  114.     delete tmp;
  115.     }
  116.   }
  117.  
  118.  
  119.   void push( const T& t ) {
  120.     Node* newlast = getNewNode(t);
  121.     assert(newlast);
  122.     assert(newlast->value);
  123.     bool oldvalue = false;
  124.     while (!producerLock.compare_exchange_weak(oldvalue,true)) {}
  125. // own producer lock
  126.  
  127.     last->next = newlast;
  128.     last = newlast;
  129.  
  130.     producerLock = false;
  131.  
  132.     if (mWaiters)
  133.     {
  134.     mCondition.notify_one();
  135.     }
  136.   }
  137.  
  138.   bool try_pop( T& result ) {
  139.     // No nodes point to front
  140.     bool oldvalue = false;
  141.     while (!consumerLock.compare_exchange_weak(oldvalue,true)) {}
  142.     Node * theFirst = first;
  143.     Node * next = theFirst->next;
  144.     if (next)
  145.     {
  146.     T* val = next->value;
  147.     next->value = 0;
  148.     first = next;
  149.     consumerLock = false;
  150.     if (val)
  151.     {
  152.         result = std::move(*val);
  153.         delete val;
  154.     }
  155.     theFirst->value = 0;
  156.     Node * n(0);
  157.     do {
  158.         n = nodeList;
  159.         theFirst->next = n;
  160.  
  161.     } while (!nodeList.compare_exchange_weak(n,theFirst));     
  162.  
  163.     return true;
  164.     }
  165.    
  166.     consumerLock = false;
  167.     return false;
  168.   }
  169.  
  170.  
  171.  void pop(T& result) {
  172.     ++mWaiters;
  173.  
  174.     std::unique_lock<std::mutex> lk(mMutex);
  175.     while (!try_pop(result))
  176.     {
  177.         mCondition.wait(lk);
  178.     }
  179.  
  180.     --mWaiters;
  181.     }
  182.  
  183. };
  184.  
  185. #if 1
  186.  
  187. void process(SpinLockQueue<std::string>* queue)
  188. {
  189. #pragma omp parallel for shared(queue)
  190.         for (int i = 0; i < 100000; ++i)
  191.         {
  192.                 queue->push(std::string());
  193.                 if (i % 3 == 0)
  194.                 {
  195.                         std::string a;
  196.                         queue->try_pop(a);
  197.                 }
  198.         }
  199. #pragma omp barrier
  200. }
  201.  
  202.  
  203.  
  204. int main(int argc, char * argv[])
  205. {
  206.     SpinLockQueue<std::string> *queue = new SpinLockQueue<std::string>;
  207.     process(queue);
  208.     delete queue;
  209.     return 0;
  210. }
  211.  
  212.  
  213. #endif
  214.  
  215. #endif
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement