Advertisement
Guest User

Untitled

a guest
Jan 21st, 2017
109
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.48 KB | None | 0 0
  1. val builder = new TopologyBuilder()
  2. builder.setSpout("kafka-spout", new AlgoKafkaSpout().buildKafkaSpout(), 4)
  3. builder.setBolt("wlref-bolt", new WlrefBolt, 4).shuffleGrouping("kafka-spout")
  4. builder.setBolt("params-bolt", new ParamsBolt, 4).shuffleGrouping("wlref-bolt")
  5. builder.setBolt("gender-bolt", new GenderBolt, 4).shuffleGrouping("params-bolt")
  6. builder.setBolt("age-bolt", new AgeBolt, 4).shuffleGrouping("gender-bolt")
  7. builder.setBolt("preference-bolt", new PreferenceBolt, 4).shuffleGrouping("age-bolt")
  8. builder.setBolt("geo-bolt", new GeoBolt, 4).shuffleGrouping("preference-bolt")
  9. builder.setBolt("device-bolt", new DeviceBolt, 4).shuffleGrouping("geo-bolt")
  10. builder.setBolt("druid-bolt", new AlgoBeamBolt[java.util.Map[String, AnyRef]](new MyBeamFactory), 4)
  11. .shuffleGrouping("device-bolt")
  12. builder.setBolt("redis-bolt",new RedisBolt,4).shuffleGrouping("druid-bolt")
  13.  
  14. val conf = new Config()
  15. conf.setDebug(false)
  16. conf.setMessageTimeoutSecs(120)
  17. conf.setNumWorkers(1)
  18. StormSubmitter.submitTopology(args(0), conf, builder.createTopology)
  19.  
  20. val hosts = new ZkHosts(s"${Config.zkHost}:${Config.zkPort}")
  21. val topic = Config.kafkaTopic
  22. val zkRoot = s"/$topic"+"7"
  23. val groupId = Config.kafkaGroup
  24. val kafkaConfig = new SpoutConfig(hosts, topic, zkRoot, UUID.randomUUID().toString())
  25. kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme())
  26. kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime
  27. new KafkaSpout(kafkaConfig)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement