Advertisement
Guest User

New Storm Spout Code

a guest
Oct 23rd, 2014
127
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 2.74 KB | None | 0 0
  /**
   * Reads the data of every znode returned by `getZChildren` for the two-level wildcard
   * pattern, except those whose first path segment is one of the excluded top-level roots
   * listed below.
   *
   * @param zkClient ZooKeeper client handed to `getZChildren`
   * @return a future holding the data of each remaining node, in the order the nodes were returned
   */
  def getStormChildren(zkClient: ZkClient): Future[Seq[ZNode.Data]] = {
    // Top-level roots whose children are left out of the result.
    val excludedRoots = Set(
      "admin", "brokers", "consumers", "config", "controllers", "controller_epoch")

    val remainingNodes = getZChildren(zkClient, "/*/*").map { nodes =>
      nodes.filter { node =>
        // First non-empty segment of the node's path, i.e. its top-level root.
        val rootSegment = node.path.split("/").filter(_ != "")(0)
        !excludedRoots.contains(rootSegment)
      }
    }

    // Start one read per node and gather the results into a single future.
    remainingNodes.flatMap[Seq[ZNode.Data]] { nodes =>
      Future.sequence[ZNode.Data, Seq](nodes.map { node =>
        twitterToScalaFuture[ZNode.Data](node.getData())
      })
    }
  }
  11.  
  /**
   * Collects, for every node returned by `getStormChildren` whose JSON payload names
   * `topicName`, the recorded offset of each partition.
   *
   * The partition count is taken from the number of entries under `partitions` in the topic's
   * znode data. Each matching node contributes its `offset` at index `partition` of a
   * zero-filled sequence stored under the node's path.
   *
   * Nodes whose payload is not a JSON object, or that lack a usable `topic`, `partition` or
   * `offset` field, are logged at debug level and skipped. Any failure while reading or
   * processing (topic lookup, child lookup, or an unexpected exception in a callback) fails the
   * returned future instead of leaving it incomplete forever.
   *
   * @param topicName topic whose offsets are wanted
   * @param zkClient  ZooKeeper client used for all reads
   * @return a future holding a map from node path to per-partition offsets
   */
  def getStormSpoutPartitionOffsets(topicName: String, zkClient: ZkClient): Future[Map[String, Seq[Long]]] = {
    val partitionPromise: Promise[Map[String, Seq[Long]]] = Promise[Map[String, Seq[Long]]]

    // Parses node JSON with integral numbers read as Long, so offsets beyond Int.MaxValue survive
    // and the result does not depend on whichever number parser this thread happens to carry.
    // The thread's previous parser is restored afterwards.
    def parseNodeJson(text: String): Option[Any] = {
      val previousParser = JSON.perThreadNumberParser
      JSON.perThreadNumberParser = { input: String =>
        scala.util.Try[Any](input.toLong).getOrElse(input.toDouble)
      }
      try JSON.parseFull(text) finally { JSON.perThreadNumberParser = previousParser }
    }

    // Reads a field as a whole number; None when it is absent, null or not integral.
    def fieldAsLong(fields: Map[String, Any], key: String): Option[Long] =
      fields.get(key).flatMap(value => scala.util.Try(value.toString.toLong).toOption)

    // Replaces the slot for `partition`, or appends when the index lies beyond the sequence.
    // NOTE(review): appending keeps the historical behaviour, but the offset then no longer
    // sits at its partition index - confirm this is what callers expect.
    def withOffset(offsets: Seq[Long], partition: Int, offset: Long): Seq[Long] =
      if (partition < offsets.size) offsets.updated(partition, offset) else offsets :+ offset

    // Folds one node into the accumulated result; nodes that cannot be used leave it unchanged.
    def addNode(collected: Map[String, Seq[Long]], childData: ZNode.Data, numPartitions: Int): Map[String, Seq[Long]] =
      parseNodeJson(new String(childData.bytes, "UTF-8")) match {
        case Some(fields: Map[String, Any] @unchecked) =>
          fields.get("topic").flatMap(Option(_)).map(_.toString) match {
            case Some(topic) if topic == topicName =>
              val partition = fieldAsLong(fields, "partition").filter(p => p >= 0 && p <= Int.MaxValue)
              val offset = fieldAsLong(fields, "offset")
              (partition, offset) match {
                case (Some(p), Some(o)) =>
                  val name = childData.path
                  val current = collected.getOrElse(name, Seq.fill(numPartitions)(0L))
                  collected + (name -> withOffset(current, p.toInt, o))
                case _ =>
                  Logger.debug(s"Skipping ${childData.path}: no usable partition and offset")
                  collected
              }
            case Some(_) =>
              // Node belongs to a different topic.
              collected
            case None =>
              Logger.debug(s"Skipping ${childData.path}: no topic field")
              collected
          }

        case t =>
          Logger.debug(t.toString)
          collected
      }

    ZNode(zkClient, s"/brokers/topics/$topicName").getData().onSuccess({ topicData =>
      try {
        // NOTE(review): this parser stays installed on the callback thread after the call, as it
        // always has; left in place in case other code relies on it - confirm and remove.
        JSON.perThreadNumberParser = { input: String => Integer.parseInt(input)}
        val numPartitions = JSON.parseFull(new String(topicData.bytes, "UTF-8")) match {
          case Some(map: Map[String, Any] @unchecked) =>
            map("partitions") match {
              case partitions: Map[_, _] => partitions.size
              case _ => 2
            }
          case t =>
            Logger.debug(t.toString)
            0
        }

        // completeWith forwards both success and failure of the child lookup to the promise.
        partitionPromise.completeWith(
          getStormChildren(zkClient).map[Map[String, Seq[Long]]]({ childDatas =>
            childDatas.foldLeft(Map.empty[String, Seq[Long]]) { (collected, childData) =>
              addNode(collected, childData, numPartitions)
            }
          }))
      } catch {
        // An exception thrown inside this callback would otherwise leave the promise pending.
        case scala.util.control.NonFatal(e) => partitionPromise.tryFailure(e)
      }
    }).onFailure({ t =>
      partitionPromise failure t
    })

    partitionPromise.future
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement