Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #ifndef SPINLOCKQUEUE_HPP
- #define SPINLOCKQUEUE_HPP 1
- #include <cstddef>
- #include <algorithm>
- #include <atomic>
- #include <thread>
- #include <mutex>
- #include <condition_variable>
- #include <utility>
- #include <cassert>
- template<typename T>
- bool operator==(std::atomic<T*> const & a, std::atomic<T*> const & b)
- {
- return a.load() == b.load();
- }
- template <typename T>
- bool operator!=(std::atomic<T*> const & a, std::atomic<T*> const & b)
- {
- return a.load() != b.load();
- }
- template <typename T>
- class SpinLockQueue {
- private:
- struct Node {
- Node( T *val ) : value(val), next(0) { }
- Node( Node && rhs)
- {
- *this = std::move(rhs);
- }
- Node( Node const &) = delete;
- Node& operator=(Node const &) = delete;
- Node& operator=(Node && rhs) {
- if (this != &rhs)
- {
- value = rhs.value;
- rhs.value = 0;
- next = rhs.next;
- rhs.next = 0;
- }
- return *this;
- }
- ~Node() {
- delete value;
- }
- T* value;
- Node* next;
- } __attribute__((aligned(64)));
- std::atomic<std::size_t> mWaiters;
- std::mutex mMutex;
- std::condition_variable mCondition;
- Node* first __attribute__((aligned(64)));
- // shared among consumers
- atomic<bool> consumerLock __attribute__((aligned(64)));
- // for one producer at a time
- Node* last __attribute__((aligned(64)));
- // shared among producers
- atomic<bool> producerLock __attribute__((aligned(64)));
- atomic<Node*> nodeList __attribute__((aligned(64)));
- Node* getNewNode(T const & t) {
- Node* n = nodeList;
- if (n)
- {
- while (n && !nodeList.compare_exchange_weak(n,n->next))
- {
- n = nodeList;
- }
- if (n)
- {
- n->value = new T(t);
- n->next = 0;
- return n;
- } else {
- return new Node(new T(t));
- }
- } else {
- return new Node(new T(t));
- }
- }
- public:
- SpinLockQueue() : mWaiters(0) {
- first = last = getNewNode(T()); // add dummy separator
- producerLock = false;
- consumerLock = false;
- nodeList = 0;
- }
- ~SpinLockQueue() {
- Node* tmp;
- while( (tmp = first) != 0 ) { // release the list
- first = tmp->next;
- delete tmp;
- }
- while ( (tmp = nodeList) != 0) {
- nodeList = tmp->next;
- delete tmp;
- }
- }
- void push( const T& t ) {
- Node* newlast = getNewNode(t);
- assert(newlast);
- assert(newlast->value);
- bool oldvalue = false;
- while (!producerLock.compare_exchange_weak(oldvalue,true)) {}
- // own producer lock
- last->next = newlast;
- last = newlast;
- producerLock = false;
- if (mWaiters)
- {
- mCondition.notify_one();
- }
- }
- bool try_pop( T& result ) {
- // No nodes point to front
- bool oldvalue = false;
- while (!consumerLock.compare_exchange_weak(oldvalue,true)) {}
- Node * theFirst = first;
- Node * next = theFirst->next;
- if (next)
- {
- T* val = next->value;
- next->value = 0;
- first = next;
- consumerLock = false;
- if (val)
- {
- result = std::move(*val);
- delete val;
- }
- theFirst->value = 0;
- Node * n(0);
- do {
- n = nodeList;
- theFirst->next = n;
- } while (!nodeList.compare_exchange_weak(n,theFirst));
- return true;
- }
- consumerLock = false;
- return false;
- }
- void pop(T& result) {
- ++mWaiters;
- std::unique_lock<std::mutex> lk(mMutex);
- while (!try_pop(result))
- {
- mCondition.wait(lk);
- }
- --mWaiters;
- }
- };
- #if 1
- void process(SpinLockQueue<std::string>* queue)
- {
- #pragma omp parallel for shared(queue)
- for (int i = 0; i < 100000; ++i)
- {
- queue->push(std::string());
- if (i % 3 == 0)
- {
- std::string a;
- queue->try_pop(a);
- }
- }
- #pragma omp barrier
- }
- int main(int argc, char * argv[])
- {
- SpinLockQueue<std::string> *queue = new SpinLockQueue<std::string>;
- process(queue);
- delete queue;
- return 0;
- }
- #endif
- #endif
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement