Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #ifndef SPINLOCKQUEUE_HPP
- #define SPINLOCKQUEUE_HPP 1
- #include <cstddef>
- #include <algorithm>
- #include <atomic>
- #include <thread>
- #include <mutex>
- #include <condition_variable>
- #include <utility>
- #include <cassert>
- #include <vector>
- #include <iostream>
- #ifdef HAS_BOOST
- #include <boost/date_time/posix_time/posix_time.hpp>
- using namespace boost::posix_time;
- #endif
- template <typename T>
- class SpinLockQueue {
- private:
- struct Node {
- Node( T *val ) : value(val), next(0) { }
- Node( Node && rhs)
- {
- *this = std::move(rhs);
- }
- Node( Node const &) = delete;
- Node& operator=(Node const &) = delete;
- Node& operator=(Node && rhs) {
- if (this != &rhs)
- {
- value = rhs.value;
- rhs.value = 0;
- next = rhs.next;
- rhs.next = 0;
- }
- return *this;
- }
- ~Node() {
- delete value;
- }
- T* value;
- Node* next;
- } __attribute__((aligned(64)));
- std::atomic<std::size_t> mWaiters;
- std::mutex mMutex;
- std::condition_variable mCondition;
- Node* first __attribute__((aligned(16)));
- // shared among consumers
- atomic<long> consumerLock;
- char pad[ 64 - sizeof(consumerLock) - sizeof(first)];
- // for one producer at a time
- Node* last __attribute__((aligned(32)));
- // shared among producers
- atomic<long> producerLock __attribute__((aligned(32)));
- atomic<Node*> nodeList __attribute__((aligned(64)));
- Node* getNewNode(T const & t) {
- Node* n = nodeList;
- if (n)
- {
- while (n && !nodeList.compare_exchange_strong(n,n->next))
- {
- n = nodeList;
- }
- if (n)
- {
- n->value = new T(t);
- n->next = 0;
- return n;
- } else {
- return new Node(new T(t));
- }
- } else {
- return new Node(new T(t));
- }
- }
- Node* getNewNode(T* t) {
- Node * n = nodeList;
- if (n)
- {
- Node* next = n->next;
- while (!nodeList.compare_exchange_strong(n,next))
- {
- n = nodeList;
- if (n)
- {
- next = n->next;
- }
- }
- if (n)
- {
- n->value = t;
- n->next = 0;
- return n;
- }
- else
- {
- return new Node(n);
- }
- }
- else
- {
- return new Node(n);
- }
- }
- void addNodeToList(Node* node)
- {
- Node *n(0);
- do {
- n = nodeList;
- node->next = n;
- } while (!nodeList.compare_exchange_strong(n,node));
- }
- public:
- SpinLockQueue() : mWaiters(0) {
- nodeList = new Node(0);
- first = last = new Node(0); // add dummy separator
- producerLock = false;
- consumerLock = false;
- }
- ~SpinLockQueue() {
- Node* tmp;
- while( (tmp = first) != 0 ) { // release the list
- first = tmp->next;
- delete tmp;
- }
- while ( (tmp = nodeList) != 0) {
- nodeList = tmp->next;
- delete tmp;
- }
- }
- void push( const T& t ) {
- Node* newlast = getNewNode(t);
- assert(newlast);
- assert(newlast->value);
- long oldvalue = false;
- while (!producerLock.compare_exchange_strong(oldvalue,true)) {}
- assert(producerLock);
- // own producer lock
- last->next = newlast;
- last = newlast;
- assert(producerLock.exchange(false));
- if (mWaiters)
- {
- mCondition.notify_one();
- }
- }
- bool try_pop( T& result ) {
- // No nodes point to front
- long oldvalue = false;
- while (!consumerLock.compare_exchange_strong(oldvalue,true)) { continue; }
- assert(consumerLock.load());
- Node * theFirst = first;
- Node * next = theFirst->next;
- while (next)
- {
- if (next->value == 0)
- {
- first = next;
- theFirst->value = 0;
- addNodeToList(theFirst);
- theFirst = first;
- next = theFirst->next;
- continue;
- }
- else
- {
- T* val = next->value;
- next->value = 0;
- first = next;
- theFirst->value = 0;
- addNodeToList(theFirst);
- long exchangedVal = consumerLock.exchange(false);
- assert(exchangedVal);
- assert(val);
- result = std::move(*val);
- delete val;
- }
- return true;
- }
- assert(consumerLock.exchange(false));
- return false;
- }
- void pop(T& result) {
- ++mWaiters;
- std::unique_lock<std::mutex> lk(mMutex);
- while (!try_pop(result))
- {
- mCondition.wait(lk);
- }
- --mWaiters;
- }
- };
- #if 1
- static std::atomic<int> pushed, popped;
- void process(SpinLockQueue<std::vector<std::string> >* queue)
- {
- for (int i = 0; i < 100000; ++i)
- {
- queue->push(std::vector<std::string>());
- ++pushed;
- }
- std::vector<std::string> a;
- #pragma omp parallel private(a)
- while (queue->try_pop(a))
- {
- ++popped;
- }
- #pragma omp barrier
- }
- int main(int argc, char * argv[])
- {
- SpinLockQueue<std::vector<std::string> > *queue = new SpinLockQueue<std::vector<std::string> >;
- pushed = 0;
- popped = 0;
- #ifdef HAS_BOOST
- ptime begin(microsec_clock::local_time());
- #endif
- process(queue);
- #ifdef HAS_BOOST
- ptime end(microsec_clock::local_time());
- std::cout << "Numbers: pushes: " << pushed.load() << " pops: " << popped.load() << std::endl;
- std::cout << "Time taken: " << (end-begin) << std::endl;
- #endif
- delete queue;
- return 0;
- }
- #endif
- #endif
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement