Guest User

Untitled

a guest
Jun 25th, 2018
97
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.76 KB | None | 0 0
  1. package com.twitter.node_registry
  2.  
  3. import scala.collection._
  4. import com.twitter.json.Json
  5. import com.twitter.zookeeper.ZooKeeperClient
  6. import net.lag.logging.Logger
  7. import org.apache.zookeeper.Watcher.Event.KeeperState
  8. import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent}
  9.  
  10. trait Serializer {
  11. def serialize(map: immutable.Map[String, Int]): String
  12.  
  13. def deserialize(data: String): Map[String, Int]
  14. }
  15.  
  16. class JsonSerializer extends Serializer {
  17. override def serialize(map: immutable.Map[String, Int]): String = {
  18. Json.build(map).toString
  19. }
  20.  
  21. override def deserialize(data: String): immutable.Map[String, Int] = {
  22. Json.parse(data).asInstanceOf[immutable.Map[String, Int]]
  23. }
  24. }
  25.  
  26. trait Store {
  27. /**
  28. * Stores a node, data should be already serialized
  29. */
  30. def registerNode(host: String, port: Int, data: String)
  31.  
  32. /**
  33. * Gets a map of nodes => serialized data
  34. */
  35. def getNodes: immutable.Map[String, String]
  36.  
  37. /**
  38. * Removes a node
  39. */
  40. def removeNode(host: String, port: Int)
  41. }
  42.  
  43. class ZookeeperStore(servers: Iterable[String], sessionTimeout: Int, connectionRetryIntervalMS: Int, basePath: String) extends Store {
  44. private val log = Logger.get
  45. var zk: ZooKeeperClient = null
  46. private var ephemerals: mutable.Map[String, String] = mutable.Map()
  47. private var connected = false
  48.  
  49. connectToZookeeper()
  50.  
  51. private def connectToZookeeper() {
  52. log.info("Attempting connection to Zookeeper servers %s with base path %s".format(servers, basePath))
  53. new ZooKeeperClient(servers.mkString(","), sessionTimeout, basePath, handleZookeeperConnect(_))
  54. }
  55.  
  56. private def handleZookeeperConnect(zook: ZooKeeperClient): Unit = synchronized {
  57. zk = zook
  58. zk.createPath("hosts")
  59. ephemerals.foreach { case (node, data) => registerNode(node, data) }
  60. zk.watchChildrenWithData[String]("hosts", ephemerals, { data: Array[Byte] => new String(data) })
  61. }
  62.  
  63. override def registerNode(host: String, port: Int, data: String) {
  64. registerNode("%s:%d".format(host, port), data)
  65. }
  66.  
  67. def registerNode(nodePath: String, data: String) {
  68. var created = false
  69. val startTime = System.currentTimeMillis()
  70. val timeoutMS = sessionTimeout * 2
  71. while (!created && System.currentTimeMillis() < (startTime + timeoutMS)) {
  72. try {
  73. zk.create("hosts/%s".format(nodePath), data.getBytes, CreateMode.EPHEMERAL)
  74. created = true
  75. } catch {
  76. case _ : KeeperException.NodeExistsException => {
  77. log.warning("Ephemeral node " + nodePath + " already exists. Retrying...")
  78. Thread.sleep(1000)
  79. }
  80. }
  81. }
  82. if (!created) {
  83. throw new RuntimeException("Unable to create ephemeral node " + nodePath)
  84. }
  85. }
  86.  
  87. override def getNodes: immutable.Map[String, String] = synchronized {
  88. immutable.Map[String, String]() ++ ephemerals
  89. }
  90.  
  91. override def removeNode(host: String, port: Int) {
  92. zk.delete("hosts/%s:%d".format(host, port))
  93. }
  94. }
  95.  
  96. /**
  97. * Server set takes a store and a serializer.
  98. * This is what most people will want to use all the time
  99. */
  100. class ServerSet(store: Store, serializer: Serializer) {
  101. /**
  102. * Join adds a server to the registry. "endpoints" is a Map that will be
  103. * serialized and stored as the payload
  104. */
  105. def join(host: String, port: Int, endpoints: immutable.Map[String, Int]) {
  106. store.registerNode(host, port, serializer.serialize(endpoints))
  107. }
  108.  
  109. /**
  110. * Remove a server from the registry
  111. */
  112. def remove(host: String, port: Int) {
  113. store.removeNode(host, port)
  114. }
  115.  
  116. /**
  117. * Get a list of all nodes. Returns a map of "host:port" => deserialized data
  118. */
  119. def list: Map[String, Map[String, Int]] = {
  120. store.getNodes.foldLeft(immutable.Map[String, Map[String, Int]]()) { (map, tuple) =>
  121. val (host, data) = tuple
  122. map + (host -> serializer.deserialize(data))
  123. }
  124. }
  125. }
Add Comment
Please, Sign In to add comment