diff --git a/BUILD.bazel b/BUILD.bazel index 138e416b10..e1a853d2db 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -54,6 +54,9 @@ COPTS = [ }) + select({ "//bazel/config:brpc_with_asan": ["-fsanitize=address"], "//conditions:default": [""], +}) + select({ + ":brpc_with_gdr": ["-DBRPC_WITH_GDR=1"], + "//conditions:default": [""], }) + select({ "//bazel/config:brpc_with_no_pthread_mutex_hook": ["-DNO_PTHREAD_MUTEX_HOOK"], "//conditions:default": [""], @@ -232,6 +235,7 @@ BUTIL_SRCS = [ "src/butil/iobuf.cpp", "src/butil/single_iobuf.cpp", "src/butil/iobuf_profiler.cpp", + "src/butil/gpu/gpu_block_pool.cpp", "src/butil/binary_printer.cpp", "src/butil/recordio.cc", "src/butil/popen.cpp", @@ -337,6 +341,9 @@ cc_library( "-DUNIT_TEST", ], "//conditions:default": [], + }) + select({ + ":brpc_with_gdr": ["@local_config_cuda//cuda:cuda_headers"], + "//conditions:default": [], }), includes = [ "src/", @@ -356,6 +363,9 @@ cc_library( }) + select({ "//bazel/config:brpc_with_boringssl": ["@boringssl//:ssl", "@boringssl//:crypto"], "//conditions:default": ["@openssl//:ssl", "@openssl//:crypto"], + }) + select({ + ":brpc_with_gdr": ["@local_config_cuda//cuda:cuda_headers"], + "//conditions:default": [], }), ) @@ -573,6 +583,9 @@ cc_library( "@org_apache_thrift//:thrift", ], "//conditions:default": [], + }) + select({ + ":brpc_with_gdr": ["@local_config_cuda//cuda:cuda_headers"], + "//conditions:default": [], }), ) diff --git a/bazel/config/BUILD.bazel b/bazel/config/BUILD.bazel index d08ea2ec23..06376cf85c 100644 --- a/bazel/config/BUILD.bazel +++ b/bazel/config/BUILD.bazel @@ -104,6 +104,12 @@ config_setting( visibility = ["//visibility:public"], ) +config_setting( + name = "brpc_with_gdr", + define_values = {"BRPC_WITH_GDR": "true"}, + visibility = ["//visibility:public"], +) + config_setting( name = "brpc_with_boringssl", define_values = {"BRPC_WITH_BORINGSSL": "true"}, @@ -148,4 +154,4 @@ config_setting( name = "with_babylon_counter", define_values = {"with_babylon_counter": "true"}, visibility = ["//visibility:public"], -) \ No newline at end of file +) diff --git a/src/brpc/acceptor.cpp b/src/brpc/acceptor.cpp index 616c1a3044..4487fc2df8 100644 --- a/src/brpc/acceptor.cpp +++ b/src/brpc/acceptor.cpp @@ -41,6 +41,7 @@ Acceptor::Acceptor(bthread_keytable_pool_t* pool) , _force_ssl(false) , _ssl_ctx(NULL) , _use_rdma(false) + , _use_gdr(false) , _bthread_tag(BTHREAD_TAG_DEFAULT) { } diff --git a/src/brpc/acceptor.h b/src/brpc/acceptor.h index 69f632aaca..66f85c4904 100644 --- a/src/brpc/acceptor.h +++ b/src/brpc/acceptor.h @@ -113,6 +113,9 @@ friend class Server; // Whether to use rdma or not bool _use_rdma; + // Whether to use gdr or not + bool _use_gdr; + // Acceptor belongs to this tag bthread_tag_t _bthread_tag; }; diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp index 0252e97d74..bce554380b 100644 --- a/src/brpc/channel.cpp +++ b/src/brpc/channel.cpp @@ -61,6 +61,7 @@ ChannelOptions::ChannelOptions() , succeed_without_server(true) , log_succeed_without_server(true) , use_rdma(false) + , use_gdr(false) , auth(NULL) , backup_request_policy(NULL) , retry_policy(NULL) @@ -123,6 +124,9 @@ static ChannelSignature ComputeChannelSignature(const ChannelOptions& opt) { if (opt.use_rdma) { buf.append("|rdma"); } + if (opt.use_gdr) { + buf.append("|gdr"); + } butil::MurmurHash3_x64_128_Update(&mm_ctx, buf.data(), buf.size()); buf.clear(); @@ -197,6 +201,11 @@ int Channel::InitChannelOptions(const ChannelOptions* options) { return -1; } rdma::GlobalRdmaInitializeOrDie(); +#if BRPC_WITH_GDR + if (_options.use_gdr) { + rdma::GlobalGdrInitializeOrDie(); + } +#endif // BRPC_WITH_GDR if (!rdma::InitPollingModeWithTag(bthread_self_tag())) { return -1; } @@ -369,7 +378,8 @@ int Channel::InitSingle(const butil::EndPoint& server_addr_and_port, return -1; } if (SocketMapInsert(SocketMapKey(server_addr_and_port, sig), - &_server_id, ssl_ctx, _options.use_rdma, _options.hc_option) != 0) { + &_server_id, ssl_ctx, _options.use_rdma, _options.use_gdr, + _options.hc_option) != 0) { LOG(ERROR) << "Fail to insert into SocketMap"; return -1; } @@ -407,6 +417,7 @@ int Channel::Init(const char* ns_url, ns_opt.succeed_without_server = _options.succeed_without_server; ns_opt.log_succeed_without_server = _options.log_succeed_without_server; ns_opt.use_rdma = _options.use_rdma; + ns_opt.use_gdr = _options.use_gdr; ns_opt.channel_signature = ComputeChannelSignature(_options); ns_opt.hc_option = _options.hc_option; if (CreateSocketSSLContext(_options, &ns_opt.ssl_ctx) != 0) { diff --git a/src/brpc/channel.h b/src/brpc/channel.h index c970209b3a..160651882b 100644 --- a/src/brpc/channel.h +++ b/src/brpc/channel.h @@ -109,6 +109,10 @@ struct ChannelOptions { // Default: false bool use_rdma; + // Let this channel use gdu direct rdma. + // Default: false + bool use_gdr; + // Turn on authentication for this channel if `auth' is not NULL. // Note `auth' will not be deleted by channel and must remain valid when // the channel is being used. diff --git a/src/brpc/details/naming_service_thread.cpp b/src/brpc/details/naming_service_thread.cpp index 341ca35b09..5c62de9ad6 100644 --- a/src/brpc/details/naming_service_thread.cpp +++ b/src/brpc/details/naming_service_thread.cpp @@ -126,7 +126,8 @@ void NamingServiceThread::Actions::ResetServers( // to pick those Sockets with the right settings during OnAddedServers const SocketMapKey key(_added[i], _owner->_options.channel_signature); CHECK_EQ(0, SocketMapInsert(key, &tagged_id.id, _owner->_options.ssl_ctx, - _owner->_options.use_rdma, _owner->_options.hc_option)); + _owner->_options.use_rdma, _owner->_options.use_gdr, + _owner->_options.hc_option)); _added_sockets.push_back(tagged_id); } diff --git a/src/brpc/details/naming_service_thread.h b/src/brpc/details/naming_service_thread.h index 1745e5f267..4f9c2b744e 100644 --- a/src/brpc/details/naming_service_thread.h +++ b/src/brpc/details/naming_service_thread.h @@ -45,11 +45,13 @@ struct GetNamingServiceThreadOptions { GetNamingServiceThreadOptions() : succeed_without_server(false) , log_succeed_without_server(true) - , use_rdma(false) {} + , use_rdma(false) + , use_gdr(false) {} bool succeed_without_server; bool log_succeed_without_server; bool use_rdma; + bool use_gdr; HealthCheckOption hc_option; ChannelSignature channel_signature; std::shared_ptr ssl_ctx; diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index 5adf77b2c5..7a2c079c24 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -23,6 +23,7 @@ #include #include "butil/logging.h" // LOG() #include "butil/iobuf.h" // butil::IOBuf +#include "butil/gpu/gpu_block_pool.h" #include "butil/raw_pack.h" // RawPacker RawUnpacker #include "butil/memory/scope_guard.h" #include "json2pb/json_to_pb.h" @@ -69,6 +70,10 @@ DECLARE_bool(pb_enum_as_number); // 5. Not supported: chunk_info // Pack header into `buf' + +const int header_size = 12; +const int prefetch_d2h_size = 512; + inline void PackRpcHeader(char* rpc_header, uint32_t meta_size, int payload_size) { uint32_t* dummy = (uint32_t*)rpc_header; // suppress strict-alias warning *dummy = *(uint32_t*)"PRPC"; @@ -101,44 +106,103 @@ static void SerializeRpcHeaderAndMeta( ParseResult ParseRpcMessage(butil::IOBuf* source, Socket* socket, bool /*read_eof*/, const void*) { + char header_buf[12]; - const size_t n = source->copy_to(header_buf, sizeof(header_buf)); - if (n >= 4) { - void* dummy = header_buf; - if (*(const uint32_t*)dummy != *(const uint32_t*)"PRPC") { - return MakeParseError(PARSE_ERROR_TRY_OTHERS); - } - } else { - if (memcmp(header_buf, "PRPC", n) != 0) { - return MakeParseError(PARSE_ERROR_TRY_OTHERS); - } - } - if (n < sizeof(header_buf)) { - return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); - } + size_t n = 0; uint32_t body_size; uint32_t meta_size; - butil::RawUnpacker(header_buf + 4).unpack32(body_size).unpack32(meta_size); - if (body_size > FLAGS_max_body_size) { - // We need this log to report the body_size to give users some clues - // which is not printed in InputMessenger. - LOG(ERROR) << "body_size=" << body_size << " from " - << socket->remote_side() << " is too large"; - return MakeParseError(PARSE_ERROR_TOO_BIG_DATA); - } else if (source->length() < sizeof(header_buf) + body_size) { - return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); + ParseError pe = PARSE_OK; + +#if BRPC_WITH_GDR + void* prefetch_d2h_data = NULL; + uint32_t data_meta = source->get_first_data_meta_high32(); + bool is_gpu_memory = (data_meta == static_cast(butil::IOBuf::GPU_MEMORY)); + butil::gdr::BlockPoolAllocator* host_allocator = butil::gdr::BlockPoolAllocators::singleton()->get_cpu_allocator(); + if (is_gpu_memory) { + prefetch_d2h_data = host_allocator->AllocateRaw(prefetch_d2h_size); + if (prefetch_d2h_data == NULL) { + LOG(FATAL) << "alloc host data failed!!!"; + } + n = source->copy_from_gpu(prefetch_d2h_data, prefetch_d2h_size); + size_t copy_size = n > 12 ? 12 : n; + memcpy(header_buf, prefetch_d2h_data, copy_size); + } else { + n = source->copy_to(header_buf, sizeof(header_buf)); } - if (meta_size > body_size) { - LOG(ERROR) << "meta_size=" << meta_size << " is bigger than body_size=" - << body_size; - // Pop the message - source->pop_front(sizeof(header_buf) + body_size); - return MakeParseError(PARSE_ERROR_TRY_OTHERS); +#else + n = source->copy_to(header_buf, sizeof(header_buf)); +#endif // BRPC_WITH_GDR + + do { + if (n >= 4) { + void* dummy = header_buf; + if (*(const uint32_t*)dummy != *(const uint32_t*)"PRPC") { + pe = PARSE_ERROR_TRY_OTHERS; + break; + } + } else { + if (memcmp(header_buf, "PRPC", n) != 0) { + pe = PARSE_ERROR_TRY_OTHERS; + break; + } + } + if (n < sizeof(header_buf)) { + pe = PARSE_ERROR_NOT_ENOUGH_DATA; + break; + } + butil::RawUnpacker(header_buf + 4).unpack32(body_size).unpack32(meta_size); + if (body_size > FLAGS_max_body_size) { + // We need this log to report the body_size to give users some clues + // which is not printed in InputMessenger. + LOG(ERROR) << "body_size=" << body_size << " from " + << socket->remote_side() << " is too large"; + pe = PARSE_ERROR_TOO_BIG_DATA; + break; + } else if (source->length() < sizeof(header_buf) + body_size) { + pe = PARSE_ERROR_NOT_ENOUGH_DATA; + break; + } + if (meta_size > body_size) { + LOG(ERROR) << "meta_size=" << meta_size << " is bigger than body_size=" + << body_size; + // Pop the message + source->pop_front(sizeof(header_buf) + body_size); + pe = PARSE_ERROR_TRY_OTHERS; + break; + } + } while (0); + + if (pe != PARSE_OK) { +#if BRPC_WITH_GDR + if (is_gpu_memory) { + host_allocator->DeallocateRaw(prefetch_d2h_data); + } +#endif // BRPC_WITH_GDR + return MakeParseError(pe); } + source->pop_front(sizeof(header_buf)); MostCommonMessage* msg = MostCommonMessage::Get(); + +#if BRPC_WITH_GDR + if (is_gpu_memory) { + if (header_size + meta_size <= n) { + auto deleter = [host_allocator, prefetch_d2h_data](void* data) { host_allocator->DeallocateRaw(prefetch_d2h_data); }; + msg->meta.append_user_data_with_meta((char*)prefetch_d2h_data + header_size, meta_size, deleter, n); + source->pop_front(meta_size); + } else { + host_allocator->DeallocateRaw(prefetch_d2h_data); + source->cutn_from_gpu(&msg->meta, meta_size); + } + source->cutn(&msg->payload, body_size - meta_size); + } else { + source->cutn(&msg->meta, meta_size); + source->cutn(&msg->payload, body_size - meta_size); + } +#else source->cutn(&msg->meta, meta_size); source->cutn(&msg->payload, body_size - meta_size); +#endif // BRPC_WITH_GDR return MakeMessage(msg); } @@ -793,7 +857,29 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { butil::IOBuf req_buf; int body_without_attachment_size = req_size - meta.attachment_size(); +#if BRPC_WITH_GDR + int meta_size = msg->meta.size(); + uint32_t data_meta = msg->payload.get_first_data_meta_high32(); + bool is_gpu_memory = (data_meta == static_cast(butil::IOBuf::GPU_MEMORY)); + if(is_gpu_memory) { + int64_t real_prefetch_d2h_size = msg->meta.get_first_data_meta(); + if (header_size + meta_size + body_without_attachment_size <= real_prefetch_d2h_size) { + void* data = msg->meta.get_first_data_ptr(); + if (data == nullptr) { + LOG(FATAL) << "illegal data!!!"; + } + req_buf.append((char*)data + meta_size, body_without_attachment_size); + msg->payload.pop_front(body_without_attachment_size); + } else { + msg->payload.cutn_from_gpu(&req_buf, body_without_attachment_size); + } + } + else { + msg->payload.cutn(&req_buf, body_without_attachment_size); + } +#else msg->payload.cutn(&req_buf, body_without_attachment_size); +#endif // BRPC_WITH_GDR if (meta.attachment_size() > 0) { cntl->request_attachment().swap(msg->payload); } @@ -963,8 +1049,14 @@ void ProcessRpcResponse(InputMessageBase* msg_base) { } // Parse response message iff error code from meta is 0 butil::IOBuf res_buf; + int meta_size = msg->meta.size(); const int res_size = msg->payload.length(); butil::IOBuf* res_buf_ptr = &msg->payload; + +#if BRPC_WITH_GDR + uint32_t data_meta = msg->payload.get_first_data_meta_high32(); + bool is_gpu_memory = (data_meta == static_cast(butil::IOBuf::GPU_MEMORY)); +#endif // BRPC_WITH_GDR if (meta.has_attachment_size()) { if (meta.attachment_size() > res_size) { cntl->SetFailed( @@ -973,9 +1065,44 @@ void ProcessRpcResponse(InputMessageBase* msg_base) { break; } int body_without_attachment_size = res_size - meta.attachment_size(); + +#if BRPC_WITH_GDR + if(is_gpu_memory) { + int64_t real_prefetch_d2h_size = msg->meta.get_first_data_meta(); + if (header_size + meta_size + body_without_attachment_size <= real_prefetch_d2h_size) { + void* data = msg->meta.get_first_data_ptr(); + if (data == nullptr) { + LOG(FATAL) << "illegal data!!!"; + } + res_buf.append((char*)data + meta_size, body_without_attachment_size); + msg->payload.pop_front(body_without_attachment_size); + } else { + msg->payload.cutn_from_gpu(&res_buf, body_without_attachment_size); + } + } + else { + msg->payload.cutn(&res_buf, body_without_attachment_size); + } +#else msg->payload.cutn(&res_buf, body_without_attachment_size); +#endif // BRPC_WITH_GDR res_buf_ptr = &res_buf; cntl->response_attachment().swap(msg->payload); +#if BRPC_WITH_GDR + } else if(is_gpu_memory) { + int64_t real_prefetch_d2h_size = msg->meta.get_first_data_meta(); + if (header_size + meta_size + res_size <= real_prefetch_d2h_size) { + void* data = msg->meta.get_first_data_ptr(); + if (data == nullptr) { + LOG(FATAL) << "illegal data!!!"; + } + res_buf.append((char*)data + meta_size, res_size); + msg->payload.pop_front(res_size); + } else { + msg->payload.cutn_from_gpu(&res_buf, res_size); + } + res_buf_ptr = &res_buf; +#endif // BRPC_WITH_GDR } ContentType content_type = meta.content_type(); diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp index 1d502a98f7..a8bb0a5e77 100644 --- a/src/brpc/rdma/rdma_endpoint.cpp +++ b/src/brpc/rdma/rdma_endpoint.cpp @@ -20,6 +20,7 @@ #include #include "butil/fd_utility.h" #include "butil/logging.h" // CHECK, LOG +#include "butil/gpu/gpu_block_pool.h" #include "butil/sys_byteorder.h" // HostToNet,NetToHost #include "bthread/bthread.h" #include "brpc/errno.pb.h" @@ -56,7 +57,7 @@ DEFINE_string(rdma_recv_block_type, "default", "Default size type for recv WR: " "default(8KB - 32B)/large(64KB - 32B)/huge(2MB - 32B)"); DEFINE_int32(rdma_cqe_poll_once, 32, "The maximum of cqe number polled once."); DEFINE_int32(rdma_prepared_qp_size, 128, "SQ and RQ size for prepared QP."); -DEFINE_int32(rdma_prepared_qp_cnt, 1024, "Initial count of prepared QP."); +DEFINE_int32(rdma_prepared_qp_cnt, 256, "Initial count of prepared QP."); DEFINE_bool(rdma_trace_verbose, false, "Print log message verbosely"); BRPC_VALIDATE_GFLAG(rdma_trace_verbose, brpc::PassValidate); DEFINE_bool(rdma_use_polling, false, "Use polling mode for RDMA."); @@ -65,6 +66,12 @@ DEFINE_bool(rdma_poller_yield, false, "Yield thread in RDMA polling mode."); DEFINE_bool(rdma_edisp_unsched, false, "Disable event dispatcher schedule"); DEFINE_bool(rdma_disable_bthread, false, "Disable bthread in RDMA"); +namespace butil { + namespace gdr { + extern int gdr_block_size_kb; + } +} + static const size_t IOBUF_BLOCK_HEADER_LEN = 32; // implementation-dependent // DO NOT change this value unless you know the safe value!!! @@ -89,6 +96,7 @@ static uint16_t g_rdma_hello_msg_len = 40; // In Byte static uint16_t g_rdma_hello_version = 2; static uint16_t g_rdma_impl_version = 1; static uint32_t g_rdma_recv_block_size = 0; +static uint32_t g_gdr_recv_block_size = 0; // static const uint32_t MAX_INLINE_DATA = 64; static const uint8_t MAX_HOP_LIMIT = 16; @@ -168,8 +176,9 @@ RdmaResource::~RdmaResource() { } } -RdmaEndpoint::RdmaEndpoint(Socket* s) +RdmaEndpoint::RdmaEndpoint(Socket* s, bool use_gdr) : _socket(s) + , _use_gdr(use_gdr) , _state(UNINIT) , _resource(NULL) , _cq_events(0) @@ -439,6 +448,7 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) { local_msg.hello_ver = g_rdma_hello_version; local_msg.impl_ver = g_rdma_impl_version; local_msg.block_size = g_rdma_recv_block_size; + local_msg.block_size = ep->use_gdr() ? g_gdr_recv_block_size : g_rdma_recv_block_size; local_msg.sq_size = ep->_sq_size; local_msg.rq_size = ep->_rq_size; local_msg.lid = GetRdmaLid(); @@ -648,7 +658,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { } else { local_msg.lid = GetRdmaLid(); local_msg.gid = GetRdmaGid(); - local_msg.block_size = g_rdma_recv_block_size; + local_msg.block_size = ep->use_gdr() ? g_gdr_recv_block_size : g_rdma_recv_block_size; local_msg.sq_size = ep->_sq_size; local_msg.rq_size = ep->_rq_size; local_msg.hello_ver = g_rdma_hello_version; @@ -944,8 +954,15 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) { case IBV_WC_RECV: { // recv completion // Please note that only the first wc.byte_len bytes is valid if (wc.byte_len > 0) { - if (wc.byte_len < (uint32_t)FLAGS_rdma_zerocopy_min_size) { - zerocopy = false; +#if BRPC_WITH_GDR + if (_use_gdr) { + zerocopy = true; + } else +#endif // BRPC_WITH_GDR + { + if (wc.byte_len < (uint32_t)FLAGS_rdma_zerocopy_min_size) { + zerocopy = false; + } } CHECK(_state != FALLBACK_TCP); if (zerocopy) { @@ -1017,26 +1034,74 @@ int RdmaEndpoint::DoPostRecv(void* block, size_t block_size) { return 0; } +int RdmaEndpoint::DoPostRecvGDR(void* block, size_t block_size, uint32_t lkey) { + ibv_recv_wr wr; + memset(&wr, 0, sizeof(wr)); + ibv_sge sge; + sge.addr = (uint64_t)block; + sge.length = block_size; + sge.lkey = lkey; + wr.num_sge = 1; + wr.sg_list = &sge; + //LOG(INFO) << "POST recv: addr=0x" << std::hex << sge.addr + // << std::dec << " length=0x" << sge.length + // << " lkey=0x" << sge.lkey; + //LOG(INFO) << block << " " << _device_allocator->get_lkey(); + ibv_recv_wr* bad = NULL; + int err = ibv_post_recv(_resource->qp, &wr, &bad); + if (err != 0) { + LOG(WARNING) << "Fail to ibv_post_recv: " << berror(err); + return -1; + } + return 0; +} + int RdmaEndpoint::PostRecv(uint32_t num, bool zerocopy) { // We do the post repeatedly from the _rbuf[_rq_received]. while (num > 0) { + uint32_t lkey = 0; if (zerocopy) { _rbuf[_rq_received].clear(); - butil::IOBufAsZeroCopyOutputStream os(&_rbuf[_rq_received], - g_rdma_recv_block_size + IOBUF_BLOCK_HEADER_LEN); - int size = 0; - if (!os.Next(&_rbuf_data[_rq_received], &size)) { - // Memory is not enough for preparing a block - PLOG(WARNING) << "Fail to allocate rbuf"; + +#if BRPC_WITH_GDR + if (_use_gdr) { + butil::gdr::BlockPoolAllocator* device_allocator = butil::gdr::BlockPoolAllocators::singleton()->get_gpu_allocator(); + void* device_ptr = device_allocator->AllocateRaw(g_gdr_recv_block_size); + auto deleter = [device_allocator](void* data) { device_allocator->DeallocateRaw(data); }; + lkey = device_allocator->get_lkey(device_ptr); + uint64_t data_meta = (static_cast(butil::IOBuf::GPU_MEMORY) << 32) | lkey; + _rbuf[_rq_received].append_user_data_with_meta(device_ptr, g_gdr_recv_block_size, deleter , data_meta); + _rbuf_data[_rq_received] = device_ptr; + } else +#endif // if BRPC_WITH_GDR + { + butil::IOBufAsZeroCopyOutputStream os(&_rbuf[_rq_received], + g_rdma_recv_block_size + IOBUF_BLOCK_HEADER_LEN); + int size = 0; + if (!os.Next(&_rbuf_data[_rq_received], &size)) { + // Memory is not enough for preparing a block + PLOG(WARNING) << "Fail to allocate rbuf"; + return -1; + } else { + CHECK(static_cast(size) == g_rdma_recv_block_size) << size; + } + } + } +#if BRPC_WITH_GDR + if (_use_gdr) { + if (DoPostRecvGDR(_rbuf_data[_rq_received], g_gdr_recv_block_size, lkey) < 0) { + _rbuf[_rq_received].clear(); + return -1; + } + } else +#endif // if BRPC_WITH_GDR + { + if (DoPostRecv(_rbuf_data[_rq_received], g_rdma_recv_block_size) < 0) { + _rbuf[_rq_received].clear(); return -1; - } else { - CHECK(static_cast(size) == g_rdma_recv_block_size) << size; } } - if (DoPostRecv(_rbuf_data[_rq_received], g_rdma_recv_block_size) < 0) { - _rbuf[_rq_received].clear(); - return -1; - } + --num; ++_rq_received; if (_rq_received == _rq_size) { @@ -1209,7 +1274,11 @@ int RdmaEndpoint::BringUpQp(uint16_t lid, ibv_gid gid, uint32_t qp_num) { } attr.qp_state = IBV_QPS_RTR; +#if BRPC_WITH_GDR + attr.path_mtu = IBV_MTU_4096; // TODO: detect mtu automatically +#else attr.path_mtu = IBV_MTU_1024; // TODO: support more mtu in future +#endif // if BRPC_WITH_GDR attr.ah_attr.grh.dgid = gid; attr.ah_attr.grh.flow_label = 0; attr.ah_attr.grh.sgid_index = GetRdmaGidIndex(); @@ -1503,6 +1572,13 @@ void RdmaEndpoint::DebugInfo(std::ostream& os) const { << "\n"; } +int RdmaEndpoint::GlobalGdrInitialize() { +#if BRPC_WITH_GDR + LOG(INFO) << "gdr_block_size_kb: " << butil::gdr::gdr_block_size_kb; + g_gdr_recv_block_size = butil::gdr::gdr_block_size_kb * 1024 - IOBUF_BLOCK_HEADER_LEN; +#endif // BRPC_WITH_GDR + return 0; +} int RdmaEndpoint::GlobalInitialize() { if (FLAGS_rdma_recv_block_type == "default") { g_rdma_recv_block_size = GetBlockSize(0) - IOBUF_BLOCK_HEADER_LEN; @@ -1515,6 +1591,14 @@ int RdmaEndpoint::GlobalInitialize() { return -1; } + LOG(INFO) << "rdma_use_polling :" << FLAGS_rdma_use_polling + << ", rdma_poller_num : " << FLAGS_rdma_poller_num + << ", rdma_poller_yield : " << FLAGS_rdma_poller_yield + << ", rdma_sq_size: " << FLAGS_rdma_sq_size + << ", rdma_rq_size: " << FLAGS_rdma_rq_size + << ", rdma_zerocopy_min_size: " << FLAGS_rdma_zerocopy_min_size + << ", g_rdma_recv_block_size: " << g_rdma_recv_block_size; + g_rdma_resource_mutex = new butil::Mutex; for (int i = 0; i < FLAGS_rdma_prepared_qp_cnt; ++i) { RdmaResource* res = AllocateQpCq(FLAGS_rdma_prepared_qp_size, diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h index de7cd5f6d8..a258abfb74 100644 --- a/src/brpc/rdma/rdma_endpoint.h +++ b/src/brpc/rdma/rdma_endpoint.h @@ -31,7 +31,6 @@ #include "butil/containers/mpsc_queue.h" #include "brpc/socket.h" - namespace brpc { class Socket; namespace rdma { @@ -72,18 +71,23 @@ class BAIDU_CACHELINE_ALIGNMENT RdmaEndpoint : public SocketUser { friend class RdmaConnect; friend class brpc::Socket; public: - RdmaEndpoint(Socket* s); + RdmaEndpoint(Socket* s, bool use_gdr); ~RdmaEndpoint(); // Global initialization // Return 0 if success, -1 if failed and errno set static int GlobalInitialize(); + // Global initialization for gdr + static int GlobalGdrInitialize(); + static void GlobalRelease(); // Reset the endpoint (for next use) void Reset(); + bool use_gdr() { return _use_gdr; } + // Cut data from the given IOBuf list and use RDMA to send // Return bytes cut if success, -1 if failed and errno set ssize_t CutFromIOBufList(butil::IOBuf** data, size_t ndata); @@ -173,6 +177,8 @@ friend class brpc::Socket; // -1: failed, errno set int DoPostRecv(void* block, size_t block_size); + + int DoPostRecvGDR(void* block, size_t block_size, uint32_t lkey); // Read at most len bytes from fd in _socket to data // wait for _read_butex if encounter EAGAIN // return -1 if encounter other errno (including EOF) @@ -293,6 +299,8 @@ friend class brpc::Socket; std::atomic running; }; static std::vector _poller_groups; + + bool _use_gdr; }; } // namespace rdma diff --git a/src/brpc/rdma/rdma_helper.cpp b/src/brpc/rdma/rdma_helper.cpp index 9bad33750c..cbd7889084 100644 --- a/src/brpc/rdma/rdma_helper.cpp +++ b/src/brpc/rdma/rdma_helper.cpp @@ -25,6 +25,9 @@ #include "butil/containers/flat_map.h" // butil::FlatMap #include "butil/fd_guard.h" #include "butil/fd_utility.h" // butil::make_non_blocking +#if BRPC_WITH_GDR +#include "butil/gpu/gpu_block_pool.h" +#endif // BRPC_WITH_GDR #include "butil/logging.h" #include "brpc/socket.h" #include "brpc/rdma/block_pool.h" @@ -84,6 +87,8 @@ static uint16_t g_lid; static int g_max_sge = 0; static uint8_t g_port_num = 1; +static int g_gpu_index = 0; + static int g_comp_vector_index = 0; butil::atomic g_rdma_available(false); @@ -93,7 +98,7 @@ DEFINE_string(rdma_device, "", "The name of the HCA device used " "(Empty means using the first active device)"); DEFINE_int32(rdma_port, 1, "The port number to use. For RoCE, it is always 1."); DEFINE_int32(rdma_gid_index, -1, "The GID index to use. -1 means using the last one."); - +DEFINE_int32(gpu_index, 0, "The GPU device index to use. In GDR, we suggest to use the GPU that is connected to the same PCIe switch with rdma devices"); // static const size_t SYSFS_SIZE = 4096; static ibv_device** g_devices = NULL; static ibv_context* g_context = NULL; @@ -477,6 +482,7 @@ static void GlobalRdmaInitializeOrDieImpl() { ExitWithError(); } + g_gpu_index = FLAGS_gpu_index; // Find the first active port g_port_num = FLAGS_rdma_port; int available_devices; @@ -580,7 +586,21 @@ static void GlobalRdmaInitializeOrDieImpl() { g_rdma_available.store(true, butil::memory_order_relaxed); } +static void GlobalGdrInitializeOrDieImpl() { +#if BRPC_WITH_GDR + if (!butil::gdr::InitGPUBlockPool(g_gpu_index, GetRdmaPd())) { + PLOG(ERROR) << "Fail to initialize RDMA GPU memory pool"; + ExitWithError(); + } + if (RdmaEndpoint::GlobalGdrInitialize() < 0) { + LOG(ERROR) << "g_gdr_recv_block_size incorrect."; + ExitWithError(); + } +#endif // if BRPC_WITH_GDR +} + static pthread_once_t initialize_rdma_once = PTHREAD_ONCE_INIT; +static pthread_once_t initialize_gdr_once = PTHREAD_ONCE_INIT; void GlobalRdmaInitializeOrDie() { if (pthread_once(&initialize_rdma_once, @@ -590,6 +610,14 @@ void GlobalRdmaInitializeOrDie() { } } +void GlobalGdrInitializeOrDie() { + if (pthread_once(&initialize_gdr_once, + GlobalGdrInitializeOrDieImpl) != 0) { + LOG(FATAL) << "Fail to pthread_once GlobalGdrInitializeOrDie"; + exit(1); + } +} + uint32_t RegisterMemoryForRdma(void* buf, size_t len) { ibv_mr* mr = IbvRegMr(g_pd, buf, len, IBV_ACCESS_LOCAL_WRITE); if (!mr) { @@ -679,6 +707,11 @@ uint8_t GetRdmaPortNum() { return g_port_num; } +int GetGPUIndex() { + return g_gpu_index; +} + + bool IsRdmaAvailable() { return g_rdma_available.load(butil::memory_order_acquire); } diff --git a/src/brpc/rdma/rdma_helper.h b/src/brpc/rdma/rdma_helper.h index 052763325b..06cbb1f5c2 100644 --- a/src/brpc/rdma/rdma_helper.h +++ b/src/brpc/rdma/rdma_helper.h @@ -33,6 +33,10 @@ namespace rdma { // Exit if failed void GlobalRdmaInitializeOrDie(); +// Initialize GDR environment +// Exit if failed +void GlobalGdrInitializeOrDie(); + // Initialize RDMA polling mode with tag bool InitPollingModeWithTag(bthread_tag_t tag, std::function callback = nullptr, @@ -74,6 +78,9 @@ int GetRdmaCompVector(); // Return current port number used uint8_t GetRdmaPortNum(); +// Get GPU index used +int GetGPUIndex(); + // Get max_sge supported by the device int GetRdmaMaxSge(); diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp index cd83053a42..38bf808e48 100644 --- a/src/brpc/server.cpp +++ b/src/brpc/server.cpp @@ -146,6 +146,7 @@ ServerOptions::ServerOptions() , has_builtin_services(true) , force_ssl(false) , use_rdma(false) + , use_gdr(false) , baidu_master_service(NULL) , http_master_service(NULL) , health_reporter(NULL) @@ -895,6 +896,11 @@ int Server::StartInternal(const butil::EndPoint& endpoint, return -1; } rdma::GlobalRdmaInitializeOrDie(); +#if BRPC_WITH_GDR + if (_options.use_gdr) { + rdma::GlobalGdrInitializeOrDie(); + } +#endif // BRPC_WITH_GDR if (!rdma::InitPollingModeWithTag(_options.bthread_tag)) { return -1; } @@ -1170,6 +1176,7 @@ int Server::StartInternal(const butil::EndPoint& endpoint, return -1; } _am->_use_rdma = _options.use_rdma; + _am->_use_gdr = _options.use_gdr; _am->_bthread_tag = _options.bthread_tag; } // Set `_status' to RUNNING before accepting connections diff --git a/src/brpc/server.h b/src/brpc/server.h index 2cf34dbd82..f720956bb3 100644 --- a/src/brpc/server.h +++ b/src/brpc/server.h @@ -227,6 +227,10 @@ struct ServerOptions { // Default: false bool use_rdma; + // Whether the server uses gdr or not + // Default: false + bool use_gdr; + // [CAUTION] This option is for implementing specialized baidu-std proxies, // most users don't need it. Don't change this option unless you fully // understand the description below. diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 73ea309a71..e6938b332a 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -759,7 +759,7 @@ int Socket::OnCreated(const SocketOptions& options) { #if BRPC_WITH_RDMA CHECK(_rdma_ep == NULL); if (options.use_rdma) { - _rdma_ep = new (std::nothrow)rdma::RdmaEndpoint(this); + _rdma_ep = new (std::nothrow)rdma::RdmaEndpoint(this, options.use_gdr); if (!_rdma_ep) { const int saved_errno = errno; PLOG(ERROR) << "Fail to create RdmaEndpoint"; @@ -2811,6 +2811,7 @@ int Socket::GetPooledSocket(SocketUniquePtr* pooled_socket) { opt.keytable_pool = _keytable_pool; opt.app_connect = _app_connect; opt.use_rdma = (_rdma_ep) ? true : false; + opt.use_gdr = (_rdma_ep) ? _rdma_ep->use_gdr() : false; socket_pool = new SocketPool(opt); SocketPool* expected = NULL; if (!main_sp->socket_pool.compare_exchange_strong( @@ -2912,6 +2913,7 @@ int Socket::GetShortSocket(SocketUniquePtr* short_socket) { opt.keytable_pool = _keytable_pool; opt.app_connect = _app_connect; opt.use_rdma = (_rdma_ep) ? true : false; + opt.use_gdr = (_rdma_ep) ? _rdma_ep->use_gdr() : false; if (get_client_side_messenger()->Create(opt, &id) != 0 || Socket::Address(id, short_socket) != 0) { return -1; diff --git a/src/brpc/socket.h b/src/brpc/socket.h index 03ad43f867..4dee716fd8 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -271,6 +271,7 @@ struct SocketOptions { bool force_ssl{false}; std::shared_ptr initial_ssl_ctx; bool use_rdma{false}; + bool use_gdr{false}; bthread_keytable_pool_t* keytable_pool{NULL}; SocketConnection* conn{NULL}; std::shared_ptr app_connect; diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp index c5c94bc747..2397f62819 100644 --- a/src/brpc/socket_map.cpp +++ b/src/brpc/socket_map.cpp @@ -92,8 +92,9 @@ SocketMap* get_or_new_client_side_socket_map() { int SocketMapInsert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx, bool use_rdma, + bool use_gdr, const HealthCheckOption& hc_option) { - return get_or_new_client_side_socket_map()->Insert(key, id, ssl_ctx, use_rdma, hc_option); + return get_or_new_client_side_socket_map()->Insert(key, id, ssl_ctx, use_rdma, use_gdr, hc_option); } int SocketMapFind(const SocketMapKey& key, SocketId* id) { @@ -227,6 +228,7 @@ void SocketMap::ShowSocketMapInBvarIfNeed() { int SocketMap::Insert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx, bool use_rdma, + bool use_gdr, const HealthCheckOption& hc_option) { ShowSocketMapInBvarIfNeed(); @@ -251,6 +253,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, opt.remote_side = key.peer.addr; opt.initial_ssl_ctx = ssl_ctx; opt.use_rdma = use_rdma; + opt.use_gdr = use_gdr; opt.hc_option = hc_option; if (_options.socket_creator->CreateSocket(opt, &tmp_id) != 0) { PLOG(FATAL) << "Fail to create socket to " << key.peer; diff --git a/src/brpc/socket_map.h b/src/brpc/socket_map.h index b0d542e78e..698bdab7ed 100644 --- a/src/brpc/socket_map.h +++ b/src/brpc/socket_map.h @@ -82,18 +82,19 @@ struct SocketMapKeyHasher { int SocketMapInsert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx, bool use_rdma, + bool use_gdr, const HealthCheckOption& hc_option); inline int SocketMapInsert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx) { HealthCheckOption hc_option; - return SocketMapInsert(key, id, ssl_ctx, false, hc_option); + return SocketMapInsert(key, id, ssl_ctx, false, false, hc_option); } inline int SocketMapInsert(const SocketMapKey& key, SocketId* id) { std::shared_ptr empty_ptr; HealthCheckOption hc_option; - return SocketMapInsert(key, id, empty_ptr, false, hc_option); + return SocketMapInsert(key, id, empty_ptr, false, false, hc_option); } // Find the SocketId associated with `key'. @@ -155,17 +156,18 @@ class SocketMap { int Insert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx, bool use_rdma, + bool use_gdr, const HealthCheckOption& hc_option); int Insert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx) { HealthCheckOption hc_option; - return Insert(key, id, ssl_ctx, false, hc_option); + return Insert(key, id, ssl_ctx, false, false, hc_option); } int Insert(const SocketMapKey& key, SocketId* id) { std::shared_ptr empty_ptr; HealthCheckOption hc_option; - return Insert(key, id, empty_ptr, false, hc_option); + return Insert(key, id, empty_ptr, false, false, hc_option); } void Remove(const SocketMapKey& key, SocketId expected_id); diff --git a/src/butil/gpu/gpu_block_pool.cpp b/src/butil/gpu/gpu_block_pool.cpp new file mode 100644 index 0000000000..b768e408e8 --- /dev/null +++ b/src/butil/gpu/gpu_block_pool.cpp @@ -0,0 +1,450 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#if BRPC_WITH_GDR + +#include +#include +#include "butil/fast_rand.h" +#include "gpu_block_pool.h" +namespace butil { +namespace gdr { + +#define CHECK_CUDA(call) \ +do { \ + auto _sts = (call); \ + if (_sts != cudaSuccess) { \ + LOG(FATAL) << " cuda error:" \ + << (cudaGetErrorString(_sts)) << std::string(" at ") \ + << __FILE__ << ": " << __LINE__; \ + } \ +} while (0); + +bool verify_same_context() { + static int original_device = -1; + static bool first_call = true; + + int current_device; + cudaGetDevice(¤t_device); + + if (first_call) { + original_device = current_device; + first_call = false; + return true; + } + + return (current_device == original_device); +} + +void* get_gpu_mem(int gpu_id, int64_t gpu_mem_size) { + CHECK_CUDA(cudaSetDevice(gpu_id)); + void *d_data; + + LOG(INFO) << "try to alloc " << gpu_mem_size << " bytes from gpu " << gpu_id; + + CHECK_CUDA(cudaMalloc(&d_data, gpu_mem_size)); + cudaDeviceSynchronize(); + return (void *)d_data; +} + +void* get_cpu_mem(int gpu_id, int64_t cpu_mem_size) { + CHECK_CUDA(cudaSetDevice(gpu_id)); + + LOG(INFO) << "try to alloc " << cpu_mem_size << " bytes from gpu " << gpu_id << "on host"; + + void* mem = NULL; + + CHECK_CUDA(cudaMallocHost(&mem, cpu_mem_size)); + + cudaDeviceSynchronize(); + + return mem; +} + + +BlockPoolAllocators* BlockPoolAllocators::instance_ = nullptr; + +BlockPoolAllocators* BlockPoolAllocators::singleton() { + static std::mutex mutex; + if (instance_ == nullptr) { + std::lock_guard l(mutex); + if (instance_ == nullptr) { + instance_ = new BlockPoolAllocators(); + std::atomic_thread_fence(std::memory_order_release); + } + } + std::atomic_thread_fence(std::memory_order_acquire); + return instance_; +} + +bool InitGPUBlockPool(int gpu_id, ibv_pd* pd) { + BlockPoolAllocators::singleton()->init(gpu_id, pd); + return true; +} + +class BlockHeaderList { + public: + BlockHeaderList() { + objects_.reserve(kMaxObjects); + } + virtual ~BlockHeaderList() { + for (size_t i = 0; i < objects_.size(); i++) { + delete objects_[i]; + } + } + + BlockHeader* New() { + { + std::lock_guard lock(mu_); + if (!objects_.empty()) { + BlockHeader* result = objects_.back(); + objects_.pop_back(); + return result; + } + } + return new BlockHeader; + } + void Release(BlockHeader* obj) { + obj->Reset(); + { + std::lock_guard lock(mu_); + if (objects_.size() < kMaxObjects) { + objects_.push_back(obj); + return; + } + } + delete obj; + } + + private: + static const int kMaxObjects = 100000; + + std::mutex mu_; + std::vector objects_; +}; + +static BlockHeaderList* get_bh_list() { + static BlockHeaderList* bh_list = new BlockHeaderList(); + return bh_list; +} + + +BlockPoolAllocator::BlockPoolAllocator(int gpuId, bool onGpu, ibv_pd* brpc_pd, + size_t blockSize, size_t regionSize) : + gpu_id(gpuId) + , on_gpu(onGpu) + , pd(brpc_pd) + , BLOCK_SIZE(std::max(blockSize, sizeof(BlockHeader))) + , REGION_SIZE((regionSize / blockSize) * blockSize) // 对齐到块大小的倍数 + , freeList(nullptr) + , g_region_num(0) + , totalAllocated(0) + , totalDeallocated(0) + , peakUsage(0) { + LOG(INFO) << "Memory Pool initialized: block_size=" << BLOCK_SIZE + << ", region_size=" << REGION_SIZE + << ", gpu_id=" << gpu_id << ", on_gpu=" << on_gpu << ", pd=" << pd; + + extendRegion(); +} + +BlockPoolAllocator::~BlockPoolAllocator() { +#ifdef DEBUG + printStatistics(); +#endif + + for (int i = 0; i < max_regions; i++) { + Region* r = &g_regions[i]; + if (!r->mr) { + return; + } + + LOG(INFO) << "try to free " << r->size << " bytes from gpu " << gpu_id << ", on_gpu " << on_gpu; + ibv_dereg_mr(r->mr); + if (on_gpu) { + CHECK_CUDA(cudaFree(reinterpret_cast(r->start))); + } else { + CHECK_CUDA(cudaFreeHost(reinterpret_cast(r->start))); + } + } +} + +Region* BlockPoolAllocator::GetRegion(const void* buf) { + if (!buf) { + errno = EINVAL; + return NULL; + } + Region* r = NULL; + uintptr_t addr = (uintptr_t)buf; + for (int i = 0; i < max_regions; ++i) { + if (g_regions[i].aligned_start == 0) { + break; + } + if (addr >= g_regions[i].aligned_start && + addr < g_regions[i].aligned_start + g_regions[i].aligned_size) { + r = &g_regions[i]; + break; + } + } + return r; +} + +uint32_t BlockPoolAllocator::get_lkey(const void* buf) { + Region* r = GetRegion(buf); + if (!r) { + LOG(ERROR) << "can not get a region for buf " << buf; + return 0; + } + return r->lkey; +} + +void* BlockPoolAllocator::AllocateRaw(size_t num_bytes) { + if (num_bytes == 0) { + return nullptr; + } + if (num_bytes > BLOCK_SIZE) { + LOG(FATAL) << "try to alloc " << num_bytes << " bytes, its bigger than block_size " << BLOCK_SIZE; + } + + auto startTime = std::chrono::high_resolution_clock::now(); + + std::lock_guard lock(poolMutex); + + if (!freeList) { + extendRegion(); + } + + BlockHeader* block = freeList; + freeList = freeList->next; + + void* addr = block->addr; + get_bh_list()->Release(block); + + totalAllocated++; + peakUsage = std::max(peakUsage, totalAllocated - totalDeallocated); + + auto endTime = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast(endTime - startTime); + +#ifdef DEBUG + if (duration.count() > 1000) { // 如果分配时间超过1微秒 + LOG(INFO) << "Slow allocation: " << duration.count() << " ns"; + } +#endif + + return addr; +} + +void BlockPoolAllocator::DeallocateRaw(void* ptr) { + if (!ptr) return; + + std::lock_guard lock(poolMutex); + + BlockHeader* block = get_bh_list()->New(); + block->addr = ptr; + block->next = freeList; + freeList = block; + + totalDeallocated++; +} + +// 获取统计信息 +void BlockPoolAllocator::printStatistics() const { + LOG(INFO) << "=== Memory Pool Statistics ==="; + LOG(INFO) << "Total regions: " << g_region_num + << ", Total blocks allocated: " << totalAllocated + << ", Total blocks deallocated: " << totalDeallocated + << ", Current usage: " << (totalAllocated - totalDeallocated) << " blocks" + << ", Peak usage: " << peakUsage << " blocks" + << ", Memory efficiency: " + << (static_cast(totalAllocated - totalDeallocated) / + (g_region_num * (REGION_SIZE / BLOCK_SIZE)) * 100) + << "%"; +} + +void BlockPoolAllocator::extendRegion() { + if (g_region_num == max_regions) { + LOG(FATAL) << "Gdr Memory pool reaches max regions"; + return ; + } + + auto startTime = std::chrono::high_resolution_clock::now(); + void* ptr = nullptr; + void* aligned_ptr = nullptr; + int alignment = 4096; + + if (on_gpu) { + ptr = get_gpu_mem(gpu_id, REGION_SIZE); + } else { + ptr = get_cpu_mem(gpu_id, REGION_SIZE); + } + + aligned_ptr = (void*)(((uintptr_t)ptr + alignment - 1) & ~(alignment - 1)); + + int64_t aligned_bytes = REGION_SIZE; + if (ptr != aligned_ptr) { + uintptr_t region_end = uintptr_t(ptr) + REGION_SIZE; + uintptr_t aligned_end_ptr = region_end & ~(alignment - 1); + aligned_bytes = uintptr_t(aligned_end_ptr) - uintptr_t(aligned_ptr); + LOG(WARNING) << "addr is not aligned with 4096: " << ptr << ", aligned_bytes: " << aligned_bytes + << ", region_size: " << REGION_SIZE; + } + + LOG(INFO) << "reg_mr for ptr: " << aligned_ptr << ", size:" << aligned_bytes; + auto mr = ibv_reg_mr(pd, aligned_ptr, aligned_bytes, + IBV_ACCESS_LOCAL_WRITE | + IBV_ACCESS_REMOTE_READ | + IBV_ACCESS_REMOTE_WRITE); + //IBV_ACCESS_RELAXED_ORDERING); + + if (!mr) { + LOG(FATAL) << "Failed to register MR: " << strerror(errno) + << ", pd " << pd << ", aligned_ptr:" << aligned_ptr; + } else { + LOG(INFO) << "Success to register MR: " + << ", pd " << pd << ", aligned_ptr:" << aligned_ptr; + } + + LOG(INFO) << "try to init region, g_region_num:" << g_region_num; + size_t blockCount = aligned_bytes / BLOCK_SIZE; + Region* region = &g_regions[g_region_num++]; + region->start = (uintptr_t)ptr; + region->aligned_start = (uintptr_t)aligned_ptr; + region->mr = mr; + region->size = REGION_SIZE; + region->aligned_size = aligned_bytes; + region->lkey = mr->lkey; + region->blockCount = blockCount; + + + LOG(INFO) << "try to insert list, freeList:" << freeList << ", blockCount:" << blockCount; + BlockHeader* lastBlock = nullptr; + for (size_t i = 0; i < blockCount; ++i) { + BlockHeader* block = get_bh_list()->New(); + block->addr = reinterpret_cast(static_cast(aligned_ptr) + i * BLOCK_SIZE); + if (lastBlock != nullptr) { + lastBlock->next = block; + } else { + freeList = block; + } + lastBlock = block; + } + + if (lastBlock) { + lastBlock->next = nullptr; + } + + auto endTime = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast(endTime - startTime); + + LOG(INFO) << "Extended region #" << g_region_num << ": " << blockCount + << " blocks (" << (REGION_SIZE / (1024 * 1024)) << " MB)" << ", on_gpu " << on_gpu + << ", cost " << duration.count() << " ns"; +} + +GPUStreamPool::GPUStreamPool(int gpu_id) : + gpu_id_(gpu_id) { + CHECK_CUDA(cudaSetDevice(gpu_id)); + d2d_streams_.resize(kMaxConcurrent); + d2h_streams_.resize(kMaxConcurrent); + for (int i = 0; i < kMaxConcurrent; i++) { + CHECK_CUDA(cudaStreamCreate(&d2d_streams_[i])); + CHECK_CUDA(cudaStreamCreate(&d2h_streams_[i])); + } + CHECK_CUDA(cudaDeviceSynchronize()); +} + +GPUStreamPool::~GPUStreamPool() { + CHECK_CUDA(cudaDeviceSynchronize()); + for (int i = 0; i < kMaxConcurrent; i++) { + CHECK_CUDA(cudaStreamDestroy(d2d_streams_[i])); + CHECK_CUDA(cudaStreamDestroy(d2h_streams_[i])); + } +} + +void GPUStreamPool::fast_d2d(std::vector& src_list, + std::vector& length_list, + void* dst) { +#ifdef DEBUG + if (!verify_same_context()) { + LOG(FATAL) << "Context mismatch!"; + return; + } +#endif + int64_t offset = 0; + int segs = src_list.size(); + if (segs == 0) return; + if (segs != length_list.size()) { + LOG(FATAL) << "src list size is not equal with length list size!!!"; + } + + int stream_idx = 0; + { + std::lock_guard stream_lb_lock(d2d_lb_lock_); + d2d_cnt_.fetch_add(1); + stream_idx = d2d_cnt_ % kMaxConcurrent; + } + std::lock_guard stream_lock(d2d_locks_[stream_idx]); + CHECK_CUDA(cudaStreamSynchronize(d2d_streams_[stream_idx])); + for (int i = 0; i < segs; i++) { + if (length_list[i] == 0) { + continue; + } + CHECK_CUDA(cudaMemcpyAsync(static_cast(dst) + offset, src_list[i], length_list[i], + cudaMemcpyDeviceToDevice, d2d_streams_[stream_idx])); + offset += length_list[i]; + } + CHECK_CUDA(cudaStreamSynchronize(d2d_streams_[stream_idx])); +} + +void GPUStreamPool::fast_d2h(std::vector& src_list, + std::vector& length_list, + void* dst) { + if (!verify_same_context()) { + LOG(FATAL) << "Context mismatch!"; + return; + } + int64_t offset = 0; + int segs = src_list.size(); + if (segs == 0) return; + if (segs != length_list.size()) { + LOG(FATAL) << "src list size is not equal with length list size!!!"; + } + + int stream_idx = 0; + { + std::lock_guard stream_lb_lock(d2h_lb_lock_); + d2h_cnt_.fetch_add(1); + stream_idx = d2h_cnt_ % kMaxConcurrent; + } + std::lock_guard stream_lock(d2h_locks_[stream_idx]); + CHECK_CUDA(cudaStreamSynchronize(d2h_streams_[stream_idx])); + for (int i = 0; i < segs; i++) { + if (length_list[i] == 0) { + continue; + } + CHECK_CUDA(cudaMemcpyAsync(static_cast(dst) + offset, src_list[i], length_list[i], + cudaMemcpyDeviceToHost, d2h_streams_[stream_idx])); + offset += length_list[i]; + } + CHECK_CUDA(cudaStreamSynchronize(d2h_streams_[stream_idx])); +} + +} +} + +#endif // BRPC_WITH_GDR diff --git a/src/butil/gpu/gpu_block_pool.h b/src/butil/gpu/gpu_block_pool.h new file mode 100644 index 0000000000..6106952c76 --- /dev/null +++ b/src/butil/gpu/gpu_block_pool.h @@ -0,0 +1,200 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef BUTIL_GPU_GPU_BLOCK_POOL_H +#define BUTIL_GPU_GPU_BLOCK_POOL_H + +#if BRPC_WITH_GDR + +#include +#include +#include +#include +#include +#include +#include +#include +#include "butil/containers/hash_tables.h" +#include "butil/logging.h" +#include +#include "cuda.h" + +// #include "gdrapi.h" +namespace butil { +namespace gdr { + +static int gdr_block_size_kb = [](){ + int ret = 64; + const char* env_var_val = getenv("GDR_BLOCK_SIZE_KB"); + if (env_var_val == nullptr) { + return ret; + } + ret = std::stoi(env_var_val); + + return ret; +}(); + +void* get_gpu_mem(int gpu_id, int64_t gpu_mem_size); +void* get_cpu_mem(int gpu_id, int64_t cpu_mem_size); + +bool InitGPUBlockPool(int gpu_id, ibv_pd* pd); + +struct Region { + Region() { start = 0; aligned_start = 0;} + uintptr_t start; + uintptr_t aligned_start; + + size_t size; + size_t aligned_size; + size_t blockCount; + struct ibv_mr *mr {nullptr}; + uint32_t lkey; +}; + +struct BlockHeader { + BlockHeader() { addr = nullptr; next = nullptr;} + void Reset() { addr = nullptr; next = nullptr; } + void* addr; + BlockHeader* next; +}; + +class BlockPoolAllocator { + private: + int gpu_id; + bool on_gpu; + ibv_pd* pd {nullptr}; + + const size_t BLOCK_SIZE; + const size_t REGION_SIZE; + + BlockHeader* freeList; + static constexpr size_t max_regions = 16; + int g_region_num {0}; + Region g_regions[max_regions]; + std::mutex poolMutex; + + // 统计信息 + size_t totalAllocated; + size_t totalDeallocated; + size_t peakUsage; + + public: + explicit BlockPoolAllocator(int gpu_id, + bool on_gpu, ibv_pd* pd, + size_t blockSize, size_t regionSize); + + ~BlockPoolAllocator(); + + void* AllocateRaw(size_t num_bytes); + + void DeallocateRaw(void* ptr); + + // 获取统计信息 + void printStatistics() const; + + int64_t getCurrentUsage() const { + return totalAllocated - totalDeallocated; + } + + int64_t getTotalMemory() const { + return g_region_num * REGION_SIZE; + } + + int64_t get_block_size() const { + return BLOCK_SIZE; + } + + uint32_t get_lkey(const void* buf); + + private: + Region* GetRegion(const void* buf); + void extendRegion(); +}; + +class GPUStreamPool { +public: + explicit GPUStreamPool(int gpu_id); + + ~GPUStreamPool(); + + GPUStreamPool(const GPUStreamPool&) = delete; + GPUStreamPool& operator=(const GPUStreamPool&) = delete; + + void fast_d2h(std::vector& src_list, std::vector& length_list, void* dst); + + void fast_d2d(std::vector& src_list, std::vector& length_list, void* dst); + + static constexpr int kMaxConcurrent = 32; +private: + int gpu_id_ {-1}; + std::atomic d2h_cnt_ {0}; + std::atomic d2d_cnt_ {0}; + std::mutex d2h_locks_[kMaxConcurrent]; + std::mutex d2d_locks_[kMaxConcurrent]; + std::mutex d2h_lb_lock_; + std::mutex d2d_lb_lock_; + std::vector d2h_streams_; + std::vector d2d_streams_; +}; + +class BlockPoolAllocators { +public: + static BlockPoolAllocators* singleton(); + BlockPoolAllocators() {} + virtual ~BlockPoolAllocators() { + CHECK_EQ(this, instance_); + instance_ = nullptr; + } + + void init(int gpu_id, ibv_pd* pd) { + LOG(INFO) << "set GPU BlockPoolAllocator for " << gpu_id; + size_t region_size = 512LL * 1024 * 1024; + size_t block_size = gdr_block_size_kb * 1024; + gpu_mem_alloc = new BlockPoolAllocator(gpu_id, true, pd, block_size, region_size); + + region_size = 32LL * 1024 * 1024; + block_size = 512; + cpu_mem_alloc = new BlockPoolAllocator(gpu_id, false, pd, block_size, region_size); + + gpu_stream_pool = new GPUStreamPool(gpu_id); + } + + BlockPoolAllocator* get_gpu_allocator() { + return gpu_mem_alloc; + } + + BlockPoolAllocator* get_cpu_allocator() { + return cpu_mem_alloc; + } + + GPUStreamPool* get_gpu_stream_pool() { + return gpu_stream_pool; + } + +public: + static BlockPoolAllocators* instance_; + +private: + BlockPoolAllocator* gpu_mem_alloc {nullptr}; + BlockPoolAllocator* cpu_mem_alloc {nullptr}; + GPUStreamPool* gpu_stream_pool {nullptr}; +}; +} +} + +#endif // BRPC_WITH_GDR + +#endif diff --git a/src/butil/iobuf.cpp b/src/butil/iobuf.cpp index 26046e3745..ce3c0cc0bb 100644 --- a/src/butil/iobuf.cpp +++ b/src/butil/iobuf.cpp @@ -40,6 +40,7 @@ #include "butil/fd_guard.h" // butil::fd_guard #include "butil/iobuf.h" #include "butil/iobuf_profiler.h" +#include "butil/gpu/gpu_block_pool.h" namespace butil { namespace iobuf { @@ -722,6 +723,46 @@ size_t IOBuf::cutn(IOBuf* out, size_t n) { return saved_n; } +#if BRPC_WITH_GDR +size_t IOBuf::cutn_from_gpu(IOBuf* out, size_t n) { + if (n == 0) { + return 0; + } + + butil::gdr::BlockPoolAllocator* host_allocator = butil::gdr::BlockPoolAllocators::singleton()->get_cpu_allocator(); + bool alloc_from_host_alloc = (n <= host_allocator->get_block_size()); + void* mem = NULL; + if (alloc_from_host_alloc) { + mem = host_allocator->AllocateRaw(n); + } else { + mem = malloc(n); + } + + if (mem == NULL) { + return 0; + } + size_t saved_n = copy_from_gpu(mem, n, 0, false); + if (saved_n > 0) { + if (alloc_from_host_alloc) { + auto deleter = [host_allocator](void* data) { host_allocator->DeallocateRaw(data); }; + out->append_user_data(mem, saved_n, deleter); + } else { + auto deleter = [](void* data) { free(data); }; + out->append_user_data(mem, saved_n, deleter); + } + pop_front(saved_n); + } else { + if (alloc_from_host_alloc) { + host_allocator->DeallocateRaw(mem); + } else { + free(mem); + } + } + + return saved_n; +} +#endif // BRPC_WITH_GDR + size_t IOBuf::cutn(void* out, size_t n) { const size_t len = length(); if (n > len) { @@ -1152,9 +1193,32 @@ uint64_t IOBuf::get_first_data_meta() { if (!(r.block->flags & IOBUF_BLOCK_FLAGS_USER_DATA)) { return 0; } - return r.block->u.data_meta; + return (r.block->u.data_meta & 0x00000000FFFFFFFF); +} + +// only when user use append_user_data_with_meta(), lkey is stored in data_meta +// We add this function for GDR, we want to know whether the data is in Host memory or GPU memory +// since lkey is uint32_t type, thus we use the high 32 bit to store +uint32_t IOBuf::get_first_data_meta_high32() { + if (_ref_num() == 0) { + return 0; + } + IOBuf::BlockRef const& r = _ref_at(0); + if (!(r.block->flags & IOBUF_BLOCK_FLAGS_USER_DATA)) { + return 0; + } + return (uint32_t)(r.block->u.data_meta >> 32); +} + +void* IOBuf::get_first_data_ptr() { + if (_ref_num() == 0) { + return 0; + } + IOBuf::BlockRef const& r = _ref_at(0); + return r.block->data; } + int IOBuf::resize(size_t n, char c) { const size_t saved_len = length(); if (n < saved_len) { @@ -1317,6 +1381,46 @@ size_t IOBuf::copy_to(void* d, size_t n, size_t pos) const { return n - m; } +#if BRPC_WITH_GDR +size_t IOBuf::copy_from_gpu(void* d, size_t n, size_t pos, bool to_gpu) const { + if (n == 0) { + return 0; + } + const size_t nref = _ref_num(); + // Skip `pos' bytes. `offset' is the starting position in starting BlockRef. + size_t offset = pos; + size_t i = 0; + for (; offset != 0 && i < nref; ++i) { + IOBuf::BlockRef const& r = _ref_at(i); + if (offset < (size_t)r.length) { + break; + } + offset -= r.length; + } + + butil::gdr::GPUStreamPool* gpu_stream_pool = butil::gdr::BlockPoolAllocators::singleton()->get_gpu_stream_pool(); + size_t m = n; + std::vector src_list; + std::vector length_list; + for (; m != 0 && i < nref; ++i) { + IOBuf::BlockRef const& r = _ref_at(i); + const size_t nc = std::min(m, (size_t)r.length - offset); + void* gpu_src = r.block->data + r.offset + offset; + src_list.push_back(gpu_src); + length_list.push_back(nc); + offset = 0; + m -= nc; + } + if (to_gpu) { + gpu_stream_pool->fast_d2d(src_list, length_list, d); + } else { + gpu_stream_pool->fast_d2h(src_list, length_list, d); + } + // If nref == 0, here returns 0 correctly + return n - m; +} +#endif // BRPC_WITH_GDR + size_t IOBuf::copy_to(std::string* s, size_t n, size_t pos) const { const size_t len = length(); if (len <= pos) { @@ -2102,4 +2206,4 @@ bool IOBufBytesIterator::forward_one_block(const void** data, size_t* size) { void* fast_memcpy(void *__restrict dest, const void *__restrict src, size_t n) { return butil::iobuf::cp(dest, src, n); -} // namespace butil \ No newline at end of file +} // namespace butil diff --git a/src/butil/iobuf.h b/src/butil/iobuf.h index 239e82d950..14077f0c29 100644 --- a/src/butil/iobuf.h +++ b/src/butil/iobuf.h @@ -70,6 +70,11 @@ friend class SingleIOBuf; static const size_t DEFAULT_BLOCK_SIZE = 8192; static const size_t INITIAL_CAP = 32; // must be power of 2 + enum MemoryMeta { + HOST_MEMORY = 0, + GPU_MEMORY = 1 + }; + struct Block; // can't directly use `struct iovec' here because we also need to access the @@ -141,6 +146,12 @@ friend class SingleIOBuf; size_t cutn(IOBuf* out, size_t n); size_t cutn(void* out, size_t n); size_t cutn(std::string* out, size_t n); + +#if BRPC_WITH_GDR + size_t cutn_from_gpu(IOBuf* out, size_t n); + size_t copy_from_gpu(void* d, size_t n, size_t pos = 0, bool to_gpu = false) const; +#endif // BRPC_WITH_GDR + // Cut off 1 byte from the front side and set to *c // Return true on cut, false otherwise. bool cut1(void* c); @@ -260,6 +271,12 @@ friend class SingleIOBuf; // 0 means the meta is invalid. uint64_t get_first_data_meta(); + // Get the high 32 bits of the data meta of the first byte in this IOBuf. + // The meta is specified with append_user_data_with_meta before. + // we use 0 to specify host memory, 1 to specify GPU memory + uint32_t get_first_data_meta_high32(); + void* get_first_data_ptr(); + // Resizes the buf to a length of n characters. // If n is smaller than the current length, all bytes after n will be // truncated. @@ -775,4 +792,4 @@ inline void swap(butil::IOBuf& a, butil::IOBuf& b) { #include "butil/iobuf_inl.h" -#endif // BUTIL_IOBUF_H \ No newline at end of file +#endif // BUTIL_IOBUF_H