Advertisement
Ladies_Man

#HADOOP Lab7 (Storm) COMPLETE

Dec 19th, 2015
117
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 7.69 KB | None | 0 0
  1. //hadlab7: Storm
  2. Лабораторная работа 7. Разработка простой топологии storm.
  3. Задача:
  4. -Требуется разработать топологию для составления частотного словаря файлов.
  5. -Требуется разработать Spout который будет опрашивать директорию и в случае
  6.     обнаружения в ней новых файлов читать ее построчно и генерировать tuple в
  7.     исходящий поток words для каждой строки. После завершения чтения файла
  8.     Spout должен выдать tuple в исходящий поток sync. Также после окончания
  9.     файла требуется перенести файл в папку для обработанных файлов
  10. -Требуется разработать Bolt Splitter который будет принимать строку и разбивать ее на слова.
  11. -Требуется разработать Bolt Counter который будет принимать слова из
  12.     входящего потока Splitter и вести частотный словарь. Также по команде от
  13.     входного потока sync — печатать текущий словарь на экране и обнулять его.
  14.  
  15.  
  16.  
  17. //запуск:
  18. //mvn package
  19. //mvn exec:java -Dexec.mainClass="StormLab"
  20.  
  21.  
  22.  
  23. //StormLab.java
  24. import backtype.storm.Config;
  25. import backtype.storm.LocalCluster;
  26. import backtype.storm.topology.TopologyBuilder;
  27. import backtype.storm.tuple.Fields;
  28.  
  29. public class StormLab {
  30.  
  31.     public static void main(String[] args) throws Exception {
  32.  
  33.         TopologyBuilder builder = new TopologyBuilder();
  34.         builder.setSpout("generator", new PollSpout());
  35.  
  36.         builder.setBolt("splitter", new BoltSplitter(), 10)
  37.                 .shuffleGrouping("generator", "words");
  38.  
  39.         builder.setBolt("counter", new BoltCounter(), 1)
  40.                 .fieldsGrouping("splitter", new Fields("word"))
  41.                 .allGrouping("generator", "sync");
  42.  
  43.         Config config = new Config();
  44.         config.setDebug(false);
  45.  
  46.         LocalCluster cluster = new LocalCluster();
  47.         cluster.submitTopology("Frequency Dictionary", config, builder.createTopology());
  48.     }
  49. }
  50.  
  51.  
  52.  
  53.  
  54.  
  55.  
  56. //PollSpout.java
  57. import backtype.storm.spout.SpoutOutputCollector;
  58. import backtype.storm.task.TopologyContext;
  59. import backtype.storm.topology.OutputFieldsDeclarer;
  60. import backtype.storm.topology.base.BaseRichSpout;
  61. import backtype.storm.tuple.Fields;
  62. import backtype.storm.tuple.Values;
  63. import com.google.common.collect.Lists;
  64. import com.google.common.io.Files;
  65. import org.apache.storm.shade.com.google.common.base.Charsets;
  66.  
  67. import java.io.*;
  68. import java.util.Map;
  69.  
  70. public class PollSpout extends BaseRichSpout {
  71.     private SpoutOutputCollector output;
  72.     private boolean reading_currently;
  73.     private File dir, current_file;
  74.     private BufferedReader reader;
  75.     private int ack_num, emit_num;
  76.  
  77.     public PollSpout (){
  78.         emit_num = 0;
  79.         ack_num = 0;
  80.         reading_currently = false;
  81.         dir = new File("/home/anthony/hadlab7/in_files");
  82.  
  83.         //should exist 2 directories:
  84.             //in_files with input file in it
  85.             //out_files will contain input file after its processed
  86.  
  87.     }
  88.  
  89.     public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
  90.         output = spoutOutputCollector;
  91.     }
  92.  
  93.     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
  94.         outputFieldsDeclarer.declareStream("words", new Fields("words"));
  95.         outputFieldsDeclarer.declareStream("sync", new Fields());
  96.     }
  97.  
  98.     public void nextTuple() {
  99.  
  100.         if (reading_currently) {
  101.             try {
  102.  
  103.                 String line = reader.readLine();
  104.                 if (null != line) {
  105.  
  106.                     output.emit("words", new Values(line), emit_num);
  107.                     emit_num++;
  108.  
  109.                 } else {
  110.                     if(ack_num == emit_num) {
  111.  
  112.                         File dest_file = new File("/home/anthony/hadlab7/out_files/" + current_file.getName());
  113.                         Files.move(current_file, dest_file);
  114.  
  115.                         output.emit("sync", Lists.newArrayList());
  116.                         ack_num = 0;
  117.                         emit_num = 0;
  118.                         reading_currently = false;
  119.  
  120.                     }
  121.                 }
  122.             } catch (IOException e) { e.printStackTrace(); }
  123.         } else {
  124.  
  125.             File files_list[] = dir.listFiles();
  126.             if (null == files_list || 0 == files_list.length) {
  127.  
  128.                 backtype.storm.utils.Utils.sleep(100);
  129.  
  130.             } else {
  131.                 try {
  132.  
  133.                     current_file = files_list[0];
  134.                     reader = new BufferedReader(new InputStreamReader(
  135.                             new FileInputStream(current_file), Charsets.UTF_8));
  136.  
  137.                 } catch (FileNotFoundException e) { e.printStackTrace(); }
  138.                 reading_currently = true;
  139.             }
  140.         }
  141.     }
  142.  
  143.     public void ack(Object msgId) {
  144.         ack_num++;
  145.     }
  146. }
  147.  
  148.  
  149.  
  150.  
  151.  
  152. //BoltSplitter.java
  153. import backtype.storm.task.OutputCollector;
  154. import backtype.storm.task.TopologyContext;
  155. import backtype.storm.topology.OutputFieldsDeclarer;
  156. import backtype.storm.topology.base.BaseRichBolt;
  157. import backtype.storm.tuple.Fields;
  158. import backtype.storm.tuple.Tuple;
  159. import com.google.common.collect.Lists;
  160.  
  161. import java.util.Map;
  162.  
  163. public class BoltSplitter extends BaseRichBolt {
  164.     private OutputCollector collector;
  165.  
  166.     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
  167.         this.collector = collector;
  168.     }
  169.  
  170.     public void declareOutputFields(OutputFieldsDeclarer declarer) {
  171.         declarer.declare(new Fields("word"));
  172.     }
  173.  
  174.     public void execute(Tuple input) {
  175.         String words[] = input.getStringByField("words").split("[^a-zA-Zа-яА-Я]+");
  176.         for (String w : words){
  177.             collector.emit(input, Lists.newArrayList((Object) w));
  178.         }
  179.         collector.ack(input);
  180.     }
  181. }
  182.  
  183.  
  184.  
  185.  
  186.  
  187. //BoltCounter.java
  188. import backtype.storm.task.OutputCollector;
  189. import backtype.storm.task.TopologyContext;
  190. import backtype.storm.topology.OutputFieldsDeclarer;
  191. import backtype.storm.topology.base.BaseRichBolt;
  192. import backtype.storm.tuple.Tuple;
  193. import java.util.HashMap;
  194. import java.util.Map;
  195.  
  196. public class BoltCounter extends BaseRichBolt {
  197.     private OutputCollector output;
  198.     private Map<String, Integer> table;
  199.  
  200.     public BoltCounter(){}
  201.  
  202.     public void declareOutputFields(OutputFieldsDeclarer declarer) {}
  203.  
  204.     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
  205.         output = collector;
  206.         table = new HashMap<String, Integer>();
  207.     }
  208.  
  209.     public void execute(Tuple tuple) {
  210.         try {
  211.             if (tuple.getSourceStreamId().equals("sync")) {
  212.  
  213.                 for (Map.Entry<String, Integer> s : table.entrySet()) {
  214.                     System.out.println(s.getKey() + " : " + s.getValue());
  215.                 }
  216.                 table = new HashMap<String, Integer>();
  217.  
  218.             } else {
  219.  
  220.                 String word = tuple.getStringByField("word");
  221.                 Integer counts = table.get(word);
  222.  
  223.                 if (null == counts)
  224.                     counts = 0;
  225.  
  226.                 counts++;
  227.                 table.put(word, counts);
  228.                 output.ack(tuple);
  229.  
  230.             }
  231.         } catch (Exception e) { e.printStackTrace(); }
  232.     }
  233. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement