Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <limits>
- #include "balancer.h"
- class Balancer : public IBalancer {
- public:
- Balancer(const folly::SocketAddress& address) : lsn_(cactus::ListenTCP(address)) {
- }
- void SetBackends(const std::vector<folly::SocketAddress>& peers) {
- for (auto peer : peers) {
- if (peers_.count(peer) == 0) {
- peers_[peer] = 0;
- }
- }
- }
- void Run() {
- for (;;) {
- std::shared_ptr<cactus::IConn> conn = lsn_->Accept();
- g_.Spawn([conn, this] {
- bool found = false;
- while (!found && peers_.size()) {
- auto min_peer = FindPeer();
- std::unique_ptr<cactus::IConn> peer_conn;
- cactus::TimeoutGuard timeout_guard(std::chrono::seconds(2));
- try {
- peer_conn = cactus::DialTCP(min_peer);
- timeout_guard.Deactivate();
- } catch (const cactus::TimeoutException& timeout) {
- peers_.erase(min_peer);
- continue;
- }
- found = true;
- ++peers_[min_peer];
- }
- })
- }
- }
- folly::SocketAddress FindPeer() {
- folly::SocketAddress min_peer = peers_.begin()->first;
- size_t min_connections = peers_.begin()->second;
- for (auto [peer, connections] : peers_) {
- if (connections < min_connections) {
- min_peer = peer;
- min_connections = connections;
- }
- }
- return min_peer;
- }
- private:
- std::shared_ptr<cactus::IListener> lsn_;
- std::unordered_map<folly::SocketAddress, size_t> peers_;
- cactus::ServerGroup g_;
- };
- std::unique_ptr<IBalancer> CreateBalancer(const folly::SocketAddress& address) {
- return std::make_unique<Balancer>(address);
- }
Advertisement
Add Comment
Please, Sign In to add comment