Guest User

Untitled

a guest
Jan 21st, 2018
71
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.42 KB | None | 0 0
  1. """A distributed group membership module
  2.  
  3. This provides distributed group membership for easily building clustered
  4. applications with gevent. Using this in your app, you just provide the IP
  5. of another node in the cluster and it will receive the IPs of all nodes in
  6. the cluster. When a node joins or drops from the cluster, all other nodes find
  7. out immediately.
  8.  
  9. The roster is managed by a leader. When you create a cluster, you tell the
  10. first node it is the leader (by simply pointing it to its own IP). As you
  11. add nodes, you can point them to the leader or any other node. If a node
  12. is not the leader, it will redirect the connection to the leader. All nodes
  13. also maintain a keepalive with the leader.
  14.  
  15. If the leader drops from the cluster, the nodes will dumbly pick a new leader
  16. by taking the remaining node list, sorting it, and picking the first node. If
  17. a node happens to get a different leader, as long as it is in the cluster, it
  18. will be redirected to the right leader.
  19.  
  20. """
  21. import gevent.monkey; gevent.monkey.patch_all(thread=False)
  22.  
  23. import logging
  24. import socket
  25. import json
  26.  
  27. import gevent
  28. import gevent.server
  29. import gevent.socket
  30.  
  31. from gevent_tools import util
  32. from gevent_tools import service
  33.  
# Seconds the leader waits after a client's last keepalive before it shuts
# down the read side of that client's socket (see PeerServer._client_timeout).
CLIENT_TIMEOUT_SECONDS = 10
# Seconds a client waits before sending its next keepalive newline to the
# leader (see PeerClient._server_keepalive).
SERVER_KEEPALIVE_SECONDS = 5
  36.  
  37. def logger(obj):
  38. name = '%s.%s' % (obj.__module__, obj.__class__.__name__)
  39. return logging.getLogger(name)
  40.  
  41. class ClusterError(Exception): pass
  42. class NewLeader(Exception): pass
  43.  
  44. class ClusterManager(service.Service):
  45. def __init__(self, callback, listen_address, leader_address=None, client_hostname=None):
  46. self.server = PeerServer(self, listen_address)
  47. self.client = PeerClient(self, leader_address, client_hostname)
  48. self.cluster = set()
  49. self._callback = callback
  50.  
  51. self.add_service(self.server)
  52. if leader_address:
  53. self.add_service(self.client)
  54. self.is_leader = False
  55. else:
  56. self.is_leader = True
  57.  
  58. def trigger_callback(self):
  59. if self._callback:
  60. self._callback(self.cluster.copy())
  61.  
  62. class PeerServer(service.Service):
  63. def __init__(self, manager, address):
  64. self.logger = logger(self)
  65. self.manager = manager
  66. self.address = address
  67. self.clients = {}
  68. self.server = gevent.server.StreamServer(address,
  69. handle=self.handle, spawn=self.spawn)
  70.  
  71. self.add_service(self.server)
  72.  
  73. def do_start(self):
  74. if self.manager.is_leader:
  75. self.manager.cluster.add(self.address[0])
  76. self.manager.trigger_callback()
  77. gevent.sleep(0)
  78.  
  79. def handle(self, socket, address):
  80. """
  81. If not a leader, a node will simply return a single item list pointing
  82. to the leader. Otherwise, it will add the host of the connected client
  83. to the cluster roster, broadcast to all nodes the new roster, and wait
  84. for keepalives. If no keepalive within timeout or the client drops, it
  85. drops it from the roster and broadcasts to all remaining nodes.
  86. """
  87. self.logger.debug('New connection from %s:%s' % address)
  88. if not self.manager.is_leader:
  89. socket.send(json.dumps({'leader': self.manager.client.leader_address[0],
  90. 'port': self.manager.client.leader_address[1]}))
  91. socket.close()
  92. self.logger.debug("Redirected to %s:%s" % self.manager.client.leader_address)
  93. else:
  94. socket.send(self._cluster_message())
  95. sockfile = socket.makefile()
  96. name = sockfile.readline()
  97. if not name:
  98. return
  99. if name == '\n':
  100. name = address[0]
  101. else:
  102. name = name.strip()
  103. self._update(add={'host': name, 'socket': socket})
  104. # TODO: Use TCP keepalives
  105. timeout = self._client_timeout(socket)
  106. for line in util.line_protocol(sockfile, strip=False):
  107. timeout.kill()
  108. timeout = self._client_timeout(socket)
  109. socket.send('\n')
  110. self.logger.debug("Keepalive from %s:%s" % address)
  111. self.logger.debug("Client disconnected from %s:%s" % address)
  112. self._update(remove=name)
  113.  
  114. def _client_timeout(self, socket):
  115. def shutdown(socket):
  116. try:
  117. socket.shutdown(0)
  118. except IOError:
  119. pass
  120. return self.spawn_later(CLIENT_TIMEOUT_SECONDS,
  121. lambda: shutdown(socket))
  122.  
  123. def _cluster_message(self):
  124. return '%s\n' % json.dumps({'cluster': list(self.manager.cluster)})
  125.  
  126. def _update(self, add=None, remove=None):
  127. """ Used by leader to manage and broadcast roster """
  128. if add is not None:
  129. self.manager.cluster.add(add['host'])
  130. self.clients[add['host']] = add['socket']
  131. self.logger.debug("Added to cluster: %s" % add['host'])
  132. if remove is not None:
  133. self.manager.cluster.remove(remove)
  134. del self.clients[remove]
  135. self.logger.debug("Removed from cluster: %s" % remove)
  136. for client in self.clients:
  137. self.clients[client].send(self._cluster_message())
  138. self.manager.trigger_callback()
  139.  
  140. class PeerClient(service.Service):
  141. def __init__(self, manager, leader_address, client_hostname=None):
  142. self.logger = logger(self)
  143. self.manager = manager
  144. self.leader_address = leader_address
  145. self.client_hostname = client_hostname
  146.  
  147. # For connection retries. None means default
  148. self._max_retries = 5
  149. self._delay = None
  150. self._max_delay = None
  151.  
  152. def do_start(self):
  153. self.spawn(self.connect)
  154. return service.NOT_READY
  155.  
  156. def connect(self):
  157. while True:
  158. self.logger.debug("Connecting to leader at %s:%s" %
  159. self.leader_address)
  160. try:
  161. socket = util.connect_and_retry(self.leader_address,
  162. max_retries=self._max_retries, delay=self._delay,
  163. max_delay=self._max_delay)
  164. except IOError:
  165. raise ClusterError("Unable to connect to leader %s:%s" %
  166. self.leader_address)
  167. self.handle(socket)
  168.  
  169. def handle(self, socket):
  170. self.set_ready()
  171. self.logger.debug("Connected to leader")
  172. client_address = self.client_hostname or socket.getsockname()[0]
  173. socket.send('%s\n' % client_address)
  174. # TODO: Use TCP keepalives
  175. keepalive = self._server_keepalive(socket)
  176. try:
  177. for line in util.line_protocol(socket, strip=False):
  178. if line == '\n':
  179. # Keepalive ack from leader
  180. keepalive.kill()
  181. keepalive = self._server_keepalive(socket)
  182. else:
  183. cluster = json.loads(line)
  184. if 'leader' in cluster:
  185. # Means you have the wrong leader, redirect
  186. host = cluster['leader']
  187. port = cluster.get('port', self.leader_address[1])
  188. self.leader_address = (host, port)
  189. self.logger.info("Redirected to %s:%s..." %
  190. self.leader_address)
  191. raise NewLeader()
  192. elif client_address in cluster['cluster']:
  193. # Only report cluster once I'm a member
  194. self.manager.cluster = set(cluster['cluster'])
  195. self.manager.trigger_callback()
  196. self._leader_election()
  197. except NewLeader:
  198. self.manager.trigger_callback()
  199. if self.leader_address[0] == client_address:
  200. self.manager.is_leader = True
  201. self.stop()
  202. else:
  203. return
  204.  
  205. def _server_keepalive(self, socket):
  206. return self.spawn_later(SERVER_KEEPALIVE_SECONDS,
  207. lambda: socket.send('\n'))
  208.  
  209. def _leader_election(self):
  210. candidates = list(self.manager.cluster)
  211. candidates.remove(self.leader_address[0])
  212. candidates.sort()
  213. self.manager.leader = candidates[0]
  214. self.logger.info("New leader %s:%s..." % self.manager.leader_address)
  215. # TODO: if i end up thinking i'm the leader when i'm not
  216. # then i will not rejoin the cluster
  217. raise NewLeader()
Add Comment
Please, Sign In to add comment