Advertisement
ostapdontstop

reliable channel

Sep 24th, 2021
1,003
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 2.95 KB | None | 0 0
  1. #include <rpc/reliable.hpp>
  2. #include <rpc/errors.hpp>
  3.  
  4. // Futures
  5. #include <await/futures/core/future.hpp>
  6.  
  7. // Fibers
  8. #include <await/fibers/core/api.hpp>
  9. #include <await/fibers/sync/future.hpp>
  10.  
  11. // Logging
  12. #include <timber/log.hpp>
  13.  
  14. #include <wheels/support/compiler.hpp>
  15.  
  16. #include <chrono>
  17.  
  18. using await::fibers::Await;
  19. using await::futures::Future;
  20. using wheels::Result;
  21.  
  22. namespace rpc {
  23.  
  24. //////////////////////////////////////////////////////////////////
  25.  
  26. class ReliableChannel
  27.     : public IChannel,
  28.       public std::enable_shared_from_this<rpc::ReliableChannel> {
  29.   struct Session {
  30.     rpc::Method method;
  31.     rpc::Message request;
  32.     rpc::CallOptions options;
  33.     rpc::Backoff backoff;
  34.     await::futures::Promise<Message> p;
  35.   };
  36.  
  37.  public:
  38.   ReliableChannel(IChannelPtr fair_loss, Backoff::Params backoff_params,
  39.                   IRuntime* runtime)
  40.       : fair_loss_(std::move(fair_loss)),
  41.         backoff_params_(backoff_params),
  42.         runtime_(runtime),
  43.         logger_("Reliable", runtime->Log()) {
  44.   }
  45.  
  46.   void Fetch(Session&& s) {
  47.     auto self(shared_from_this());
  48.     if (s.options.stop_advice.StopRequested()) {
  49.       std::move(s.p).SetError(wheels::Error(rpc::Cancelled()));
  50.     } else {
  51.       fair_loss_->Call(s.method, s.request, s.options)
  52.           .Subscribe([this, self, s = std::move(s)](auto result) mutable {
  53.             if (result.HasError() &&
  54.                 rpc::IsRetriableError(result.GetErrorCode())) {
  55.               Recall(std::move(s));
  56.             } else {
  57.               std::move(s).p.Set(result);
  58.             }
  59.           });
  60.     }
  61.   }
  62.  
  63.   void Recall(Session&& s) {
  64.     auto self(shared_from_this());
  65.     runtime_->Timers()
  66.         ->After(s.backoff())
  67.         .Subscribe([this, self, s = std::move(s)](auto) mutable {
  68.           Fetch(std::move(s));
  69.         });
  70.   }
  71.  
  72.   Future<Message> Call(Method method, Message request,
  73.                        CallOptions options) override {
  74.     WHEELS_UNUSED(backoff_params_);
  75.     WHEELS_UNUSED(runtime_);
  76.  
  77.     LOG_INFO("Call({}, {}) started", method, request);
  78.  
  79.     auto [f, p] = await::futures::MakeContract<Message>();
  80.  
  81.     Session s{.method = std::move(method),
  82.               .request = std::move(request),
  83.               .options = std::move(options),
  84.               .backoff = rpc::Backoff(backoff_params_),
  85.               .p = std::move(p)};
  86.  
  87.     Fetch(std::move(s));
  88.  
  89.     return std::move(f);
  90.   }
  91.  
  92.  private:
  93.   IChannelPtr fair_loss_;
  94.   const Backoff::Params backoff_params_;
  95.   IRuntime* runtime_;
  96.   timber::Logger logger_;
  97. };
  98.  
  99. //////////////////////////////////////////////////////////////////////
  100.  
  101. IChannelPtr MakeReliableChannel(IChannelPtr fair_loss,
  102.                                 Backoff::Params backoff_params,
  103.                                 IRuntime* runtime) {
  104.   return std::make_shared<ReliableChannel>(std::move(fair_loss), backoff_params,
  105.                                            runtime);
  106. }
  107.  
  108. }  // namespace rpc
  109.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement