Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <rpc/reliable.hpp>
- #include <rpc/errors.hpp>
- // Futures
- #include <await/futures/core/future.hpp>
- // Fibers
- #include <await/fibers/core/api.hpp>
- #include <await/fibers/sync/future.hpp>
- // Logging
- #include <timber/log.hpp>
- #include <wheels/support/compiler.hpp>
- #include <chrono>
- using await::fibers::Await;
- using await::futures::Future;
- using wheels::Result;
- namespace rpc {
- //////////////////////////////////////////////////////////////////
- class ReliableChannel
- : public IChannel,
- public std::enable_shared_from_this<rpc::ReliableChannel> {
- struct Session {
- rpc::Method method;
- rpc::Message request;
- rpc::CallOptions options;
- rpc::Backoff backoff;
- await::futures::Promise<Message> p;
- };
- public:
- ReliableChannel(IChannelPtr fair_loss, Backoff::Params backoff_params,
- IRuntime* runtime)
- : fair_loss_(std::move(fair_loss)),
- backoff_params_(backoff_params),
- runtime_(runtime),
- logger_("Reliable", runtime->Log()) {
- }
- void Fetch(Session&& s) {
- auto self(shared_from_this());
- if (s.options.stop_advice.StopRequested()) {
- std::move(s.p).SetError(wheels::Error(rpc::Cancelled()));
- } else {
- fair_loss_->Call(s.method, s.request, s.options)
- .Subscribe([this, self, s = std::move(s)](auto result) mutable {
- if (result.HasError() &&
- rpc::IsRetriableError(result.GetErrorCode())) {
- Recall(std::move(s));
- } else {
- std::move(s).p.Set(result);
- }
- });
- }
- }
- void Recall(Session&& s) {
- auto self(shared_from_this());
- runtime_->Timers()
- ->After(s.backoff())
- .Subscribe([this, self, s = std::move(s)](auto) mutable {
- Fetch(std::move(s));
- });
- }
- Future<Message> Call(Method method, Message request,
- CallOptions options) override {
- WHEELS_UNUSED(backoff_params_);
- WHEELS_UNUSED(runtime_);
- LOG_INFO("Call({}, {}) started", method, request);
- auto [f, p] = await::futures::MakeContract<Message>();
- Session s{.method = std::move(method),
- .request = std::move(request),
- .options = std::move(options),
- .backoff = rpc::Backoff(backoff_params_),
- .p = std::move(p)};
- Fetch(std::move(s));
- return std::move(f);
- }
- private:
- IChannelPtr fair_loss_;
- const Backoff::Params backoff_params_;
- IRuntime* runtime_;
- timber::Logger logger_;
- };
- //////////////////////////////////////////////////////////////////////
- IChannelPtr MakeReliableChannel(IChannelPtr fair_loss,
- Backoff::Params backoff_params,
- IRuntime* runtime) {
- return std::make_shared<ReliableChannel>(std::move(fair_loss), backoff_params,
- runtime);
- }
- } // namespace rpc
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement