Advertisement
Ladies_Man

#HADOOP Lab8 (Trident) COMPLETE

Dec 26th, 2015
115
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 7.28 KB | None | 0 0
  1. //hadlab8: Trident Storm
  2. Лабораторная работа 8. Разработка простой топологии trident.
  3.  
  4. Задача:
  5. -Требуется разработать топологию для анализа данных перелетов — получить
  6.     распределение количества опоздавших или отмененных рейсов по дням недели.
  7. -а.Требуется разработать Spout который будет опрашивать директорию и в
  8.     случае обнаружения в ней новых файлов читать ее построчно и генерировать
  9.     batch, cостоящий из всех строк файла. После окончания файла требуется
  10.     перенести файл в папку для обработанных файлов
  11. -б. Разрабатываем Function которая будет принимать строку и разбивать ее на
  12.     tuple состоящий из следующих данных 664600583_T_ONTIME_sample.csv :
  13.     день недели, время опоздания, признак совершенного рейса
  14. -в. Разрабатываем фильтр отбрасывающий записи рейсов которые прилетели
  15.     вовремя
  16. -г. Разрабатываем CombinerAggregator который принимает tuple с данными
  17.     совершенного рейса и подсчитывает аггрегированные данные <день
  18.     недели>:<количество рейсов>
  19. -д. Разрабатываем фильтр который будет принимать результат с данными
  20.     CombinerAggregator и печатать их в консоль
  21.  
  22.  
  23.  
  24. //запуск:
  25. //mnv package
  26. //mvn exec:java -Dexec.mainClass="TridentLab"
  27.  
  28.  
  29.  
  30. //TridentLab.java
  31. import backtype.storm.Config;
  32. import backtype.storm.LocalCluster;
  33. import backtype.storm.topology.TopologyBuilder;
  34. import backtype.storm.tuple.Fields;
  35. import storm.trident.TridentTopology;
  36.  
  37. public class TridentLab {
  38.  
  39.     public static void main(String[] args) {
  40.  
  41.         TridentTopology topology = new TridentTopology();
  42.  
  43.         topology.newStream("generator", new BatchSpout())
  44.                 .parallelismHint(1)
  45.                 .shuffle()
  46.                 .each(new Fields("sentence"), new SplitFunction(), new Fields("day", "delay", "cancellation"))
  47.                 .each(new Fields("delay", "cancellation"), new DelayFilter())
  48.                 .groupBy(new Fields("day"))
  49.                 .aggregate(new Fields("day"), new DayAggregator(), new Fields("count"))
  50.                 .parallelismHint(7)
  51.                 .each(new Fields("day", "count"), new PrinterFunction(), new Fields());
  52.  
  53.         Config config = new Config();
  54.  
  55.         LocalCluster cluster = new LocalCluster();
  56.         cluster.submitTopology("poll", config, topology.build());
  57.     }
  58.  
  59. }
  60.  
  61.  
  62.  
  63.  
  64. //BatchSpout.java
  65. import backtype.storm.task.TopologyContext;
  66. import backtype.storm.tuple.Fields;
  67. import backtype.storm.tuple.Values;
  68. import com.google.common.base.Charsets;
  69. import com.google.common.io.Files;
  70. import storm.trident.operation.TridentCollector;
  71. import storm.trident.spout.IBatchSpout;
  72.  
  73. import java.io.*;
  74. import java.lang.Override;
  75. import java.util.Map;
  76.  
  77. public class BatchSpout implements IBatchSpout {
  78.     private TopologyContext context;
  79.     private File dir;
  80.  
  81.     public  BatchSpout() {
  82.  
  83. //2 directories should exist: in_files, out_files
  84. //in_files must contain 664600583_T_ONTIME_sample.csv
  85.  
  86.         dir = new File("/home/anthony/hadlab8/in_files");
  87.     }
  88.  
  89.     public Fields getOutputFields() {
  90.         return new Fields("sentence");
  91.     }
  92.  
  93.     public void open(Map map, TopologyContext topologyContext) {
  94.         context = topologyContext;
  95.     }
  96.  
  97.     public void emitBatch(long l, TridentCollector tridentCollector) {
  98.  
  99.         File files[] = dir.listFiles();
  100.  
  101.         if (null != files && 0 != files.length) {
  102.  
  103.             File curr_file = files[0];
  104.             try {
  105.                 BufferedReader reader = new BufferedReader(new InputStreamReader(
  106.                         new FileInputStream(curr_file), Charsets.UTF_8));
  107.  
  108.                 while (true) {
  109.  
  110.                     String line = reader.readLine();
  111.                     if (null == line) {
  112.                         break;
  113.                     }
  114.  
  115.                     tridentCollector.emit(new Values(line));
  116.                 }
  117.  
  118.                 reader.close();
  119.                 Files.move(curr_file, new File("/home/anthony/hadlab8/out_files/" + curr_file.getName()));
  120.  
  121.             } catch (FileNotFoundException e1) { e1.printStackTrace(); }
  122.               catch (IOException e2) { e2.printStackTrace(); }
  123.  
  124.         }
  125.     }
  126.  
  127.     public void ack(long l) {}
  128.  
  129.     public void close() {}
  130.  
  131.     public Map getComponentConfiguration() { return null; }
  132.  
  133. }
  134.  
  135.  
  136.  
  137.  
  138. //SplitFunction.java
  139. import backtype.storm.tuple.Values;
  140. import storm.trident.operation.BaseFunction;
  141. import storm.trident.operation.TridentCollector;
  142. import storm.trident.tuple.TridentTuple;
  143.  
  144. public class SplitFunction extends BaseFunction {
  145.     private static final int DAY_OF_WEEK = 4;
  146.     private static final int ARR_DELAY_NEW = 18;
  147.     private static final int CANCELLED = 19;
  148.  
  149.     public void execute(TridentTuple tuple, TridentCollector collector) {
  150.         String[] columns = tuple.getString(0).split(",");
  151.         if (!"\"YEAR\"".equals(columns[0])) {
  152.  
  153.             collector.emit(new Values(
  154.                     columns[DAY_OF_WEEK],
  155.                     columns[ARR_DELAY_NEW],
  156.                     columns[CANCELLED]));
  157.         }
  158.     }
  159.  
  160. }
  161.  
  162.  
  163.  
  164.  
  165. //DelayFilter.java
  166. import storm.trident.operation.BaseFilter;
  167. import storm.trident.tuple.TridentTuple;
  168.  
  169. public class DelayFilter extends BaseFilter {
  170.     private static final float ON_TIME = (float)0;
  171.  
  172.     public boolean isKeep(TridentTuple tuple) {
  173.         String delay = tuple.getString(0);
  174.         String cancellation = tuple.getString(1);
  175.  
  176.         if (null != cancellation && !cancellation.equals("") && cancellation.equals("1.00")) {
  177.             return true;
  178.         } else {
  179.             if (null != delay && !delay.equals("") && ON_TIME <= Float.parseFloat(delay)) {
  180.                 return true;
  181.             }
  182.         }
  183.         return false;
  184.     }
  185. }
  186.  
  187.  
  188.  
  189.  
  190. //DayAggregator.java
  191. import storm.trident.operation.CombinerAggregator;
  192. import storm.trident.tuple.TridentTuple;
  193.  
  194. public class DayAggregator implements CombinerAggregator<Long> {
  195.  
  196.     public Long init(TridentTuple tridentTuple) {
  197.         return 1L;
  198.     }
  199.  
  200.     public Long combine(Long a, Long b) {
  201.         return a + b;
  202.     }
  203.  
  204.     public Long zero() {
  205.         return 0L;
  206.     }
  207. }
  208.  
  209.  
  210.  
  211.  
  212. //PrinterFunction.java
  213. import storm.trident.operation.BaseFunction;
  214. import storm.trident.operation.TridentCollector;
  215. import storm.trident.tuple.TridentTuple;
  216.  
  217. public class PrinterFunction extends BaseFunction {
  218.     public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
  219.         if (!tridentTuple.toString().equals("")) {
  220.             System.out.println(tridentTuple);
  221.         }
  222.     }
  223. }
  224.  
  225.  
  226.  
  227.  
  228. //output:
  229. [5, 8187]
  230. [1, 6381]
  231. [2, 6090]
  232. [4, 7731]
  233. [3, 6131]
  234. [7, 5988]
  235. [6, 6400]
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement