Advertisement
Guest User

Untitled

a guest
Apr 19th, 2019
145
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.24 KB | None | 0 0
  1. #include "rpc.h"
  2.  
  3. #include <cactus/rpc/rpc.pb.h>
  4.  
  5. #include <google/protobuf/message.h>
  6.  
  7. #include <array>
  8.  
  9. namespace cactus {
  10.  
  11. void SimpleRpcChannel::CallMethod(const google::protobuf::MethodDescriptor *method,
  12. const google::protobuf::Message &request,
  13. google::protobuf::Message *response) {
  14. auto conn = DialTCP(server_);
  15.  
  16. // write
  17.  
  18. RequestHeader header;
  19. header.set_service(method->service()->full_name());
  20. header.set_method(method->name());
  21. std::string header_bytes;
  22. header.SerializeToString(&header_bytes);
  23.  
  24. std::string body_bytes;
  25. request.SerializeToString(&body_bytes);
  26.  
  27. std::array<uint32_t, 2> sizes{
  28. static_cast<uint32_t>(header_bytes.size()),
  29. static_cast<uint32_t>(body_bytes.size())};
  30.  
  31. conn->Write(View(sizes));
  32. conn->Write(View(header_bytes));
  33. conn->Write(View(body_bytes));
  34.  
  35. // read
  36.  
  37. std::array<uint32_t, 2> rsp_sizes;
  38. conn->ReadFull(View(rsp_sizes));
  39. auto rsp_header_size = rsp_sizes[0];
  40. auto rsp_body_size = rsp_sizes[1];
  41.  
  42. std::string rsp_header_bytes(rsp_header_size, '\0');
  43. conn->ReadFull(View(rsp_header_bytes));
  44. ResponseHeader rsp_header;
  45. if (!rsp_header.ParseFromString(rsp_header_bytes)) {
  46. throw std::runtime_error("Invalid header");
  47. }
  48. if (rsp_header.has_rpc_error()) {
  49. throw RpcCallError(rsp_header.rpc_error().message());
  50. }
  51.  
  52. std::string rsp_body_bytes(rsp_body_size, '\0');
  53. conn->ReadFull(View(rsp_body_bytes));
  54. if (!response->ParseFromString(rsp_body_bytes)) {
  55. throw std::runtime_error("Invalid body");
  56. }
  57.  
  58. conn->Close();
  59. }
  60.  
  61. void SimpleRpcServer::Serve() {
  62. while (true) {
  63. auto conn = lsn_->Accept();
  64. //group_.Spawn([&]{
  65. // read
  66.  
  67. std::array<uint32_t, 2> sizes;
  68. conn->ReadFull(View(sizes));
  69. auto header_size = sizes[0];
  70. auto body_size = sizes[1];
  71.  
  72. std::string header_bytes(header_size, '\0');
  73. conn->ReadFull(View(header_bytes));
  74. RequestHeader header;
  75. if (!header.ParseFromString(header_bytes)) {
  76. throw std::runtime_error("Invalid header");
  77. }
  78.  
  79. auto service_iter = services_.find(header.service());
  80. if (service_iter == services_.end()) {
  81. throw std::runtime_error("Invalid service");
  82. }
  83. auto service = service_iter->second;
  84. auto method = service->ServiceDescriptor()->FindMethodByName(header.method());
  85. if (method == nullptr) {
  86. throw std::runtime_error("Invalid method");
  87. }
  88.  
  89. std::string body_bytes(body_size, '\0');
  90. conn->ReadFull(View(body_bytes));
  91. std::unique_ptr<google::protobuf::Message> body(
  92. google::protobuf::MessageFactory::generated_factory()
  93. ->GetPrototype(method->input_type())->New());
  94. if (!body->ParseFromString(body_bytes)) {
  95. throw std::runtime_error("Invalid body");
  96. }
  97.  
  98. // call
  99.  
  100. ResponseHeader rsp_header;
  101. std::unique_ptr<google::protobuf::Message> rsp_body(
  102. google::protobuf::MessageFactory::generated_factory()
  103. ->GetPrototype(method->output_type())->New());
  104. try {
  105. service->CallMethod(method, *body, rsp_body.get());
  106. } catch (const std::runtime_error& err) {
  107. rsp_header.mutable_rpc_error()->set_message(err.what());
  108. }
  109.  
  110. // write
  111.  
  112. std::string rsp_header_bytes;
  113. rsp_header.SerializeToString(&rsp_header_bytes);
  114. std::string rsp_body_bytes;
  115. rsp_body->SerializeToString(&rsp_body_bytes);
  116. std::array<uint32_t, 2> rsp_sizes{
  117. static_cast<uint32_t>(rsp_header_bytes.size()),
  118. static_cast<uint32_t>(rsp_body_bytes.size())};
  119. conn->Write(View(rsp_sizes));
  120. conn->Write(View(rsp_header_bytes));
  121. conn->Write(View(rsp_body_bytes));
  122.  
  123. conn->Close();
  124. //});
  125. }
  126. }
  127.  
  128. } // namespace cactus
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement