// Simple XCHG based Atomic Stack // By: Chris M. Thomasson #include #include #include #include #include #include // sorry about the macros... #define THREADS 42 #define ITERS 100000 #define CT_MB_RLX std::memory_order_relaxed #define CT_MB_ACQ std::memory_order_acquire #define CT_MB_REL std::memory_order_release // HACK! Humm... #define CT_WAIT ((ct_work*)(0xDEADBEEFU)) // Just to track all the dynamic allocations... static std::atomic g_allocs(0); static std::mutex g_cout_mtx; // A work node struct ct_work { std::atomic m_next; std::thread::id m_data; ct_work(std::thread::id data) : m_next(nullptr), m_data(data) {} void process() { // Simulate just a tiny little work? g_cout_mtx.lock(); std::this_thread::yield(); std::this_thread::yield(); std::this_thread::yield(); std::thread::id local = std::this_thread::get_id(); if (m_data == local) { // std::cout << "processing local = " << m_data << // " from " << std::this_thread::get_id() << "\n"; } else { std::cout << "processing foreign = " << m_data << " from " << std::this_thread::get_id() << "\n"; } std::this_thread::yield(); std::this_thread::yield(); std::this_thread::yield(); g_cout_mtx.unlock(); } ct_work* get_next() const { ct_work* w = nullptr; while ((w = m_next.load(CT_MB_RLX)) == CT_WAIT) { // we can spin, or even do other work right here... std::this_thread::yield(); } return w; } }; // Easy Stack, only uses XCHG struct ct_estack { std::atomic m_head; ct_estack() : m_head(nullptr) {} void push(ct_work* n) { n->m_next.store(CT_WAIT, CT_MB_RLX); ct_work* head = m_head.exchange(n, CT_MB_REL); // release n->m_next.store(head, CT_MB_RLX); } ct_work* flush_try() { return m_head.exchange(nullptr, CT_MB_ACQ); // acquire } }; // Consume an Easy Stack... void ct_consume( ct_estack& estack ) { ct_work* w = estack.flush_try(); while (w) { // Process FIRST! w->process(); // Now, we can gain the next pointer. ct_work* next = w->get_next(); // Okay, we can delete the work delete w; g_allocs.fetch_sub(1, CT_MB_RLX); // dec w = next; } } // Our shared state struct ct_shared { ct_estack m_estack; }; // Produce some work... void ct_produce( ct_estack& estack ) { ct_work* w = new ct_work(std::this_thread::get_id()); g_allocs.fetch_add(1, CT_MB_RLX); // inc estack.push(w); } // Do some work... void ct_worker(ct_shared& shared) { for (unsigned int i = 0; i < ITERS; ++i) { ct_produce(shared.m_estack); ct_produce(shared.m_estack); ct_produce(shared.m_estack); std::this_thread::yield(); // little spice... ct_consume(shared.m_estack); } ct_consume(shared.m_estack); } int main(void) { { ct_shared shared; std::thread threads[THREADS]; for (unsigned long i = 0; i < THREADS; ++i) { threads[i] = std::thread(ct_worker, std::ref(shared)); } for (unsigned long i = 0; i < THREADS; ++i) { threads[i].join(); } } if (g_allocs.load(CT_MB_RLX) != 0) { std::cout << "\n\nLEAKED!!!!\n"; } std::cout << "\n\nFIN!\n"; return 0; }