Advertisement
Guest User

Untitled

a guest
Apr 19th, 2019
99
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.00 KB | None | 0 0
  1. #include "balancer.h"
  2.  
  3. void Transfer(cactus::IConn* from, cactus::IConn* to) {
  4. std::array<char, 256> buf;
  5. while (true) {
  6. try {
  7. auto size = from->Read(cactus::View(buf));
  8. if (size == 0) {
  9. to->CloseWrite(); // half open
  10. break;
  11. }
  12. to->Write(cactus::View(buf.data(), size));
  13. } catch (...) {
  14. break;
  15. }
  16. }
  17. }
  18.  
  19. class BalancerImpl : public IBalancer {
  20. public:
  21. BalancerImpl(const folly::SocketAddress& address) : lsn_(cactus::ListenTCP(address)) {
  22. }
  23.  
  24. ~BalancerImpl() override {
  25. }
  26.  
  27. void SetBackends(const std::vector<folly::SocketAddress>& peers) override {
  28. cactus::MutexGuard guard(mutex_);
  29. peers_.clear();
  30. peers_.reserve(peers.size());
  31. for (const auto& address : peers) {
  32. peers_.push_back(std::make_shared<PeerInfo>());
  33. peers_.back()->address = address;
  34. }
  35. }
  36.  
  37. void Run() override {
  38. group_.Spawn([this] {
  39. while (true) {
  40. std::shared_ptr<cactus::IConn> conn(lsn_->Accept());
  41. group_.Spawn([this, conn] {
  42. Handle(conn);
  43. });
  44. }
  45. });
  46. }
  47.  
  48. private:
  49. void Handle(std::shared_ptr<cactus::IConn> client_conn) {
  50. std::shared_ptr<cactus::IConn> peer_conn;
  51. {
  52. cactus::MutexGuard guard(mutex_);
  53. while (!peer_conn) {
  54. std::shared_ptr<PeerInfo> peer;
  55. for (const auto& candidate : peers_) {
  56. if (candidate->is_dead) {
  57. continue;
  58. }
  59. if (!peer || peer->num_connections > candidate->num_connections) {
  60. peer = candidate;
  61. }
  62. }
  63. if (!peer) {
  64. throw std::runtime_error("No alive peers");
  65. }
  66. try {
  67. ++peer->num_connections;
  68. peer_conn = cactus::DialTCP(peer->address);
  69. } catch (...) {
  70. peer->is_dead = true;
  71. }
  72. }
  73. }
  74.  
  75. cactus::WaitGroup wait_group;
  76. wait_group.Spawn([client_conn, peer_conn] {
  77. Transfer(client_conn.get(), peer_conn.get());
  78. });
  79. wait_group.Spawn([client_conn, peer_conn] {
  80. Transfer(peer_conn.get(), client_conn.get());
  81. });
  82. wait_group.Wait();
  83. }
  84.  
  85. private:
  86. struct PeerInfo {
  87. folly::SocketAddress address;
  88. std::atomic<int> num_connections = 0;
  89. std::atomic<bool> is_dead = false;
  90. };
  91.  
  92. const std::unique_ptr<cactus::IListener> lsn_;
  93.  
  94. cactus::Mutex mutex_;
  95. std::vector<std::shared_ptr<PeerInfo>> peers_;
  96. cactus::ServerGroup group_;
  97. };
  98.  
  99. std::unique_ptr<IBalancer> CreateBalancer(const folly::SocketAddress& address) {
  100. return std::make_unique<BalancerImpl>(address);
  101. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement