Guest User

Untitled

a guest
Jan 17th, 2019
96
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.84 KB | None | 0 0
  1. package com.evans.storm;
  2.  
  3. import java.util.HashMap;
  4. import java.util.Map;
  5.  
  6. import backtype.storm.Config;
  7. import backtype.storm.LocalCluster;
  8. import backtype.storm.task.OutputCollector;
  9. import backtype.storm.task.TopologyContext;
  10. import backtype.storm.topology.IRichBolt;
  11. import backtype.storm.topology.OutputFieldsDeclarer;
  12. import backtype.storm.topology.TopologyBuilder;
  13. import backtype.storm.tuple.Fields;
  14. import backtype.storm.tuple.Tuple;
  15. import backtype.storm.tuple.Values;
  16.  
  17. import com.evans.storm.spout.RecordedTwitterSpout;
  18.  
  19. public class TweetTopology {
  20.  
  21. public static class ExclamationBolt implements IRichBolt {
  22. OutputCollector _collector;
  23.  
  24. @Override
  25. public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
  26. _collector = collector;
  27. }
  28.  
  29. @Override
  30. public void execute(Tuple tuple) {
  31. _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
  32. _collector.ack(tuple);
  33. }
  34.  
  35. @Override
  36. public void cleanup() {
  37. }
  38.  
  39. @Override
  40. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  41. declarer.declare(new Fields("word"));
  42. }
  43.  
  44.  
  45. }
  46. @SuppressWarnings("unchecked")
  47. public static void main(String[] args){
  48. TopologyBuilder builder = new TopologyBuilder();
  49.  
  50. builder.setSpout(1, new RecordedTwitterSpout(), 1);
  51. builder.setBolt(2, new ExclamationBolt(), 3).shuffleGrouping(1);
  52. builder.setBolt(3, new ExclamationBolt(), 2).shuffleGrouping(2);
  53.  
  54. Map config = new HashMap();
  55. config.put(Config.TOPOLOGY_DEBUG, true);
  56.  
  57. LocalCluster cluster = new LocalCluster();
  58. cluster.submitTopology("test", config, builder.createTopology());
  59.  
  60. //Utils.sleep(10 * 60 * 1000);
  61. //cluster.killTopology("test");
  62. //cluster.shutdown();
  63. }
  64. }
Add Comment
Please, Sign In to add comment