Advertisement
Guest User

Untitled

a guest
Jun 25th, 2017
65
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 3.58 KB | None | 0 0
  1. #ifndef LOCKFREEQUEUE_HPP
  2. #define LOCKFREEQUEUE_HPP 1
  3. #include <cstddef>
  4. #include <algorithm>
  5. #include <tbb/atomic.h>
  6. //#include <thread>
  7. //#include <mutex>
  8. //#include <condition_variable>
  9. //#include <utility>
  10. #include <boost/thread.hpp>
  11. #include <iostream>
  12.  
  13. template<typename T>
  14. bool operator==(tbb::atomic<T*> const & a, tbb::atomic<T*> const & b)
  15. {
  16.   return a.load() == b.load();
  17. }
  18.  
  19. template <typename T>
  20. bool operator!=(tbb::atomic<T*> const & a, tbb::atomic<T*> const & b)
  21. {
  22.   return a.load() != b.load();
  23. }
  24.  
  25. template <typename T>
  26. class LockFreeQueue {
  27. private:
  28.   struct Node {
  29.     Node( T *val ) : value(val), next(0) { }
  30.     Node( T const &val ) : value(new T(val)), next(0) { }
  31.     ~Node() {
  32.       if(value)
  33.         delete value;
  34.     }
  35.  
  36.     T* value;
  37.     Node* next;
  38.   };
  39.  
  40.   tbb::atomic<Node*> first;        // for producer only
  41.   tbb::atomic<Node*> last;         // shared
  42.   tbb::atomic<std::size_t> mWaiters;
  43.   boost::mutex mMutex;
  44.   boost::condition_variable mCondition;
  45.  
  46. public:
  47.   LockFreeQueue() {
  48.     mWaiters = 0;
  49.     first = last = new Node( T() );           // add dummy separator
  50.   }
  51.   ~LockFreeQueue() {
  52.     while((Node*)first != 0 ) {   // release the list
  53.       Node* tmp = first;
  54.       first = tmp->next;
  55.       delete tmp;
  56.     }
  57.   }
  58.  
  59.  
  60.   void push( const T& t ) {
  61.     Node* newlast = new Node(new T(t));
  62.     while (true)
  63.     {
  64.       Node *last = (Node*)this->last;
  65.       if(this->last.compare_and_swap(newlast, last) == last) {
  66.         last->next = newlast;
  67.         break;
  68.       }
  69.     }
  70.  
  71.     if (mWaiters)
  72.     {
  73.       mCondition.notify_one();
  74.     }
  75.   }
  76.  
  77.   bool try_pop( T& result ) {
  78.     // No nodes point to front
  79.  
  80.     while((Node*)this->first != (Node*)last)
  81.     {
  82.       Node* first = (Node*)this->first;
  83.       if(this->first.compare_and_swap((Node*)(NULL), first) != first) {
  84.         continue;
  85.       }
  86.  
  87.       if(first == NULL) {
  88.         std::cout << "Popping failed?" << std::endl;
  89.         continue;
  90.       }
  91.  
  92.       Node *next;
  93.       while((next = first->next) == NULL) { }
  94.  
  95.       T *val = next->value;
  96.       next->value = NULL;
  97.  
  98.       if(this->first.compare_and_swap(next, (Node*)(NULL)) != NULL) {
  99.         assert(false); // IMPOSSIBLE!!!
  100.       }
  101.  
  102.       result = *val; //std::move(*val);
  103.       delete val;
  104.       delete first;
  105.       return true;
  106.     }
  107.     return false;
  108.   }
  109.  
  110.   void pop(T& result) {
  111.     ++mWaiters;
  112.  
  113.     boost::unique_lock<boost::mutex> lk(mMutex);
  114.     while (!try_pop(result))
  115.     {
  116.       mCondition.wait(lk);
  117.     }
  118.  
  119.     --mWaiters;
  120.   }
  121.  
  122. };
  123.  
  124. #if 1
  125.  
  126. #include <stdio.h>
  127.  
  128. bool stop;
  129.  
  130. void pusher(LockFreeQueue<int> &q) {
  131.   for(int i = 0; !stop && i < 100; i ++)
  132.     q.push(i);
  133. }
  134.  
  135. void popper(LockFreeQueue<int> &q) {
  136.   while(!stop) {
  137.     int i;
  138.     if(q.try_pop(i)) {
  139.       printf("%d ", i);
  140.       fflush(stdout);
  141.     }
  142.   }
  143. }
  144.  
  145. int main(int argc, char * argv[])
  146. {
  147.   LockFreeQueue<int> queue;
  148.   stop = false;
  149.   std::vector< boost::shared_ptr<boost::thread> > threads;
  150.   std::vector< boost::shared_ptr<boost::thread> >::iterator thread;
  151. //#pragma omp parallel for shared(queue)
  152.   for (int i = 0; i < 3; ++i)
  153.   {
  154.     threads.push_back(boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&popper, boost::ref(queue)))));
  155.   }
  156.   for (int i = 0; i < 3; ++i)
  157.   {
  158.     threads.push_back(boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&pusher, boost::ref(queue)))));
  159.   }
  160.   sleep(5);
  161.   stop = true;
  162.   for(thread = threads.begin(); thread != threads.end(); thread ++) {
  163.     (*thread)->join();
  164.   }
  165.   threads.resize(0);
  166.  
  167.   return 0;
  168. }
  169.  
  170. #endif
  171.  
  172. #endif
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement