Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.twitter.node_registry
- import scala.collection._
- import com.twitter.json.Json
- import com.twitter.zookeeper.ZooKeeperClient
- import net.lag.logging.Logger
- import org.apache.zookeeper.Watcher.Event.KeeperState
- import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent}
/**
 * Converts a node's endpoint map (endpoint name -> port number) to and from
 * the string payload that is stored alongside the node.
 */
trait Serializer {
  /**
   * Encodes the endpoint map as a string payload.
   *
   * @param map endpoint name -> port
   * @return the encoded payload
   */
  def serialize(map: immutable.Map[String, Int]): String

  /**
   * Decodes a payload back into an endpoint map.
   *
   * Because the file imports `scala.collection._`, the declared result type is
   * the general `scala.collection.Map`; implementations may narrow it.
   *
   * @param data the encoded payload
   * @return endpoint name -> port
   */
  def deserialize(data: String): Map[String, Int]
}
/**
 * Serializer that encodes the endpoint map as JSON text using
 * `com.twitter.json.Json`.
 */
class JsonSerializer extends Serializer {
  /**
   * Renders the endpoint map with `Json.build`.
   *
   * @param map endpoint name -> port
   * @return the JSON text
   */
  override def serialize(map: immutable.Map[String, Int]): String =
    Json.build(map).toString

  /**
   * Parses `data` with `Json.parse` and returns it as an immutable
   * endpoint name -> port map.
   *
   * The parsed value is checked entry by entry instead of being blindly cast:
   * a cast to `Map[String, Int]` is unchecked because of type erasure, so a
   * malformed payload would otherwise only fail later, wherever a value is
   * first read.
   *
   * @param data JSON text, expected to be an object whose values are integers
   * @return endpoint name -> port
   * @throws ClassCastException if the parsed value is not a map, or if any
   *                            entry is not a String key with an Int value
   */
  override def deserialize(data: String): immutable.Map[String, Int] =
    Json.parse(data) match {
      case parsed: Map[_, _] =>
        parsed.foldLeft(immutable.Map[String, Int]()) { (endpoints, entry) =>
          (entry: (Any, Any)) match {
            case (name: String, port: Int) =>
              endpoints + (name -> port)
            case (name, value) =>
              // Same exception type the unchecked cast would eventually have
              // produced, but raised here with the offending entry named.
              throw new ClassCastException(
                "Expected a String -> Int entry but found " + name + " -> " + value)
          }
        }
      case other =>
        throw new ClassCastException("Expected a JSON object but found: " + other)
    }
}
/**
 * Backend that holds the registered nodes together with their serialized
 * payloads.
 */
trait Store {
  /**
   * Registers a node. The payload must already be serialized by the caller.
   *
   * @param host node host name
   * @param port node port
   * @param data serialized payload to store for the node
   */
  def registerNode(host: String, port: Int, data: String): Unit

  /**
   * Returns the known nodes.
   *
   * @return node => serialized payload
   */
  def getNodes: immutable.Map[String, String]

  /**
   * Unregisters a node.
   *
   * @param host node host name
   * @param port node port
   */
  def removeNode(host: String, port: Int): Unit
}
/**
 * Store that keeps every node as an EPHEMERAL znode named `hosts/<host>:<port>`
 * and serves `getNodes` from a local map that is handed to the client's
 * `watchChildrenWithData` for the `hosts` path.
 *
 * The connection is opened from the constructor.
 *
 * @param servers                   ZooKeeper servers; joined with "," for the client
 * @param sessionTimeout            session timeout passed to the client; node creation
 *                                  is retried for up to twice this many milliseconds
 * @param connectionRetryIntervalMS not referenced anywhere in this class
 * @param basePath                  base path passed to the client
 */
class ZookeeperStore(servers: Iterable[String], sessionTimeout: Int, connectionRetryIntervalMS: Int, basePath: String) extends Store {
  private val log = Logger.get

  /** Client handed to the connect callback; null until that callback has run. */
  var zk: ZooKeeperClient = null

  // NOTE: the fields below must stay above the connectToZookeeper() call so that
  // they are initialized if the connect callback fires during construction.

  // Map given to watchChildrenWithData for "hosts"; presumably the client keeps
  // it in sync with every child of that path (all hosts, not only ours) --
  // confirm against the client library.
  private val ephemerals: mutable.Map[String, String] = mutable.Map()

  // Nodes registered through THIS instance (node path -> payload). Only these
  // are re-created on (re)connect; previously the watched map above was
  // replayed, which also tried to re-create nodes owned by other processes.
  private val registered: mutable.Map[String, String] = mutable.Map()

  connectToZookeeper()

  /** Creates the client, passing handleZookeeperConnect as its callback. */
  private def connectToZookeeper(): Unit = {
    log.info("Attempting connection to Zookeeper servers %s with base path %s".format(servers, basePath))
    new ZooKeeperClient(servers.mkString(","), sessionTimeout, basePath, handleZookeeperConnect(_))
  }

  /**
   * Connection callback: adopts the client, makes sure `hosts` exists,
   * re-creates the nodes this instance registered, then installs the watch
   * that feeds `getNodes`.
   */
  private def handleZookeeperConnect(zook: ZooKeeperClient): Unit = synchronized {
    zk = zook
    zk.createPath("hosts")
    // Snapshot first so the map is not traversed while another thread registers.
    val mine = registered.synchronized { registered.toList }
    mine.foreach { case (node, data) => createEphemeral(node, data) }
    // Decode with the same explicit charset used when writing the payload.
    zk.watchChildrenWithData[String]("hosts", ephemerals, (bytes: Array[Byte]) => new String(bytes, "UTF-8"))
  }

  /**
   * Registers `host:port` with the given serialized payload.
   *
   * @throws RuntimeException if the node could not be created in time
   */
  override def registerNode(host: String, port: Int, data: String): Unit = {
    registerNode("%s:%d".format(host, port), data)
  }

  /**
   * Registers a node by its path under `hosts/` and remembers it so it can be
   * re-created by the connect callback.
   *
   * @param nodePath node name under `hosts/`, e.g. "host:port"
   * @param data     serialized payload
   * @throws RuntimeException if the node still exists after the retry window
   */
  def registerNode(nodePath: String, data: String): Unit = {
    createEphemeral(nodePath, data)
    // Recorded only after a successful create, so failed attempts are not replayed.
    registered.synchronized { registered(nodePath) = data }
  }

  /**
   * Creates the ephemeral znode, retrying once per second while it already
   * exists, for at most 2 * sessionTimeout milliseconds.
   */
  private def createEphemeral(nodePath: String, data: String): Unit = {
    // 2L keeps the arithmetic in Long so a large sessionTimeout cannot overflow.
    val deadline = System.currentTimeMillis() + sessionTimeout * 2L
    var created = false
    while (!created && System.currentTimeMillis() < deadline) {
      try {
        // Explicit charset: the platform default may differ between the host
        // that writes a payload and the host that reads it.
        zk.create("hosts/%s".format(nodePath), data.getBytes("UTF-8"), CreateMode.EPHEMERAL)
        created = true
      } catch {
        case _: KeeperException.NodeExistsException =>
          log.warning("Ephemeral node " + nodePath + " already exists. Retrying...")
          Thread.sleep(1000)
      }
    }
    if (!created) {
      throw new RuntimeException("Unable to create ephemeral node " + nodePath)
    }
  }

  /**
   * Returns an immutable snapshot of the watched map.
   *
   * NOTE(review): the snapshot is taken under this object's lock; whether the
   * client updates the watched map under the same lock is not visible here.
   */
  override def getNodes: immutable.Map[String, String] = synchronized {
    immutable.Map[String, String]() ++ ephemerals
  }

  /** Deletes `hosts/<host>:<port>` and stops re-creating it on reconnect. */
  override def removeNode(host: String, port: Int): Unit = {
    val nodePath = "%s:%d".format(host, port)
    // Forget it first so a concurrent reconnect cannot resurrect the node.
    registered.synchronized { registered.remove(nodePath) }
    zk.delete("hosts/%s".format(nodePath))
  }
}
/**
 * Pairs a Store with a Serializer so callers can work with endpoint maps
 * instead of pre-serialized payloads. This is the entry point most users want.
 */
class ServerSet(store: Store, serializer: Serializer) {
  /**
   * Adds a server to the registry.
   *
   * @param host      server host name
   * @param port      server port
   * @param endpoints map that is serialized and stored as the node's payload
   */
  def join(host: String, port: Int, endpoints: immutable.Map[String, Int]): Unit = {
    val payload = serializer.serialize(endpoints)
    store.registerNode(host, port, payload)
  }

  /**
   * Removes a server from the registry.
   *
   * @param host server host name
   * @param port server port
   */
  def remove(host: String, port: Int): Unit =
    store.removeNode(host, port)

  /**
   * Lists all nodes known to the store.
   *
   * @return "host:port" => deserialized payload
   */
  def list: Map[String, Map[String, Int]] = {
    val decoded = store.getNodes.map { case (node, payload) =>
      (node, serializer.deserialize(payload))
    }
    immutable.Map[String, Map[String, Int]]() ++ decoded
  }
}
Add Comment
Please, Sign In to add comment