Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.evans.storm;
- import java.util.HashMap;
- import java.util.Map;
- import backtype.storm.Config;
- import backtype.storm.LocalCluster;
- import backtype.storm.task.OutputCollector;
- import backtype.storm.task.TopologyContext;
- import backtype.storm.topology.IRichBolt;
- import backtype.storm.topology.OutputFieldsDeclarer;
- import backtype.storm.topology.TopologyBuilder;
- import backtype.storm.tuple.Fields;
- import backtype.storm.tuple.Tuple;
- import backtype.storm.tuple.Values;
- import com.evans.storm.spout.RecordedTwitterSpout;
- public class TweetTopology {
- public static class ExclamationBolt implements IRichBolt {
- OutputCollector _collector;
- @Override
- public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
- _collector = collector;
- }
- @Override
- public void execute(Tuple tuple) {
- _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
- _collector.ack(tuple);
- }
- @Override
- public void cleanup() {
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word"));
- }
- }
- @SuppressWarnings("unchecked")
- public static void main(String[] args){
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout(1, new RecordedTwitterSpout(), 1);
- builder.setBolt(2, new ExclamationBolt(), 3).shuffleGrouping(1);
- builder.setBolt(3, new ExclamationBolt(), 2).shuffleGrouping(2);
- Map config = new HashMap();
- config.put(Config.TOPOLOGY_DEBUG, true);
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", config, builder.createTopology());
- //Utils.sleep(10 * 60 * 1000);
- //cluster.killTopology("test");
- //cluster.shutdown();
- }
- }
Add Comment
Please, Sign In to add comment