Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
 * Fetches the data of every second-level znode (glob `/*/*`) whose top-level
 * path segment is not one of the reserved roots listed below.
 *
 * The reserved names look like Kafka's own bookkeeping roots; everything else
 * is presumably written by Storm spouts -- TODO confirm against the deployment.
 *
 * @param zkClient client used to query ZooKeeper
 * @return a future holding the data of every matching znode
 */
def getStormChildren(zkClient: ZkClient): Future[Seq[ZNode.Data]] = {
  // Top-level znodes that must be skipped when looking for spout state.
  val reservedRoots =
    Set("admin", "brokers", "consumers", "config", "controllers", "controller_epoch")

  val candidateNodes = getZChildren(zkClient, "/*/*").map { nodes =>
    nodes.filter { node =>
      // First non-empty path segment, e.g. "/foo/bar" -> "foo".
      val rootSegment = node.path.split("/").filter(_ != "")(0)
      !reservedRoots.contains(rootSegment)
    }
  }

  candidateNodes.flatMap[Seq[ZNode.Data]] { nodes =>
    // Bridge each Twitter future into a Scala future, then gather them all.
    val pendingReads = nodes.map(node => twitterToScalaFuture[ZNode.Data](node.getData()))
    Future.sequence[ZNode.Data, Seq](pendingReads)
  }
}
/**
 * Collects, for every spout-state znode that refers to `topicName`, the offsets
 * it has recorded, laid out as one slot per partition.
 *
 * The partition count is read from `/brokers/topics/<topicName>`; each result
 * sequence starts as that many zeros and the slot at the node's partition index
 * is replaced by the node's offset.
 *
 * Unlike a fire-and-forget callback chain, every failure path completes the
 * returned future: a failed topic lookup, a failure while fetching or parsing
 * the child znodes, and any exception thrown inside the callback itself.
 *
 * @param topicName topic whose spout offsets are wanted
 * @param zkClient  client used to query ZooKeeper
 * @return a future holding a map from znode path to its per-partition offsets;
 *         the future fails if ZooKeeper cannot be read or the topic metadata
 *         has no "partitions" entry
 */
def getStormSpoutPartitionOffsets(topicName: String, zkClient: ZkClient): Future[Map[String, Seq[Long]]] = {
  import scala.util.Try
  import scala.util.control.NonFatal

  val partitionPromise: Promise[Map[String, Seq[Long]]] = Promise[Map[String, Seq[Long]]]()

  // Reads a JSON number without depending on which number parser is active on
  // the current thread: the parser override below is per-thread, while child
  // data may be parsed on a different thread where numbers arrive as Double.
  def asLong(value: Any): Option[Long] = value match {
    case n: Number => Some(n.longValue)
    case other     => Try(other.toString.toLong).toOption
  }

  // Returns `offsets` with the slot at `partition` replaced by `offset`.
  // NOTE(review): if `partition` is beyond the end, the offset is appended
  // instead -- same as the previous behaviour.
  def withOffset(offsets: Seq[Long], partition: Int, offset: Long): Seq[Long] =
    offsets.slice(0, partition) ++ Seq(offset) ++ offsets.slice(partition + 1, offsets.size)

  ZNode(zkClient, s"/brokers/topics/$topicName").getData().onSuccess({ topicData =>
    try {
      // Kept for compatibility: makes JSON numbers parse as Int on this thread.
      JSON.perThreadNumberParser = { input: String => Integer.parseInt(input) }

      val numPartitions = JSON.parseFull(new String(topicData.bytes)) match {
        case Some(topicInfo: Map[String @unchecked, Any]) =>
          topicInfo("partitions") match {
            case assignments: Map[_, _] => assignments.keys.size
            case _                      => 2
          }
        case unexpected =>
          Logger.debug(unexpected.toString)
          0
      }
      val baseSeq: Seq[Long] = Seq.fill(numPartitions)(0L)

      val offsetsByNode = getStormChildren(zkClient).map { childDatas =>
        childDatas.foldLeft(Map.empty[String, Seq[Long]]) { (parsedChildren, childData) =>
          JSON.parseFull(new String(childData.bytes)) match {
            case Some(state: Map[String @unchecked, Any]) =>
              // Grab all the values we need to identify the node; a znode that
              // lacks any of them is not spout state and is skipped.
              val spoutState = for {
                topic     <- state.get("topic")
                partition <- state.get("partition").flatMap(asLong)
                offset    <- state.get("offset").flatMap(asLong)
              } yield (topic.toString, partition.toInt, offset)

              spoutState match {
                case Some((topic, partition, offset)) if topic == topicName =>
                  val name = childData.path
                  val current = parsedChildren.getOrElse(name, baseSeq)
                  parsedChildren.updated(name, withOffset(current, partition, offset))
                case Some(_) =>
                  parsedChildren // state for a different topic
                case None =>
                  Logger.debug(s"Skipping ${childData.path}: missing topic/partition/offset")
                  parsedChildren
              }
            case unexpected =>
              Logger.debug(unexpected.toString)
              parsedChildren
          }
        }
      }

      // Propagates both the result and any failure of the child scan.
      partitionPromise completeWith offsetsByNode
    } catch {
      // Exceptions thrown in this callback would otherwise be lost and leave
      // the promise pending forever.
      case NonFatal(e) => partitionPromise tryFailure e
    }
  }).onFailure({ t =>
    partitionPromise failure t
  })

  partitionPromise.future
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement