Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // Either remove "stdafx.h" on non-Windows platforms, or create an empty file
- // with that name.
- #include "stdafx.h"
#ifdef WIN32
#include <windows.h>
#else
#include <pthread.h>
#include <numa.h>
#endif
#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iomanip>
#include <iostream>
#include <memory>
#include <mutex>
#include <new>
#include <set>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>
- using namespace std;
// Spin-wait hint: on SMT-capable x86 it tells the CPU-thread to relax a bit
// while busy waiting. Every variant expands to one statement without the
// trailing semicolon, so it is used as `cpu_relax();`.
#ifdef WIN32
// YieldProcessor() is provided through <windows.h> and expands to the
// spin-wait hint of the target architecture. The former `__asm { pause }`
// block is rejected by MSVC for x64 and ARM targets, where inline assembly is
// not supported.
#define cpu_relax() YieldProcessor()
#else
#if defined(__i386__) || defined(__x86_64__) || defined(__x86_64)
#define cpu_relax() asm volatile("pause" ::: "memory")
#else
// No pause instruction is used for other targets: compiler barrier only.
#define cpu_relax() asm volatile("" ::: "memory")
#endif
#endif
// Type of the counter that the two worker threads pass over the shared channel.
using num_t = uint32_t;
// Zero-based index of a CPU-thread.
using cpu_id = uint32_t;
// Phases of the start-line handshake between the two communicating threads.
typedef enum tse
{
    THR_READY,  // nobody has arrived yet
    THR_STEADY, // the first thread has arrived and is waiting
    THR_GO      // the second thread has arrived, the waiter may proceed
} thr_state_e;

// Global state to implement a rendezvous for the two communicating threads
std::mutex mtx;
std::condition_variable cv;
thr_state_e thr_state = THR_READY;

// Two-thread rendezvous: the first caller blocks until a second caller
// arrives; the second caller releases the first one and returns immediately.
//
// The handshake re-arms itself, so it can be used again for the next pair of
// threads once both callers of the current pair have returned. It is not
// meant for more than two concurrent callers.
static void rendevouz(void)
{
    std::unique_lock<std::mutex> lock(mtx);
    if (thr_state == THR_READY)
    {
        thr_state = THR_STEADY;
        cv.wait(lock, [] { return thr_state == THR_GO; });
        // Re-arm for the next pair. Without this reset the state stayed at
        // THR_GO forever, so every later pair skipped the wait entirely and
        // started unsynchronized.
        thr_state = THR_READY;
    }
    else
    {
        thr_state = THR_GO;
        cv.notify_all();
    }
}
// Reports a fatal error and ends the process.
//
// desc  Text handed to perror(), which prints it to stderr followed by the
//       description of the current errno value.
//
// Never returns: the process exits with status EXIT_FAILURE.
[[noreturn]] static void panic(const char *desc)
{
    ::perror(desc);
    ::exit(EXIT_FAILURE);
}
- // Binds a software thread to a specific CPU-thread
- static void cpu_bind
- (
- cpu_id c
- )
- {
- #ifdef WIN32
- if (SetThreadAffinityMask(GetCurrentThread(), 1UL << c) == 0)
- {
- panic("SetThreadAffinityMask() failed");
- }
- #else
- cpu_set_t cpu;
- CPU_ZERO(&cpu);
- CPU_SET(c, &cpu);
- if (pthread_setaffinity_np(pthread_self(), sizeof cpu, &cpu) != 0)
- {
- panic("pthread_setaffinity_np() failed");
- }
- #endif
- }
- void worker
- (
- atomic<num_t> &ch,
- atomic<bool> &is_done,
- cpu_id cpu_thread,
- uint32_t *latency
- )
- {
- num_t cnt = (latency == nullptr ? 1 : 0);
- cpu_bind(cpu_thread);
- rendevouz();
- auto startClk = chrono::steady_clock::now();
- while (!is_done.load(memory_order_relaxed))
- {
- if (ch.load(memory_order_relaxed) == cnt)
- {
- cnt = ch.fetch_add(1, memory_order_relaxed) + 2;
- }
- else
- {
- cpu_relax();
- }
- }
- if (latency != nullptr)
- {
- auto endClk = chrono::steady_clock::now();
- auto duration = chrono::duration_cast<chrono::nanoseconds>(endClk - startClk);
- *latency = duration.count() / cnt;
- }
- };
- static void measure
- (
- cpu_id cpu_threads,
- const set<cpu_id> &cpu_filter,
- int numa_node = 0
- )
- {
- for (cpu_id ping_id = 0; ping_id < cpu_threads - 1; ping_id++)
- {
- for (cpu_id pong_id = ping_id + 1; pong_id < cpu_threads; pong_id++)
- {
- if (cpu_filter.count(ping_id) + cpu_filter.count(pong_id) != 2)
- {
- continue;
- }
- cout << setw(2) << ping_id << " <-> "
- << setw(2) << left << pong_id << right << flush;
- atomic<num_t> *channel;
- channel = static_cast<atomic<num_t>*>(numa_alloc_onnode(sizeof *channel, numa_node));
- atomic<bool> is_done{ false };
- uint32_t latency;
- thread ping(worker, ref(*channel), ref(is_done), ping_id, &latency);
- thread pong(worker, ref(*channel), ref(is_done), pong_id, nullptr);
- this_thread::sleep_for(chrono::seconds(4));
- is_done.store(true);
- ping.join();
- pong.join();
- cout << " : " << setw(3) << latency << " ns" << endl;
- numa_free(channel, sizeof *channel);
- }
- }
- }
// Builds the set of CPU-threads that take part in the measurement.
//
// args     Command line arguments without the program name; each one must be
//          a plain decimal CPU-thread index.
// cpu_cnt  Number of CPU-threads; used when no arguments are given.
//
// Returns all indices 0..cpu_cnt-1 when `args` is empty, otherwise exactly the
// indices listed in `args`.
//
// Throws std::invalid_argument when an argument is not made of decimal digits
// only (empty, signed, or with trailing characters) and std::out_of_range
// when the number does not fit into cpu_id.
static std::set<cpu_id> parse_args
(
    const std::vector<std::string> &args,
    cpu_id cpu_cnt
)
{
    std::set<cpu_id> filter;
    if (args.empty())
    {
        for (cpu_id c = 0; c < cpu_cnt; c++)
        {
            filter.insert(c);
        }
        return filter;
    }
    for (const std::string &cpu_thr : args)
    {
        // stoul() on its own would skip leading whitespace and accept a sign
        // ("-1" wraps to a huge value), so insist on a leading digit.
        if (cpu_thr.empty() || cpu_thr[0] < '0' || cpu_thr[0] > '9')
        {
            throw std::invalid_argument("CPU-thread index must be a number");
        }
        std::size_t used = 0;
        const unsigned long value = std::stoul(cpu_thr, &used);
        // Reject trailing characters such as in "3abc".
        if (used != cpu_thr.size())
        {
            throw std::invalid_argument("CPU-thread index must be a number");
        }
        const cpu_id id = static_cast<cpu_id>(value);
        if (id != value)
        {
            throw std::out_of_range("CPU-thread index too large");
        }
        filter.insert(id);
    }
    return filter;
}
- int main
- (
- int argc,
- char *argv[]
- )
- {
- auto num_numa_nodes = numa_num_task_nodes();
- try
- {
- vector<string> args(argv + 1, argv + argc);
- cpu_id cpu_threads = thread::hardware_concurrency();
- auto cpu_filter = parse_args(args, cpu_threads);
- cout << "This system got " << cpu_threads << " CPU-threads and "
- << num_numa_nodes << " NUMA nodes." << endl;
- for (auto numa_node = 0; numa_node < num_numa_nodes; numa_node++)
- {
- cout << "Allocate from NUMA node " << numa_node << endl;
- measure(cpu_threads, cpu_filter);
- }
- }
- catch (...)
- {
- cerr << "Usage: " << argv[0] << " [CPUTHR0 CPUTHR1 ...]" << endl;
- }
- }
- // Local Variables:
- // compile-command: "make CXXFLAGS=\"-std=c++11 -O2 -pthread\" LDLIBS=\"-lnuma\" th_ping"
- // End:
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement