Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- val builder = new TopologyBuilder()
- builder.setSpout("kafka-spout", new AlgoKafkaSpout().buildKafkaSpout(), 4)
- builder.setBolt("wlref-bolt", new WlrefBolt, 4).shuffleGrouping("kafka-spout")
- builder.setBolt("params-bolt", new ParamsBolt, 4).shuffleGrouping("wlref-bolt")
- builder.setBolt("gender-bolt", new GenderBolt, 4).shuffleGrouping("params-bolt")
- builder.setBolt("age-bolt", new AgeBolt, 4).shuffleGrouping("gender-bolt")
- builder.setBolt("preference-bolt", new PreferenceBolt, 4).shuffleGrouping("age-bolt")
- builder.setBolt("geo-bolt", new GeoBolt, 4).shuffleGrouping("preference-bolt")
- builder.setBolt("device-bolt", new DeviceBolt, 4).shuffleGrouping("geo-bolt")
- builder.setBolt("druid-bolt", new AlgoBeamBolt[java.util.Map[String, AnyRef]](new MyBeamFactory), 4)
- .shuffleGrouping("device-bolt")
- builder.setBolt("redis-bolt",new RedisBolt,4).shuffleGrouping("druid-bolt")
- val conf = new Config()
- conf.setDebug(false)
- conf.setMessageTimeoutSecs(120)
- conf.setNumWorkers(1)
- StormSubmitter.submitTopology(args(0), conf, builder.createTopology)
- val hosts = new ZkHosts(s"${Config.zkHost}:${Config.zkPort}")
- val topic = Config.kafkaTopic
- val zkRoot = s"/$topic"+"7"
- val groupId = Config.kafkaGroup
- val kafkaConfig = new SpoutConfig(hosts, topic, zkRoot, UUID.randomUUID().toString())
- kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme())
- kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime
- new KafkaSpout(kafkaConfig)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement