Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include "balancer.h"
- void Transfer(cactus::IConn* from, cactus::IConn* to) {
- std::array<char, 256> buf;
- while (true) {
- try {
- auto size = from->Read(cactus::View(buf));
- if (size == 0) {
- to->CloseWrite(); // half open
- break;
- }
- to->Write(cactus::View(buf.data(), size));
- } catch (...) {
- break;
- }
- }
- }
- class BalancerImpl : public IBalancer {
- public:
- BalancerImpl(const folly::SocketAddress& address) : lsn_(cactus::ListenTCP(address)) {
- }
- ~BalancerImpl() override {
- }
- void SetBackends(const std::vector<folly::SocketAddress>& peers) override {
- cactus::MutexGuard guard(mutex_);
- peers_.clear();
- peers_.reserve(peers.size());
- for (const auto& address : peers) {
- peers_.push_back(std::make_shared<PeerInfo>());
- peers_.back()->address = address;
- }
- }
- void Run() override {
- group_.Spawn([this] {
- while (true) {
- std::shared_ptr<cactus::IConn> conn(lsn_->Accept());
- group_.Spawn([this, conn] {
- Handle(conn);
- });
- }
- });
- }
- private:
- void Handle(std::shared_ptr<cactus::IConn> client_conn) {
- std::shared_ptr<cactus::IConn> peer_conn;
- {
- cactus::MutexGuard guard(mutex_);
- while (!peer_conn) {
- std::shared_ptr<PeerInfo> peer;
- for (const auto& candidate : peers_) {
- if (candidate->is_dead) {
- continue;
- }
- if (!peer || peer->num_connections > candidate->num_connections) {
- peer = candidate;
- }
- }
- if (!peer) {
- throw std::runtime_error("No alive peers");
- }
- try {
- ++peer->num_connections;
- peer_conn = cactus::DialTCP(peer->address);
- } catch (...) {
- peer->is_dead = true;
- }
- }
- }
- cactus::WaitGroup wait_group;
- wait_group.Spawn([client_conn, peer_conn] {
- Transfer(client_conn.get(), peer_conn.get());
- });
- wait_group.Spawn([client_conn, peer_conn] {
- Transfer(peer_conn.get(), client_conn.get());
- });
- wait_group.Wait();
- }
- private:
- struct PeerInfo {
- folly::SocketAddress address;
- std::atomic<int> num_connections = 0;
- std::atomic<bool> is_dead = false;
- };
- const std::unique_ptr<cactus::IListener> lsn_;
- cactus::Mutex mutex_;
- std::vector<std::shared_ptr<PeerInfo>> peers_;
- cactus::ServerGroup group_;
- };
- std::unique_ptr<IBalancer> CreateBalancer(const folly::SocketAddress& address) {
- return std::make_unique<BalancerImpl>(address);
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement