Advertisement
Ladies_Man

#HADOOP Lab2 (Flight Sort App) COMPLETE

Oct 31st, 2015
146
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 11.89 KB | None | 0 0
  1. //hadoop lab2
  2. //задание:
  3. //а. Требуется отфильтровать рейсы, которые были отменены или прошли с опозданием
  4. //б. Данные, прошедшие фильтр, требуется отсортировать по времени опоздания, аэропорту назначения и времени перелета
  5.  
  6.  
  7. //запуск:
  8. //hadoop fs -rm -r output
  9. //mvn package
  10. //export HADOOP_CLASSPATH=target/hadoop-examples-1.0-SNAPSHOT.jar
  11. //hadoop FlightSort 664600583_T_ONTIME_sample.csv output
  12. //hadoop fs -copyToLocal output
  13.  
  14.  
  15.  
  16. //FlightSort.java
  17. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  18. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  19. import org.apache.hadoop.mapreduce.Job;
  20. import org.apache.hadoop.fs.Path;
  21. import org.apache.hadoop.io.Text;
  22. import org.apache.hadoop.io.IntWritable;
  23.  
  24. public class FlightSort {
  25.     public static void main(String[] args) throws Exception {
  26.         if (args.length != 2) {
  27.             System.err.println("Usage: FlightSortApp <input path> <output path>");
  28.             System.exit(-1);
  29.         }
  30.  
  31.         Job job = Job.getInstance();
  32.  
  33.         job.setJarByClass(FlightSort.class);
  34.         job.setJobName("Flight Sort Job (hadlab2)");
  35.  
  36.         FileInputFormat.addInputPath(job, new Path(args[0]));
  37.         FileOutputFormat.setOutputPath(job, new Path(args[1]));
  38.  
  39.         job.setMapperClass(FlightMapper.class);
  40.         job.setPartitionerClass(FlightPartitioner.class);
  41.         job.setReducerClass(FlightReducer.class);
  42.         job.setGroupingComparatorClass(FlightComparator.class);
  43.  
  44.         //Mapper<.., .., KEYOUT, VALUEOUT>
  45.         job.setOutputKeyClass(FlightWritableComparable.class);
  46.         job.setOutputValueClass(Text.class);
  47.  
  48.         job.setNumReduceTasks(2);
  49.         System.exit(job.waitForCompletion(true) ? 0 : 1);
  50.     }
  51. }
  52.  
  53.  
  54.  
  55. //FlightMapper.java
  56. import org.apache.hadoop.io.IntWritable;
  57. import org.apache.hadoop.io.LongWritable;
  58. import org.apache.hadoop.io.Text;
  59. import org.apache.hadoop.mapreduce.Mapper;
  60.  
  61. import java.io.IOException;
  62.  
  63. //Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
  64. public class FlightMapper extends Mapper<LongWritable, Text, FlightWritableComparable, Text> {
  65.  
  66.     @Override
  67.     protected void map(LongWritable key, Text value, Context context)
  68.             throws IOException, InterruptedException {
  69.  
  70.         String[] columns = value.toString().split(",");
  71.  
  72.         //flight (not metadata)
  73.         if (!columns[0].equals("\"YEAR\"") && !columns[19].equals("")) {
  74.  
  75.             //not cancelled
  76.             if ((float)0 == Float.parseFloat(columns[19])) {
  77.  
  78.                 //delayed
  79.                 if (!columns[18].equals("") && ((float)0 != Float.parseFloat(columns[18])) &&
  80.                         !columns[14].equals("") &&
  81.                         !columns[21].equals("")) {
  82.  
  83.                     FlightWritableComparable entry = new FlightWritableComparable();
  84.  
  85.                     entry.setArr_delay(Float.parseFloat(columns[18]));
  86.                     entry.setDest_airport_id(Integer.parseInt(columns[14]));
  87.                     entry.setAir_time(Float.parseFloat(columns[21]));
  88.                     entry.setCancelled((float)0);
  89.  
  90.                     context.write(entry, value);
  91.  
  92.                 }
  93.  
  94.             //cancelled
  95.             } else {
  96.  
  97.                 //so it has no ArrDelay, no AirTime, no DestAirport, only OriginAriport
  98.                 if (!columns[11].equals("")) {
  99.  
  100.                     FlightWritableComparable entry = new FlightWritableComparable();
  101.  
  102.                     entry.setArr_delay((float)0);
  103.                     entry.setDest_airport_id(Integer.parseInt(columns[11]));
  104.                     entry.setAir_time((float)0);
  105.                     entry.setCancelled((float)1);
  106.  
  107.                     context.write(entry, value);
  108.  
  109.                 }
  110.  
  111.             }
  112.  
  113.         }
  114.  
  115.     }
  116. }
  117.  
  118.  
  119.  
  120.  
  121. //FlightReducer.java
  122. import org.apache.hadoop.io.IntWritable;
  123. import org.apache.hadoop.io.LongWritable;
  124. import org.apache.hadoop.io.Text;
  125. import org.apache.hadoop.mapreduce.Reducer;
  126.  
  127. import java.io.IOException;
  128. import java.util.Iterator;
  129.  
  130. //Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
  131. public class FlightReducer extends Reducer<FlightWritableComparable, Text, String, Text> {
  132.     @Override
  133.     protected void reduce(FlightWritableComparable key, Iterable<Text> values, Context context)
  134.             throws IOException, InterruptedException {
  135.  
  136.         for (Text t : values){
  137.  
  138.             String brief_info = "Flight: Brief: [" + key.getBriefInfo() + "]";
  139.  
  140.             String tmp = t.toString();
  141.             Text full_info = new Text("Full: [" + tmp + "]");
  142.  
  143.             context.write(brief_info, full_info);
  144.         }
  145.  
  146.     }
  147. }
  148.  
  149.  
  150.  
  151.  
  152. //FlightWritableComparable.java
  153. import org.apache.hadoop.io.WritableComparable;
  154.  
  155. import java.io.DataInput;
  156. import java.io.DataOutput;
  157. import java.io.IOException;
  158.  
  159. public class FlightWritableComparable implements WritableComparable {
  160.  
  161.     private float arr_delay;
  162.     private int dest_airport_id;
  163.     private float air_time;
  164.     private float cancelled;
  165.  
  166.     public void setArr_delay(float arr_delay) {
  167.         this.arr_delay = arr_delay;
  168.     }
  169.  
  170.     public void setDest_airport_id(int dest_airport_id) {
  171.         this.dest_airport_id = dest_airport_id;
  172.     }
  173.  
  174.     public void setAir_time(float air_time) {
  175.         this.air_time = air_time;
  176.     }
  177.  
  178.     public void setCancelled(float cancelled) {
  179.         this.cancelled = cancelled;
  180.     }
  181.  
  182.     //for comparator's compare
  183.     public int compareDelayTime(Object o) {
  184.         FlightWritableComparable that = (FlightWritableComparable) o;
  185.  
  186.         float that_arr_delay = that.arr_delay;
  187.  
  188.         if (this.arr_delay > that_arr_delay) {
  189.             return 1;
  190.         }
  191.         if (this.arr_delay < that_arr_delay) {
  192.             return -1;
  193.         }
  194.  
  195.         return 0;
  196.     }
  197.  
  198.     @Override
  199.     public int compareTo(Object o) {
  200.  
  201.         FlightWritableComparable that = (FlightWritableComparable) o;
  202.  
  203.         float that_arr_delay = that.arr_delay;
  204.         int that_dest_airport_id = that.dest_airport_id;
  205.         float that_air_time = that.air_time;
  206.         float that_cancelled = that.cancelled;
  207.  
  208.  
  209.         if (this.cancelled > that_cancelled) {
  210.             return 1;
  211.         }
  212.         if (this.cancelled < that_cancelled) {
  213.             return -1;
  214.         }
  215.  
  216.         if (this.arr_delay > that_arr_delay) {
  217.             return 1;
  218.         }
  219.         if (this.arr_delay < that_arr_delay) {
  220.             return -1;
  221.         }
  222.  
  223.         if (this.dest_airport_id > that_dest_airport_id) {
  224.             return 1;
  225.         }
  226.         if (this.dest_airport_id < that_dest_airport_id) {
  227.             return -1;
  228.         }
  229.  
  230.         if (this.air_time > that_air_time) {
  231.             return 1;
  232.         }
  233.         if (this.air_time < that_air_time) {
  234.             return -1;
  235.         }
  236.  
  237.         return 0;
  238.  
  239.     }
  240.  
  241.     @Override
  242.     public void write(DataOutput dataOutput) throws IOException {
  243.         dataOutput.writeFloat(arr_delay);
  244.         dataOutput.writeInt(dest_airport_id);
  245.         dataOutput.writeFloat(air_time);
  246.         dataOutput.writeFloat(cancelled);
  247.     }
  248.  
  249.     @Override
  250.     public void readFields(DataInput dataInput) throws IOException {
  251.         arr_delay = dataInput.readFloat();
  252.         dest_airport_id = dataInput.readInt();
  253.         air_time = dataInput.readFloat();
  254.         cancelled = dataInput.readFloat();
  255.     }
  256.  
  257.     @Override
  258.     public int hashCode() {
  259.  
  260.         int hash = this.toString().hashCode();
  261.  
  262.         if (hash < 0) {
  263.             return -hash;
  264.         }
  265.  
  266.         return  hash;
  267.     }
  268.  
  269.     @Override
  270.     public boolean equals(Object o) {
  271.         if (this == o) return true;
  272.         if (o == null || getClass() != o.getClass()) return false;
  273.  
  274.         FlightWritableComparable that = (FlightWritableComparable) o;
  275.  
  276.         if (Float.compare(that.arr_delay, arr_delay) != 0) return false;
  277.         if (dest_airport_id != that.dest_airport_id) return false;
  278.         if (Float.compare(that.air_time, air_time) != 0) return false;
  279.         return Float.compare(that.cancelled, cancelled) == 0;
  280.  
  281.     }
  282.  
  283.     //just in case
  284.     public String getBriefInfo() {
  285.         return "Delay:" + arr_delay +
  286.                 ", AirportID:" + dest_airport_id +
  287.                 ", AirTime:" + air_time +
  288.                 ", Cancelled:" + cancelled;
  289.     }
  290.  
  291.     @Override
  292.     public String toString() {
  293.         return "FlightWritableComparable{" +
  294.                 "arr_delay=" + arr_delay +
  295.                 ", dest_airport_id=" + dest_airport_id +
  296.                 ", air_time=" + air_time +
  297.                 ", cancelled=" + cancelled +
  298.                 '}';
  299.     }
  300. }
  301.  
  302.  
  303.  
  304.  
  305.  
  306. //FlightComparator.java
  307. import org.apache.hadoop.io.WritableComparable;
  308. import org.apache.hadoop.io.WritableComparator;
  309.  
  310. public class FlightComparator extends WritableComparator {
  311.  
  312.     public FlightComparator() {
  313.         super(FlightWritableComparable.class, true);
  314.     }
  315.  
  316.     @Override
  317.     public int compare(WritableComparable a1, WritableComparable b1) {
  318.         FlightWritableComparable a = (FlightWritableComparable) a1;
  319.         FlightWritableComparable b = (FlightWritableComparable) b1;
  320.  
  321.         return a.compareDelayTime(b);
  322.     }
  323. }
  324.  
  325.  
  326.  
  327.  
  328. //FlightPartitioner.java
  329. import org.apache.hadoop.io.Text;
  330. import org.apache.hadoop.mapreduce.Partitioner;
  331.  
  332. public class FlightPartitioner extends Partitioner<FlightWritableComparable, Text> {
  333.  
  334.     @Override
  335.     public int getPartition(FlightWritableComparable key, Text value, int numPartitions) {
  336.         return key.hashCode() % numPartitions;
  337.     }
  338. }
  339.  
  340.  
  341.  
  342.  
  343.  
  344. //output example:
  345. Flight: Brief: [Delay:1.0, AirportID:10140, AirTime:78.0, Cancelled:0.0]    Full: [2015,1,1,10,6,2015-01-10,"OO",20304,"OO","N170PQ","4499",14869,1486903,34614,10140,"1844","1853",1.00,1.00,0.00,"",78.00,493.00,]
  346. Flight: Brief: [Delay:1.0, AirportID:10140, AirTime:85.0, Cancelled:0.0]    Full: [2015,1,1,25,7,2015-01-25,"WN",19393,"WN","N7739A","4285",14679,1467903,33570,10140,"2317","2321",1.00,1.00,0.00,"",85.00,628.00,]
  347. Flight: Brief: [Delay:1.0, AirportID:10140, AirTime:153.0, Cancelled:0.0]   Full: [2015,1,1,10,6,2015-01-10,"WN",19393,"WN","N224WN","2739",14747,1474703,30559,10140,"1530","1541",1.00,1.00,0.00,"",153.00,1180.00,]
  348. Flight: Brief: [Delay:1.0, AirportID:10279, AirTime:80.0, Cancelled:0.0]    Full: [2015,1,1,5,1,2015-01-05,"EV",20366,"EV","N14514","5792",12266,1226603,31453,10279,"1607","1613",1.00,1.00,0.00,"",80.00,517.00,]
  349. Flight: Brief: [Delay:1.0, AirportID:10299, AirTime:198.0, Cancelled:0.0]   Full: [2015,1,1,13,2,2015-01-13,"AS",19930,"AS","N303AS","85",14747,1474703,30559,10299,"1307","1312",1.00,1.00,0.00,"",198.00,1448.00,]
  350. Flight: Brief: [Delay:1.0, AirportID:10397, AirTime:45.0, Cancelled:0.0]    Full: [2015,1,1,23,5,2015-01-23,"DL",19790,"DL","N978DL","1956",11481,1148102,31481,10397,"1915","1927",1.00,1.00,0.00,"",45.00,240.00,]
  351. Flight: Brief: [Delay:1.0, AirportID:10397, AirTime:47.0, Cancelled:0.0]    Full: [2015,1,1,12,1,2015-01-12,"DL",19790,"DL","N986DL","2422",11057,1105703,31057,10397,"1044","1054",1.00,1.00,0.00,"",47.00,226.00,]
  352. Flight: Brief: [Delay:1.0, AirportID:10397, AirTime:56.0, Cancelled:0.0]    Full: [2015,1,1,30,5,2015-01-30,"EV",20366,"EV","N861AS","4962",14574,1457402,34574,10397,"1524","1530",1.00,1.00,0.00,"",56.00,357.00,]
  353. Flight: Brief: [Delay:1.0, AirportID:10397, AirTime:63.0, Cancelled:0.0]    Full: [2015,1,1,4,7,2015-01-04,"WN",19393,"WN","N947WN","888",13495,1349503,33495,10397,"0853","0901",1.00,1.00,0.00,"",63.00,425.00,]
  354. Flight: Brief: [Delay:1.0, AirportID:10397, AirTime:69.0, Cancelled:0.0]    Full: [2015,1,1,9,5,2015-01-09,"DL",19790,"DL","N357NW","755",11066,1106603,31066,10397,"1636","1641",1.00,1.00,0.00,"",69.00,447.00,]
  355. Flight: Brief: [Delay:1.0, AirportID:10397, AirTime:70.0, Cancelled:0.0]    Full: [2015,1,1,16,5,2015-01-16,"DL",19790,"DL","N907DE","2313",14122,1412202,30198,10397,"2116","2122",1.00,1.00,0.00,"",70.00,526.00,]
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement