Guest User

Hanging cyclic Storm topology

a guest
Jan 18th, 2013
297
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 4.00 KB | None | 0 0
  1. package storm.experiment;
  2.  
  3. import java.util.ArrayList;
  4. import java.util.List;
  5. import java.util.Map;
  6. import java.util.logging.Logger;
  7.  
  8. import backtype.storm.Config;
  9. import backtype.storm.LocalCluster;
  10. import backtype.storm.generated.StormTopology;
  11. import backtype.storm.spout.SpoutOutputCollector;
  12. import backtype.storm.task.OutputCollector;
  13. import backtype.storm.task.TopologyContext;
  14. import backtype.storm.topology.OutputFieldsDeclarer;
  15. import backtype.storm.topology.TopologyBuilder;
  16. import backtype.storm.topology.base.BaseRichBolt;
  17. import backtype.storm.topology.base.BaseRichSpout;
  18. import backtype.storm.tuple.Fields;
  19. import backtype.storm.tuple.Tuple;
  20. import backtype.storm.tuple.Values;
  21.  
  22. public class CyclicTopology {
  23.  
  24.     public static final String fountainStream = "fountainStream";
  25.     public static final String gullyStream = "gullyStream";
  26.  
  27.     private static class WaterSpout extends BaseRichSpout {
  28.  
  29.  
  30.         private static final long serialVersionUID = -992077936760898653L;
  31.         private SpoutOutputCollector _collector;
  32.  
  33.         long drop = 0;
  34.  
  35.         @SuppressWarnings("rawtypes")
  36.         @Override
  37.         public void open(Map conf, TopologyContext context,
  38.                 SpoutOutputCollector collector) {
  39.             _collector = collector;
  40.  
  41.         }
  42.  
  43.         @Override
  44.         public void nextTuple() {
  45.             _collector.emit(new Values("drop "+ drop++));
  46.         }
  47.  
  48.         @Override
  49.         public void declareOutputFields(OutputFieldsDeclarer declarer) {
  50.             declarer.declare(new Fields("drop"));
  51.         }
  52.  
  53.     }
  54.  
  55.     private static class FountainBolt extends BaseRichBolt {
  56.  
  57.         private static final long serialVersionUID = -5232340252708532106L;
  58.         private OutputCollector _collector;
  59.  
  60.  
  61.  
  62.         @SuppressWarnings("rawtypes")
  63.         @Override
  64.         public void prepare(Map stormConf, TopologyContext context,
  65.                 OutputCollector collector) {
  66.             _collector = collector;
  67.  
  68.         }
  69.  
  70.         @Override
  71.         public void execute(Tuple input) {
  72.             if (input.size() == 1) {
  73.                 String drop = input.getString(0);
  74.                 List<Object> output = new ArrayList<Object>(2);
  75.                 output.add(drop);
  76.                 output.add(1);
  77.                 _collector.emit(fountainStream, input, output);
  78.             } else if (input.size() == 2) {
  79.                 String drop = input.getString(0);
  80.                 Integer count = input.getInteger(1);
  81.  
  82.                 List<Object> output = new ArrayList<Object>(2);
  83.  
  84.                 if (count < 3) {
  85.                     output.add(drop);
  86.                     output.add(count + 1);
  87.                     _collector.emit(fountainStream, input, output);
  88.                 } else {
  89.                     output.add(drop);
  90.                     _collector.emit(gullyStream, input, output);
  91.                 }
  92.  
  93.             }
  94.             _collector.ack(input);
  95.         }
  96.  
  97.         @Override
  98.         public void declareOutputFields(OutputFieldsDeclarer declarer) {
  99.             declarer.declareStream(fountainStream, new Fields("drop", "use"));
  100.             declarer.declareStream(gullyStream, new Fields("drop"));
  101.         }
  102.  
  103.  
  104.     }
  105.  
  106.     private static class GullyBolt extends BaseRichBolt {
  107.  
  108.         private static final long serialVersionUID = 2758474522361204980L;
  109.         private OutputCollector _collector;
  110.  
  111.         private int count = 0;
  112.         private Logger log;
  113.  
  114.         @SuppressWarnings("rawtypes")
  115.         @Override
  116.         public void prepare(Map stormConf, TopologyContext context,
  117.                 OutputCollector collector) {
  118.             this._collector = collector;
  119.             this.log = Logger.getLogger("Gully");
  120.         }
  121.  
  122.         @Override
  123.         public void execute(Tuple input) {
  124.             if (count++ % 1000 == 0) log.info(input.getString(0));
  125.             _collector.ack(input);
  126.         }
  127.  
  128.         @Override
  129.         public void declareOutputFields(OutputFieldsDeclarer declarer) {
  130.             // nothing to declare, sir!
  131.         }
  132.  
  133.     }
  134.  
  135.     public static void main(String[] args) throws Exception {
  136.         TopologyBuilder builder = new TopologyBuilder();
  137.  
  138.         builder.setSpout("rain", new WaterSpout(), 1);
  139.         builder.setBolt("fountain", new FountainBolt(), 1)
  140.             .shuffleGrouping("rain")
  141.             .shuffleGrouping("fountain", fountainStream);
  142.         builder.setBolt("gully", new GullyBolt(), 1)
  143.             .shuffleGrouping("fountain", gullyStream);
  144.  
  145.         StormTopology topology = builder.createTopology();
  146.  
  147.         Config conf = new Config();
  148.  
  149.         System.out.println("Using local cluster");
  150.  
  151.         new LocalCluster().submitTopology("rainyday", conf, topology);
  152.  
  153.     }
  154. }
Advertisement
Add Comment
Please, Sign In to add comment