Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/*
 * Build instructions (assumes the NCCL 2.17.1 tarball is unpacked in $PWD and
 * CUDA is installed under /usr/local/cuda):
 *
 *   export LD_LIBRARY_PATH=/usr/local/cuda/lib64:$PWD/nccl_2.17.1-1+cuda11.0_x86_64/lib/:$LD_LIBRARY_PATH
 *   export LIBRARY_PATH=/usr/local/cuda/lib64:$PWD/nccl_2.17.1-1+cuda11.0_x86_64/lib/:$LIBRARY_PATH
 *   export C_INCLUDE_PATH=/usr/local/cuda/include/:$PWD/nccl_2.17.1-1+cuda11.0_x86_64/include/:$C_INCLUDE_PATH
 *   export CPLUS_INCLUDE_PATH=/usr/local/cuda/include/:$PWD/nccl_2.17.1-1+cuda11.0_x86_64/include/:$CPLUS_INCLUDE_PATH
 *   g++ send_recv.cc -lpthread -lcudart -lnccl
 */
#include <unistd.h>

#include <cassert>
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include "cuda_runtime.h"
#include "nccl.h"
- ncclUniqueId ncclId;
- #define CUDACHECK(cmd) \
- do { \
- cudaError_t e = cmd; \
- if (e != cudaSuccess) { \
- printf("Failed: Cuda error %s:%d '%s'\n", __FILE__, __LINE__, \
- cudaGetErrorString(e)); \
- assert(false); \
- } \
- } while (0)
- #define NCCLCHECK(cmd) \
- do { \
- ncclResult_t r = cmd; \
- if (r != ncclSuccess) { \
- printf("Failed, NCCL error %s:%d '%s'\n", __FILE__, __LINE__, \
- ncclGetErrorString(r)); \
- assert(false); \
- } \
- } while (0)
- #define ASYNC 1
- const int device_count = 4;
- void custom_recv(int dev_id, int device_count) {
- cudaSetDevice(dev_id);
- ncclComm_t comm;
- #if NCCL_VERSION >= 21700
- ncclConfig_t config = NCCL_CONFIG_INITIALIZER;
- NCCLCHECK(ncclCommInitRankConfig(&comm, device_count, ncclId, dev_id, &config));
- #else
- NCCLCHECK(ncclCommInitRank(&comm, device_count, ncclId, dev_id));
- #endif
- cudaStream_t stream;
- CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
- void* buffer;
- CUDACHECK(cudaMalloc(&buffer, sizeof(float)));
- ncclGroupStart();
- NCCLCHECK(ncclRecv(buffer, 1, ncclFloat32, 0, comm, stream));
- ncclGroupEnd();
- float host_buf;
- #if ASYNC
- CUDACHECK(cudaMemcpyAsync(&host_buf, buffer, sizeof(float), cudaMemcpyDeviceToHost, stream));
- // void* buffer2;
- // CUDACHECK(cudaMallocAsync(&buffer2, sizeof(float)*1024, stream));
- // CUDACHECK(cudaStreamSynchronize(stream));
- #else
- cudaStream_t copy_stream;
- CUDACHECK(cudaStreamCreateWithFlags(©_stream, cudaStreamNonBlocking));
- CUDACHECK(cudaMemcpyAsync(&host_buf, buffer, sizeof(float), cudaMemcpyDeviceToHost, copy_stream));
- CUDACHECK(cudaStreamSynchronize(stream));
- #endif
- assert(host_buf == 1);
- end:
- std::cout << "device " << dev_id << " recv done" << std::endl;
- }
- int main(int argc, char** argv) {
- {
- NCCLCHECK(ncclGetUniqueId(&ncclId));
- // send thread, 0 -> 1 2 3
- std::thread thr0([]() {
- cudaSetDevice(0);
- ncclComm_t comm;
- #if NCCL_VERSION >= 21700
- ncclConfig_t config = NCCL_CONFIG_INITIALIZER;
- NCCLCHECK(ncclCommInitRankConfig(&comm, device_count, ncclId, 0, &config));
- #else
- NCCLCHECK(ncclCommInitRank(&comm, device_count, ncclId, 0));
- #endif
- cudaStream_t stream;
- CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
- void* buffer[device_count];
- for (int i = 1; i < device_count; i++) {
- CUDACHECK(cudaMalloc(&buffer[i], sizeof(float)));
- }
- float r = 1;
- for (int i = 1; i < device_count; i++) {
- CUDACHECK(cudaMemcpy(buffer[i], &r, sizeof(float), cudaMemcpyHostToDevice));
- }
- NCCLCHECK(ncclGroupStart());
- for (int i = 1; i < device_count; i++) {
- NCCLCHECK(ncclSend(buffer[i], 1, ncclFloat32, i, comm, stream));
- }
- NCCLCHECK(ncclGroupEnd());
- CUDACHECK(cudaStreamSynchronize(stream));
- std::cout << "device 0 send done" << std::endl;
- });
- // recv thread: i <- 0
- std::vector<std::thread> threads;
- for (int i = 1; i < device_count; i++) {
- threads.push_back(std::thread(custom_recv, i, device_count));
- }
- thr0.join();
- for (auto& thr : threads) {
- thr.join();
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement