Advertisement
Ladies_Man

#HADOOP Lab3 (Sort + Join) COMPLETE

Nov 14th, 2015
240
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 11.28 KB | None | 0 0
  1. //hadoop lab3 (sort + join)
  2. //задание:
  3. //Требуется связать наборы данных по коду аэропорта прибытия: DEST_AIRPORT_ID
  4. //Для каждого аэропорта требуется определить среднее, минимальное и максимальное время задержки для всех прибывающих рейсов.
  5.  
  6.  
  7.  
  8.  
  9. //Запуск:
  10. //hadoop fs -rmr output
  11. //mvn package
  12. //export HADOOP_CLASSPATH=target/hadoop-examples-1.0-SNAPSHOT.jar
  13. //hadoop FlightSort 664600583_T_ONTIME_sample.csv L_AIRPORT_ID.csv output
  14. //hadoop fs -copyToLocal output
  15.  
  16.  
  17.  
  18.  
  19.  
  20. //FlightSort.java
  21. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  22. import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
  23. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  24. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  25. import org.apache.hadoop.mapreduce.Job;
  26. import org.apache.hadoop.fs.Path;
  27. import org.apache.hadoop.io.Text;
  28. import org.apache.hadoop.io.IntWritable;
  29.  
  30. public class FlightSort {
  31.     public static void main(String[] args) throws Exception {
  32.         if (args.length != 3) {
  33.             System.err.println("Usage: FlightSort-and-JoinApp <input path flight> <input path airport> <output path>");
  34.             System.exit(-1);
  35.         }
  36.  
  37.         Job job = Job.getInstance();
  38.  
  39.         job.setJarByClass(FlightSort.class);
  40.         job.setJobName("Flight Sort-and-Join Job (hadlab3)");
  41.  
  42.         MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, FlightMapper.class);
  43.         MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, AirportMapper.class);
  44.  
  45.         FileOutputFormat.setOutputPath(job, new Path(args[2]));
  46.  
  47.         job.setPartitionerClass(FlightPartitioner.class);
  48.         job.setGroupingComparatorClass(FlightComparator.class);
  49.         job.setReducerClass(FlightJoinReducer.class);
  50.         job.setMapOutputKeyClass(FlightWritableComparable.class);
  51.         job.setMapOutputValueClass(Text.class);
  52.  
  53.         //Mapper<.., .., KEYOUT, VALUEOUT>
  54.         job.setOutputKeyClass(FlightWritableComparable.class);
  55.         job.setOutputValueClass(Text.class);
  56.  
  57.         job.setNumReduceTasks(2);
  58.         System.exit(job.waitForCompletion(true) ? 0 : 1);
  59.     }
  60. }
  61.  
  62.  
  63.  
  64.  
  65.  
  66. //AirportMapper.java
  67. import org.apache.hadoop.io.LongWritable;
  68. import org.apache.hadoop.io.Text;
  69. import org.apache.hadoop.mapreduce.Mapper;
  70.  
  71. import java.io.IOException;
  72.  
  73. public class AirportMapper extends Mapper<LongWritable, Text, FlightWritableComparable, Text> {
  74.  
  75.     @Override
  76.     protected void map(LongWritable key, Text value, Context context)
  77.             throws IOException, InterruptedException {
  78.  
  79.         String[] columns = value.toString().replace("\"", "").split(",(?! )");
  80.  
  81.         if (!columns[0].equals("") &&
  82.                 !columns[0].equals("Code") &&
  83.                 !columns[1].equals("")) {
  84.  
  85.  
  86.  
  87.             FlightWritableComparable entry_writcom = new FlightWritableComparable();
  88.             entry_writcom.setFlag(0);
  89.             entry_writcom.setDest_airport_id(Integer.parseInt(columns[0]));
  90.  
  91.             Text airportname_text = new Text(columns[1]);
  92.  
  93.             context.write(entry_writcom, airportname_text);
  94.  
  95.         }
  96.  
  97.     }
  98. }
  99.  
  100.  
  101.  
  102.  
  103.  
  104. //FlightMapper.java
  105. import org.apache.hadoop.io.IntWritable;
  106. import org.apache.hadoop.io.LongWritable;
  107. import org.apache.hadoop.io.Text;
  108. import org.apache.hadoop.mapreduce.Mapper;
  109.  
  110. import java.io.IOException;
  111.  
  112. //Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
  113. public class FlightMapper extends Mapper<LongWritable, Text, FlightWritableComparable, Text> {
  114.  
  115.     @Override
  116.     protected void map(LongWritable key, Text value, Context context)
  117.             throws IOException, InterruptedException {
  118.  
  119.         String[] columns = value.toString().split(",");
  120.  
  121.         //flight
  122.         if (!columns[0].equals("\"YEAR\"") && !columns[19].equals("")) {
  123.  
  124.             //not cancelled
  125.             if ((float) 0 == Float.parseFloat(columns[19])) {
  126.  
  127.                 //delayed
  128.                 if (!columns[18].equals("") &&
  129.                         ((float)0 < Float.parseFloat(columns[18])) &&
  130.                         !columns[14].equals("")) {
  131.  
  132.  
  133.  
  134.                     FlightWritableComparable entry_writcom = new FlightWritableComparable();
  135.                     entry_writcom.setFlag(1);
  136.                     entry_writcom.setDest_airport_id(Integer.parseInt(columns[14]));
  137.  
  138.                     Text delay_text = new Text(columns[18]);
  139.  
  140.                     context.write(entry_writcom, delay_text);
  141.  
  142.                 }
  143.             }
  144.         }
  145.  
  146.     }
  147. }
  148.  
  149.  
  150.  
  151.  
  152.  
  153.  
  154. //FlightWritableComparable.java
  155. import org.apache.hadoop.io.WritableComparable;
  156.  
  157. import java.io.DataInput;
  158. import java.io.DataOutput;
  159. import java.io.IOException;
  160.  
  161. public class FlightWritableComparable implements WritableComparable {
  162.     private int flag;
  163.     private int dest_airport_id;
  164.  
  165.     public void setFlag(int flag) {
  166.         this.flag = flag;
  167.     }
  168.  
  169.     public void setDest_airport_id(int dest_airport_id) {
  170.         this.dest_airport_id = dest_airport_id;
  171.     }
  172.  
  173.     public int getDest_airport_id() {
  174.         return dest_airport_id;
  175.     }
  176.  
  177.     @Override
  178.     public int compareTo(Object o) {
  179.  
  180.         FlightWritableComparable that = (FlightWritableComparable) o;
  181.  
  182.         int that_flag = that.flag;
  183.         int that_dest_airport_id = that.dest_airport_id;
  184.  
  185.         if (this.dest_airport_id > that_dest_airport_id) {
  186.             return 1;
  187.         }
  188.         if (this.dest_airport_id < that_dest_airport_id) {
  189.             return -1;
  190.         }
  191.  
  192.         if (this.flag > that_flag) {
  193.             return 1;
  194.         }
  195.         if (this.flag < that_flag) {
  196.             return -1;
  197.         }
  198.  
  199.         return 0;
  200.  
  201.     }
  202.  
  203.     @Override
  204.     public void write(DataOutput dataOutput) throws IOException {
  205.         dataOutput.writeInt(flag);
  206.         dataOutput.writeInt(dest_airport_id);
  207.     }
  208.  
  209.     @Override
  210.     public void readFields(DataInput dataInput) throws IOException {
  211.         flag = dataInput.readInt();
  212.         dest_airport_id = dataInput.readInt();
  213.     }
  214.  
  215.     @Override
  216.     public int hashCode() {
  217.  
  218.         int hash = this.toString().hashCode();
  219.  
  220.         if (hash < 0) {
  221.             return -hash;
  222.         }
  223.  
  224.         return  hash;
  225.     }
  226.  
  227.     @Override
  228.     public String toString() {
  229.         return "WritComp{" +
  230.                 "flag=" + flag +
  231.                 ", dest_airport_id=" + dest_airport_id +
  232.                 '}';
  233.     }
  234.  
  235.     public boolean equals(Object o) {
  236.         if (this == o) return true;
  237.         if (o == null || getClass() != o.getClass()) return false;
  238.  
  239.         FlightWritableComparable that = (FlightWritableComparable) o;
  240.  
  241.         return flag == that.flag && dest_airport_id == that.dest_airport_id;
  242.  
  243.     }
  244. }
  245.  
  246.  
  247.  
  248.  
  249.  
  250.  
  251. //FlightPartitioner.java
  252. import org.apache.hadoop.io.Text;
  253. import org.apache.hadoop.mapreduce.Partitioner;
  254.  
  255. public class FlightPartitioner extends Partitioner<FlightWritableComparable, Text> {
  256.  
  257.     @Override
  258.     public int getPartition(FlightWritableComparable key, Text value, int numPartitions) {
  259.         return key.getDest_airport_id() % numPartitions;
  260.     }
  261. }
  262.  
  263.  
  264.  
  265.  
  266.  
  267. //FlightComparator.java
  268. import org.apache.hadoop.io.WritableComparable;
  269. import org.apache.hadoop.io.WritableComparator;
  270.  
  271. public class FlightComparator extends WritableComparator {
  272.  
  273.     public FlightComparator() {
  274.         super(FlightWritableComparable.class, true);
  275.     }
  276.  
  277.     @Override
  278.     public int compare(WritableComparable a1, WritableComparable b1) {
  279.  
  280.         FlightWritableComparable a = (FlightWritableComparable) a1;
  281.         FlightWritableComparable b = (FlightWritableComparable) b1;
  282.  
  283.         return Integer.compare(a.getDest_airport_id(), b.getDest_airport_id());
  284.     }
  285. }
  286.  
  287.  
  288.  
  289.  
  290.  
  291.  
  292. //FlightJoinReducer.java
  293. import org.apache.hadoop.io.IntWritable;
  294. import org.apache.hadoop.io.LongWritable;
  295. import org.apache.hadoop.io.Text;
  296. import org.apache.hadoop.mapreduce.Reducer;
  297.  
  298. import java.io.IOException;
  299. import java.util.Iterator;
  300. import java.util.regex.Matcher;
  301. import java.util.regex.Pattern;
  302.  
  303. //Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
  304. public class FlightJoinReducer extends Reducer<FlightWritableComparable, Text, String, String> {
  305.  
  306.     @Override
  307.     protected void reduce(FlightWritableComparable key, Iterable<Text> values, Context context)
  308.             throws IOException, InterruptedException {
  309.  
  310.         String airport_name = "AirPort:";
  311.         String airport_stats = "\tDelays:";
  312.         //String tmp = "";
  313.  
  314.         Iterator<Text> val_iterator = values.iterator();
  315.  
  316.         airport_name += val_iterator.next().toString();
  317.  
  318.         if (val_iterator.hasNext()) {
  319.  
  320.             int i = 0;
  321.             float tmp = Float.parseFloat(val_iterator.next().toString());
  322.             float sum = (float) 0, min = tmp, max = tmp;
  323.  
  324.             while (val_iterator.hasNext()) {
  325.  
  326.                 float curr_val = Float.parseFloat(val_iterator.next().toString());
  327.  
  328.                 if (curr_val > max) {
  329.                     max = curr_val;
  330.                 }
  331.                 if (curr_val < min) {
  332.                     min = curr_val;
  333.                 }
  334.                 sum += curr_val;
  335.                 i++;
  336.  
  337.             }
  338.  
  339.             airport_stats += "Min:" + min;
  340.             airport_stats += ", Max:" + max;
  341.             airport_stats += ", Avg:" + (sum / (float)i);
  342.  
  343.             context.write(airport_name, airport_stats);
  344.  
  345.         }
  346.     }
  347. }
  348.  
  349.  
  350.  
  351.  
  352.  
  353.  
  354. //Example:
  355. AirPort:Abilene, TX: Abilene Regional       Delays:Min:1.0, Max:76.0, Avg:19.666666
  356. AirPort:Albuquerque, NM: Albuquerque International Sunport      Delays:Min:1.0, Max:126.0, Avg:19.852942
  357. AirPort:Albany, GA: Southwest Georgia Regional      Delays:Min:12.0, Max:34.0, Avg:21.0
  358. AirPort:Atlantic City, NJ: Atlantic City International      Delays:Min:2.0, Max:147.0, Avg:58.153847
  359. AirPort:Kodiak, AK: Kodiak Airport      Delays:Min:54.0, Max:76.0, Avg:76.0
  360. AirPort:Augusta, GA: Augusta Regional at Bush Field     Delays:Min:1.0, Max:246.0, Avg:50.8
  361. AirPort:Waterloo, IA: Waterloo Regional     Delays:Min:2.0, Max:223.0, Avg:60.0
  362. AirPort:Aspen, CO: Aspen Pitkin County Sardy Field      Delays:Min:5.0, Max:136.0, Avg:36.264706
  363. AirPort:Appleton, WI: Outagamie County Regional     Delays:Min:3.0, Max:146.0, Avg:54.333332
  364. AirPort:Scranton/Wilkes-Barre, PA: Wilkes Barre Scranton International      Delays:Min:1.0, Max:20.0, Avg:10.666667
  365. AirPort:Billings, MT: Billings Logan International      Delays:Min:5.0, Max:58.0, Avg:31.142857
  366. AirPort:Bellingham, WA: Bellingham International        Delays:Min:3.0, Max:20.0, Avg:3.0
  367. AirPort:Beaumont/Port Arthur, TX: Jack Brooks Regional      Delays:Min:14.0, Max:36.0, Avg:21.0
  368. AirPort:Aguadilla, PR: Rafael Hernandez     Delays:Min:3.0, Max:109.0, Avg:7.0
  369. AirPort:Barrow, AK: Wiley Post/Will Rogers Memorial     Delays:Min:2.0, Max:306.0, Avg:67.333336
  370. AirPort:Buffalo, NY: Buffalo Niagara International      Delays:Min:1.0, Max:128.0, Avg:23.833334
  371. AirPort:Burbank, CA: Bob Hope       Delays:Min:1.0, Max:192.0, Avg:25.863636
  372. AirPort:Columbia, SC: Columbia Metropolitan     Delays:Min:1.0, Max:119.0, Avg:30.185184
  373. AirPort:Akron, OH: Akron-Canton Regional        Delays:Min:1.0, Max:83.0, Avg:20.6
  374. AirPort:Cedar City, UT: Cedar City Regional     Delays:Min:1.0, Max:15.0, Avg:1.0
  375. AirPort:Cordova, AK: Merle K Mudhole Smith      Delays:Min:1.0, Max:151.0, Avg:1.0
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement