Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include "rpc.h"
- #include <cactus/rpc/rpc.pb.h>
- #include <google/protobuf/message.h>
- #include <array>
- namespace cactus {
- void SimpleRpcChannel::CallMethod(const google::protobuf::MethodDescriptor *method,
- const google::protobuf::Message &request,
- google::protobuf::Message *response) {
- auto conn = DialTCP(server_);
- // write
- RequestHeader header;
- header.set_service(method->service()->full_name());
- header.set_method(method->name());
- std::string header_bytes;
- header.SerializeToString(&header_bytes);
- std::string body_bytes;
- request.SerializeToString(&body_bytes);
- std::array<uint32_t, 2> sizes{
- static_cast<uint32_t>(header_bytes.size()),
- static_cast<uint32_t>(body_bytes.size())};
- conn->Write(View(sizes));
- conn->Write(View(header_bytes));
- conn->Write(View(body_bytes));
- // read
- std::array<uint32_t, 2> rsp_sizes;
- conn->ReadFull(View(rsp_sizes));
- auto rsp_header_size = rsp_sizes[0];
- auto rsp_body_size = rsp_sizes[1];
- std::string rsp_header_bytes(rsp_header_size, '\0');
- conn->ReadFull(View(rsp_header_bytes));
- ResponseHeader rsp_header;
- if (!rsp_header.ParseFromString(rsp_header_bytes)) {
- throw std::runtime_error("Invalid header");
- }
- if (rsp_header.has_rpc_error()) {
- throw RpcCallError(rsp_header.rpc_error().message());
- }
- std::string rsp_body_bytes(rsp_body_size, '\0');
- conn->ReadFull(View(rsp_body_bytes));
- if (!response->ParseFromString(rsp_body_bytes)) {
- throw std::runtime_error("Invalid body");
- }
- conn->Close();
- }
- void SimpleRpcServer::Serve() {
- while (true) {
- auto conn = lsn_->Accept();
- //group_.Spawn([&]{
- // read
- std::array<uint32_t, 2> sizes;
- conn->ReadFull(View(sizes));
- auto header_size = sizes[0];
- auto body_size = sizes[1];
- std::string header_bytes(header_size, '\0');
- conn->ReadFull(View(header_bytes));
- RequestHeader header;
- if (!header.ParseFromString(header_bytes)) {
- throw std::runtime_error("Invalid header");
- }
- auto service_iter = services_.find(header.service());
- if (service_iter == services_.end()) {
- throw std::runtime_error("Invalid service");
- }
- auto service = service_iter->second;
- auto method = service->ServiceDescriptor()->FindMethodByName(header.method());
- if (method == nullptr) {
- throw std::runtime_error("Invalid method");
- }
- std::string body_bytes(body_size, '\0');
- conn->ReadFull(View(body_bytes));
- std::unique_ptr<google::protobuf::Message> body(
- google::protobuf::MessageFactory::generated_factory()
- ->GetPrototype(method->input_type())->New());
- if (!body->ParseFromString(body_bytes)) {
- throw std::runtime_error("Invalid body");
- }
- // call
- ResponseHeader rsp_header;
- std::unique_ptr<google::protobuf::Message> rsp_body(
- google::protobuf::MessageFactory::generated_factory()
- ->GetPrototype(method->output_type())->New());
- try {
- service->CallMethod(method, *body, rsp_body.get());
- } catch (const std::runtime_error& err) {
- rsp_header.mutable_rpc_error()->set_message(err.what());
- }
- // write
- std::string rsp_header_bytes;
- rsp_header.SerializeToString(&rsp_header_bytes);
- std::string rsp_body_bytes;
- rsp_body->SerializeToString(&rsp_body_bytes);
- std::array<uint32_t, 2> rsp_sizes{
- static_cast<uint32_t>(rsp_header_bytes.size()),
- static_cast<uint32_t>(rsp_body_bytes.size())};
- conn->Write(View(rsp_sizes));
- conn->Write(View(rsp_header_bytes));
- conn->Write(View(rsp_body_bytes));
- conn->Close();
- //});
- }
- }
- } // namespace cactus
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement