Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #ifndef LOCKFREEQUEUE_HPP
- #define LOCKFREEQUEUE_HPP 1
- #include <cstddef>
- #include <algorithm>
- #include <tbb/atomic.h>
- //#include <thread>
- //#include <mutex>
- //#include <condition_variable>
- //#include <utility>
- #include <boost/thread.hpp>
- #include <iostream>
- template<typename T>
- bool operator==(tbb::atomic<T*> const & a, tbb::atomic<T*> const & b)
- {
- return a.load() == b.load();
- }
- template <typename T>
- bool operator!=(tbb::atomic<T*> const & a, tbb::atomic<T*> const & b)
- {
- return a.load() != b.load();
- }
- template <typename T>
- class LockFreeQueue {
- private:
- struct Node {
- Node( T *val ) : value(val), next(0) { }
- Node( T const &val ) : value(new T(val)), next(0) { }
- ~Node() {
- if(value)
- delete value;
- }
- T* value;
- Node* next;
- };
- tbb::atomic<Node*> first; // for producer only
- tbb::atomic<Node*> last; // shared
- tbb::atomic<std::size_t> mWaiters;
- boost::mutex mMutex;
- boost::condition_variable mCondition;
- public:
- LockFreeQueue() {
- mWaiters = 0;
- first = last = new Node( T() ); // add dummy separator
- }
- ~LockFreeQueue() {
- while((Node*)first != 0 ) { // release the list
- Node* tmp = first;
- first = tmp->next;
- delete tmp;
- }
- }
- void push( const T& t ) {
- Node* newlast = new Node(new T(t));
- while (true)
- {
- Node *last = (Node*)this->last;
- if(this->last.compare_and_swap(newlast, last) == last) {
- last->next = newlast;
- break;
- }
- }
- if (mWaiters)
- {
- mCondition.notify_one();
- }
- }
- bool try_pop( T& result ) {
- // No nodes point to front
- while((Node*)this->first != (Node*)last)
- {
- Node* first = (Node*)this->first;
- if(this->first.compare_and_swap((Node*)(NULL), first) != first) {
- continue;
- }
- if(first == NULL) {
- std::cout << "Popping failed?" << std::endl;
- continue;
- }
- Node *next;
- while((next = first->next) == NULL) { }
- T *val = next->value;
- next->value = NULL;
- if(this->first.compare_and_swap(next, (Node*)(NULL)) != NULL) {
- assert(false); // IMPOSSIBLE!!!
- }
- result = *val; //std::move(*val);
- delete val;
- delete first;
- return true;
- }
- return false;
- }
- void pop(T& result) {
- ++mWaiters;
- boost::unique_lock<boost::mutex> lk(mMutex);
- while (!try_pop(result))
- {
- mCondition.wait(lk);
- }
- --mWaiters;
- }
- };
- #if 1
- #include <stdio.h>
- bool stop;
- void pusher(LockFreeQueue<int> &q) {
- for(int i = 0; !stop && i < 100; i ++)
- q.push(i);
- }
- void popper(LockFreeQueue<int> &q) {
- while(!stop) {
- int i;
- if(q.try_pop(i)) {
- printf("%d ", i);
- fflush(stdout);
- }
- }
- }
- int main(int argc, char * argv[])
- {
- LockFreeQueue<int> queue;
- stop = false;
- std::vector< boost::shared_ptr<boost::thread> > threads;
- std::vector< boost::shared_ptr<boost::thread> >::iterator thread;
- //#pragma omp parallel for shared(queue)
- for (int i = 0; i < 3; ++i)
- {
- threads.push_back(boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&popper, boost::ref(queue)))));
- }
- for (int i = 0; i < 3; ++i)
- {
- threads.push_back(boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&pusher, boost::ref(queue)))));
- }
- sleep(5);
- stop = true;
- for(thread = threads.begin(); thread != threads.end(); thread ++) {
- (*thread)->join();
- }
- threads.resize(0);
- return 0;
- }
- #endif
- #endif
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement