Advertisement
Guest User

Untitled

a guest
Jun 24th, 2017
62
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 5.08 KB | None | 0 0
  1. #ifndef SPINLOCKQUEUE_HPP
  2. #define SPINLOCKQUEUE_HPP 1
  3. #include <cstddef>
  4. #include <algorithm>
  5. #include <atomic>
  6. #include <thread>
  7. #include <mutex>
  8. #include <condition_variable>
  9. #include <utility>
  10. #include <cassert>
  11. #include <vector>
  12. #include <iostream>
  13. #ifdef HAS_BOOST
  14. #include <boost/date_time/posix_time/posix_time.hpp>
  15. using namespace boost::posix_time;
  16. #endif
  17.  
  18.  
  19. template <typename T>
  20. class SpinLockQueue {
  21. private:
  22.   struct Node {
  23.     Node( T *val ) : value(val), next(0) { }
  24.     Node( Node && rhs)
  25.     {
  26.     *this = std::move(rhs);
  27.     }
  28.     Node( Node const &) = delete;
  29.     Node& operator=(Node const &) = delete;
  30.     Node& operator=(Node && rhs) {
  31.     if (this != &rhs)
  32.     {
  33.         value = rhs.value;
  34.         rhs.value = 0;
  35.         next = rhs.next;
  36.         rhs.next = 0;
  37.     }
  38.  
  39.     return *this;
  40.     }    
  41.  
  42.     ~Node() {
  43.     delete value;
  44.     }
  45.     T* value;
  46.     Node* next;
  47.   } __attribute__((aligned(64)));
  48.  
  49.  
  50.  
  51.   std::atomic<std::size_t> mWaiters;
  52.   std::mutex mMutex;
  53.   std::condition_variable mCondition;
  54.  
  55.  
  56.   Node* first __attribute__((aligned(16)));
  57.  
  58.   // shared among consumers
  59.   atomic<long> consumerLock;
  60.   char pad[ 64 - sizeof(consumerLock) - sizeof(first)];
  61.   // for one producer at a time
  62.   Node* last __attribute__((aligned(32)));
  63.  
  64.   // shared among producers
  65.   atomic<long> producerLock __attribute__((aligned(32)));
  66.  
  67.  
  68.   atomic<Node*> nodeList __attribute__((aligned(64)));
  69.  
  70.  
  71.   Node* getNewNode(T const & t) {
  72.     Node* n = nodeList;
  73.     if (n)
  74.     {
  75.         while (n && !nodeList.compare_exchange_strong(n,n->next))
  76.         {
  77.             n = nodeList;
  78.         }
  79.  
  80.         if (n)
  81.         {
  82.             n->value = new T(t);
  83.             n->next = 0;
  84.             return n;
  85.         } else {
  86.             return new Node(new T(t));
  87.         }
  88.        
  89.     } else {
  90.         return new Node(new T(t));
  91.     }
  92.   }
  93.  
  94.   Node* getNewNode(T* t) {
  95.     Node * n = nodeList;
  96.     if (n)
  97.     {
  98.         Node* next = n->next;
  99.         while (!nodeList.compare_exchange_strong(n,next))
  100.         {
  101.             n = nodeList;
  102.             if (n)
  103.             {
  104.                 next = n->next;
  105.             }
  106.         }
  107.  
  108.         if (n)
  109.         {
  110.             n->value = t;
  111.             n->next = 0;
  112.             return n;
  113.         }
  114.         else
  115.         {
  116.             return new Node(n);
  117.         }
  118.     }
  119.     else
  120.     {
  121.         return new Node(n);
  122.     }
  123.   }
  124.  
  125.   void addNodeToList(Node* node)
  126.   {
  127.         Node *n(0);
  128.                 do {
  129.                         n = nodeList;
  130.                         node->next = n;
  131.                 } while (!nodeList.compare_exchange_strong(n,node));
  132.  
  133.   }
  134.  
  135. public:
  136.   SpinLockQueue() : mWaiters(0) {
  137.     nodeList = new Node(0);
  138.     first  = last = new Node(0);           // add dummy separator
  139.  
  140.     producerLock = false;
  141.     consumerLock = false;
  142.   }
  143.   ~SpinLockQueue() {
  144.     Node* tmp;
  145.     while( (tmp = first) != 0 ) {   // release the list
  146.       first = tmp->next;
  147.       delete tmp;
  148.     }
  149.  
  150.     while ( (tmp = nodeList) != 0) {
  151.     nodeList = tmp->next;
  152.     delete tmp;
  153.     }
  154.   }
  155.  
  156.  
  157.   void push( const T& t ) {
  158.     Node* newlast = getNewNode(t);
  159.     assert(newlast);
  160.     assert(newlast->value);
  161.     long oldvalue = false;
  162.     while (!producerLock.compare_exchange_strong(oldvalue,true)) {}
  163.     assert(producerLock);
  164. // own producer lock
  165.  
  166.     last->next = newlast;
  167.     last = newlast;
  168.  
  169.     assert(producerLock.exchange(false));
  170.  
  171.     if (mWaiters)
  172.     {
  173.     mCondition.notify_one();
  174.     }
  175.   }
  176.  
  177.   bool try_pop( T& result ) {
  178.     // No nodes point to front
  179.     long oldvalue = false;
  180.     while (!consumerLock.compare_exchange_strong(oldvalue,true)) { continue; }
  181.     assert(consumerLock.load());
  182.    
  183.     Node * theFirst = first;
  184.     Node * next = theFirst->next;
  185.     while (next)
  186.     {
  187.     if (next->value == 0)
  188.     {
  189.        
  190.         first = next;
  191.                 theFirst->value = 0;
  192.         addNodeToList(theFirst);
  193.         theFirst = first;
  194.         next = theFirst->next;
  195.         continue;
  196.     }
  197.     else
  198.     {
  199.         T* val = next->value;
  200.         next->value = 0;
  201.         first = next;
  202.         theFirst->value = 0;   
  203.         addNodeToList(theFirst);
  204.         long exchangedVal = consumerLock.exchange(false);
  205.         assert(exchangedVal);
  206.         assert(val);
  207.         result = std::move(*val);
  208.         delete val;
  209.  
  210.     }
  211.  
  212.     return true;
  213.     }
  214.    
  215.     assert(consumerLock.exchange(false));
  216.     return false;
  217.   }
  218.  
  219.  
  220.  void pop(T& result) {
  221.     ++mWaiters;
  222.  
  223.     std::unique_lock<std::mutex> lk(mMutex);
  224.     while (!try_pop(result))
  225.     {
  226.         mCondition.wait(lk);
  227.     }
  228.  
  229.     --mWaiters;
  230.     }
  231.  
  232. };
  233.  
  234. #if 1
  235.  
  236. static std::atomic<int> pushed, popped;
  237.  
  238. void process(SpinLockQueue<std::vector<std::string> >* queue)
  239. {
  240.  
  241.         for (int i = 0; i < 100000; ++i)
  242.         {
  243.                 queue->push(std::vector<std::string>());
  244.         ++pushed;
  245.         }
  246.  
  247.     std::vector<std::string> a;
  248. #pragma omp parallel private(a)
  249.     while (queue->try_pop(a))
  250.     {
  251.         ++popped;
  252.     }
  253. #pragma omp  barrier
  254. }
  255.  
  256.  
  257.  
  258. int main(int argc, char * argv[])
  259. {
  260.     SpinLockQueue<std::vector<std::string> > *queue = new SpinLockQueue<std::vector<std::string> >;
  261.     pushed = 0;
  262.     popped = 0;
  263. #ifdef HAS_BOOST
  264.     ptime begin(microsec_clock::local_time()); 
  265. #endif
  266.     process(queue);
  267. #ifdef HAS_BOOST
  268.     ptime end(microsec_clock::local_time());
  269.  
  270.  
  271.     std::cout << "Numbers: pushes: " << pushed.load() << " pops: " << popped.load() << std::endl;
  272.     std::cout << "Time taken: " << (end-begin) << std::endl;
  273. #endif
  274.     delete queue;
  275.     return 0;
  276. }
  277.  
  278.  
  279. #endif
  280.  
  281. #endif
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement