Advertisement
Guest User

Untitled

a guest
Jun 24th, 2017
54
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 5.01 KB | None | 0 0
  1. #ifndef SPINLOCKQUEUE_HPP
  2. #define SPINLOCKQUEUE_HPP 1
  3. #include <cstddef>
  4. #include <algorithm>
  5. #include <atomic>
  6. #include <thread>
  7. #include <mutex>
  8. #include <condition_variable>
  9. #include <utility>
  10. #include <cassert>
  11. #include <vector>
  12. #include <iostream>
  13. #include <boost/date_time/posix_time/posix_time.hpp>
  14. using namespace boost::posix_time;
  15.  
  16. template <typename T>
  17. class SpinLockQueue {
  18. private:
  19.   struct Node {
  20.     Node( T *val ) : value(val), next(0) { }
  21.     Node( Node && rhs)
  22.     {
  23.     *this = std::move(rhs);
  24.     }
  25.     Node( Node const &) = delete;
  26.     Node& operator=(Node const &) = delete;
  27.     Node& operator=(Node && rhs) {
  28.     if (this != &rhs)
  29.     {
  30.         value = rhs.value;
  31.         rhs.value = 0;
  32.         next = rhs.next;
  33.         rhs.next = 0;
  34.     }
  35.  
  36.     return *this;
  37.     }    
  38.  
  39.     ~Node() {
  40.     delete value;
  41.     }
  42.     T* value;
  43.     Node* next;
  44.   } __attribute__((aligned(64)));
  45.  
  46.  
  47.  
  48.   std::atomic<std::size_t> mWaiters;
  49.   std::mutex mMutex;
  50.   std::condition_variable mCondition;
  51.  
  52.  
  53.   Node* first __attribute__((aligned(16)));
  54.  
  55.   // shared among consumers
  56.   atomic<long> consumerLock;
  57.   char pad[ 64 - sizeof(consumerLock) - sizeof(first)];
  58.   // for one producer at a time
  59.   Node* last __attribute__((aligned(32)));
  60.  
  61.   // shared among producers
  62.   atomic<long> producerLock __attribute__((aligned(32)));
  63.  
  64.  
  65.   atomic<Node*> nodeList __attribute__((aligned(64)));
  66.  
  67.  
  68.   Node* getNewNode(T const & t) {
  69.     Node* n = nodeList;
  70.     if (n)
  71.     {
  72.         while (n && !nodeList.compare_exchange_strong(n,n->next))
  73.         {
  74.             n = nodeList;
  75.         }
  76.  
  77.         if (n)
  78.         {
  79.             n->value = new T(t);
  80.             n->next = 0;
  81.             return n;
  82.         } else {
  83.             return new Node(new T(t));
  84.         }
  85.        
  86.     } else {
  87.         return new Node(new T(t));
  88.     }
  89.   }
  90.  
  91.   Node* getNewNode(T* t) {
  92.     Node * n = nodeList;
  93.     if (n)
  94.     {
  95.         Node* next = n->next;
  96.         while (!nodeList.compare_exchange_strong(n,next))
  97.         {
  98.             n = nodeList;
  99.             if (n)
  100.             {
  101.                 next = n->next;
  102.             }
  103.         }
  104.  
  105.         if (n)
  106.         {
  107.             n->value = t;
  108.             n->next = 0;
  109.             return n;
  110.         }
  111.         else
  112.         {
  113.             return new Node(n);
  114.         }
  115.     }
  116.     else
  117.     {
  118.         return new Node(n);
  119.     }
  120.   }
  121.  
  122.   void addNodeToList(Node* node)
  123.   {
  124.         Node *n(0);
  125.                 do {
  126.                         n = nodeList;
  127.                         node->next = n;
  128.                 } while (!nodeList.compare_exchange_strong(n,node));
  129.  
  130.   }
  131.  
  132. public:
  133.   SpinLockQueue() : mWaiters(0) {
  134.     nodeList = new Node(0);
  135.     first  = last = new Node(0);           // add dummy separator
  136.  
  137.     producerLock = false;
  138.     consumerLock = false;
  139.   }
  140.   ~SpinLockQueue() {
  141.     Node* tmp;
  142.     while( (tmp = first) != 0 ) {   // release the list
  143.       first = tmp->next;
  144.       delete tmp;
  145.     }
  146.  
  147.     while ( (tmp = nodeList) != 0) {
  148.     nodeList = tmp->next;
  149.     delete tmp;
  150.     }
  151.   }
  152.  
  153.  
  154.   void push( const T& t ) {
  155.     Node* newlast = getNewNode(t);
  156.     assert(newlast);
  157.     assert(newlast->value);
  158.     long oldvalue = false;
  159.     while (!producerLock.compare_exchange_strong(oldvalue,true)) {}
  160.     assert(producerLock);
  161. // own producer lock
  162.  
  163.     last->next = newlast;
  164.     last = newlast;
  165.  
  166.     assert(producerLock.exchange(false));
  167.  
  168.     if (mWaiters)
  169.     {
  170.     mCondition.notify_one();
  171.     }
  172.   }
  173.  
  174.   bool try_pop( T& result ) {
  175.     // No nodes point to front
  176.     long oldvalue = false;
  177.     while (!consumerLock.compare_exchange_strong(oldvalue,true)) { continue; }
  178.     assert(consumerLock.load());
  179.    
  180.     Node * theFirst = first;
  181.     Node * next = theFirst->next;
  182.     while (next)
  183.     {
  184.     if (next->value == 0)
  185.     {
  186.        
  187.         first = next;
  188.                 theFirst->value = 0;
  189.         addNodeToList(theFirst);
  190.         theFirst = first;
  191.         next = theFirst->next;
  192.         continue;
  193.     }
  194.     else
  195.     {
  196.         T* val = next->value;
  197.         next->value = 0;
  198.         first = next;
  199.         theFirst->value = 0;   
  200.         addNodeToList(theFirst);
  201.         long exchangedVal = consumerLock.exchange(false);
  202.         assert(exchangedVal);
  203.         assert(val);
  204.         result = std::move(*val);
  205.         delete val;
  206.  
  207.     }
  208.  
  209.     return true;
  210.     }
  211.    
  212.     assert(consumerLock.exchange(false));
  213.     return false;
  214.   }
  215.  
  216.  
  217.  void pop(T& result) {
  218.     ++mWaiters;
  219.  
  220.     std::unique_lock<std::mutex> lk(mMutex);
  221.     while (!try_pop(result))
  222.     {
  223.         mCondition.wait(lk);
  224.     }
  225.  
  226.     --mWaiters;
  227.     }
  228.  
  229. };
  230.  
  231. #if 1
  232.  
  233. static std::atomic<int> pushed, popped;
  234.  
  235. void process(SpinLockQueue<std::vector<std::string> >* queue)
  236. {
  237.  
  238.         for (int i = 0; i < 100000; ++i)
  239.         {
  240.                 queue->push(std::vector<std::string>());
  241.         ++pushed;
  242.         }
  243.  
  244.     std::vector<std::string> a;
  245. #pragma omp parallel private(a)
  246.     while (queue->try_pop(a))
  247.     {
  248.         ++popped;
  249.     }
  250. #pragma omp  barrier
  251. }
  252.  
  253.  
  254.  
  255. int main(int argc, char * argv[])
  256. {
  257.     SpinLockQueue<std::vector<std::string> > *queue = new SpinLockQueue<std::vector<std::string> >;
  258.     pushed = 0;
  259.     popped = 0;
  260.  
  261.     ptime begin(microsec_clock::local_time()); 
  262.     process(queue);
  263.     ptime end(microsec_clock::local_time());
  264.  
  265.     std::cout << "Numbers: pushes: " << pushed.load() << " pops: " << popped.load() << std::endl;
  266.     std::cout << "Time taken: " << (end-begin) << std::endl;
  267.     delete queue;
  268.     return 0;
  269. }
  270.  
  271.  
  272. #endif
  273.  
  274. #endif
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement