Advertisement
Ladies_Man

#HADOOP Lab5 (Spark) COMPLETE

Nov 14th, 2015
183
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 13.10 KB | None | 0 0
  1. //hadlab5: Spark
  2. //задание:
  3. //Требуется определить для пары <аэропорт отлета, аэропорт прибытия> максимальное время опоздания, процент опоздавших+отмененных рейсов.
  4. //Также требуется связать полученную таблицу с названиями аэропортов.
  5.  
  6.  
  7.  
  8.  
  9. //запуск:
  10. //spark-submit --class SparkLab --master yarn-client --num-executors 3 target/spark-examples-1.0-SNAPSHOT.jar
  11.  
  12.  
  13.  
  14.  
  15. //SparkLab.java
  16. import org.apache.spark.SparkConf;
  17. import org.apache.spark.api.java.JavaPairRDD;
  18. import org.apache.spark.api.java.JavaRDD;
  19. import org.apache.spark.api.java.JavaSparkContext;
  20. import org.apache.spark.api.java.function.FlatMapFunction;
  21. import org.apache.spark.api.java.function.Function;
  22. import org.apache.spark.api.java.function.Function2;
  23. import org.apache.spark.api.java.function.PairFunction;
  24. import org.apache.spark.broadcast.Broadcast;
  25. import scala.Tuple2;
  26.  
  27. import java.io.Serializable;
  28. import java.lang.*;
  29. import java.lang.Float;
  30. import java.lang.Integer;
  31. import java.lang.Object;
  32. import java.util.Arrays;
  33. import java.lang.Boolean;
  34. import java.lang.Exception;
  35. import java.lang.Iterable;
  36. import java.lang.Override;
  37. import java.lang.String;
  38. import java.lang.System;
  39. import java.util.Iterator;
  40. import java.util.Map;
  41. import java.util.regex.Pattern;
  42.  
  43. public class SparkLab {
  44.  
  45.     public static class FlightSerializable implements Serializable {
  46.         //just in case
  47.         public int origin_airport_id;
  48.         public int dest_airport_id;
  49.         //real data
  50.         public float cancelled;
  51.         public float arr_delay_new;
  52.  
  53.         public FlightSerializable(int origin_airport_id, int dest_airport_id, float cancelled, float arr_delay_new) {
  54.             this.origin_airport_id = origin_airport_id;
  55.             this.dest_airport_id = dest_airport_id;
  56.             this.cancelled = cancelled;
  57.             this.arr_delay_new = arr_delay_new;
  58.         }
  59.  
  60.         public FlightSerializable(int origin_airport_id, int dest_airport_id, float cancelled) {
  61.             this.origin_airport_id = origin_airport_id;
  62.             this.dest_airport_id = dest_airport_id;
  63.             this.cancelled = cancelled;
  64.         }
  65.     }
  66.  
  67.     public static void main(String[] args) {
  68.  
  69.         SparkConf conf = new SparkConf().setAppName("sparklab5");
  70.         JavaSparkContext sc = new JavaSparkContext(conf);
  71.  
  72.         JavaRDD<String> flights = sc.textFile("664600583_T_ONTIME_sample.csv");
  73.         JavaRDD<String> airports = sc.textFile("L_AIRPORT_ID.csv");
  74.  
  75.  
  76.  
  77.     //load airport data from csv to rdd of strings
  78.         JavaRDD<String> a_no_meta = airports.map(new Function<String, String>() {
  79.             @Override
  80.             public String call(String line) throws Exception {
  81.                 return line.replace("\"", "");
  82.             }
  83.         }).filter(new Function<String, java.lang.Boolean>() {
  84.             @Override
  85.             public Boolean call(String line2) throws Exception {
  86.                 return !line2.contains("Code") && !line2.contains("Description");
  87.             }
  88.         });
  89.  
  90.  
  91.  
  92.     //generate pairs: <airport_code, description>
  93.         JavaPairRDD<Integer, String> a_paired = a_no_meta.mapToPair(new PairFunction<String, Integer, String>() {
  94.             @Override
  95.             public Tuple2<Integer, String> call(String line) throws Exception {
  96.  
  97.                 String[] columns = line.split(",(?! )");
  98.         //dummy data
  99.                 int airport_code = -1;
  100.                 String description = "dummy";
  101.  
  102.                 if (!columns[0].equals("") && !columns[1].equals("")) {
  103.                     airport_code = Integer.parseInt(columns[0]);
  104.                     description = columns[1];
  105.                 }
  106.                 return new Tuple2<Integer, String>(airport_code, description);
  107.             }
  108.         });
  109.  
  110.  
  111.  
  112.     //get same data as Map and broadcast
  113.         final Map<Integer, String> a_map = a_paired.collectAsMap();
  114.  
  115.         final Broadcast<Map<Integer, String>> airports_broadcasted = sc.broadcast(a_map);
  116.  
  117.  
  118.  
  119.  
  120.  
  121.  
  122.     //load flight data from csv to rdd of strings
  123.         JavaRDD<String> f_no_meta = flights.map(new Function<String, String>() {
  124.             @Override
  125.             public String call(String line) throws Exception {
  126.                 return line.replace("\"", "");
  127.             }
  128.         }).filter(new Function<String, Boolean>() {
  129.             @Override
  130.             public Boolean call(String line2) throws Exception {
  131.                 return !line2.contains("YEAR");
  132.             }
  133.         });
  134.  
  135.  
  136.     //generate corteges of
  137.     //      KEY: cortege of: <origin_airport_id, dest_airport_id>
  138.     //      VALUE: <FlightSerializable> container for data (airports, delay, cancellation_mark) as Serializable object
  139.         JavaPairRDD<Tuple2<Integer, Integer>, FlightSerializable> f_paired = f_no_meta.mapToPair(new PairFunction<String, Tuple2<Integer, Integer>, FlightSerializable>() {
  140.             @Override
  141.             public Tuple2<Tuple2<Integer, Integer>, FlightSerializable> call(String line) throws Exception {
  142.                 String[] columns = line.split(",");
  143.  
  144.                 //dummy data to return
  145.                 int origin_ap = -1;
  146.                 int dest_ap = -1;
  147.                 float cnc = (float)1;
  148.  
  149.                 FlightSerializable flight_data_storage =
  150.                         new FlightSerializable(origin_ap, dest_ap, cnc);
  151.  
  152.  
  153.                 //have origin_ap, dest_ap, cancellation_flag
  154.                 if (!columns[11].equals("") &&
  155.                         !columns[14].equals("") &&
  156.                         !columns[19].equals("")) {
  157.  
  158.                     origin_ap = Integer.parseInt(columns[11]);
  159.                     dest_ap = Integer.parseInt(columns[14]);
  160.  
  161.                     flight_data_storage.origin_airport_id = Integer.parseInt(columns[11]);
  162.                     flight_data_storage.dest_airport_id = Integer.parseInt(columns[14]);
  163.  
  164.  
  165.                     //not cancelled
  166.                     if ((float)0 == Float.parseFloat(columns[19]) &&
  167.                             !columns[18].equals("")) {
  168.  
  169.                         flight_data_storage.cancelled = (float)0;
  170.                         flight_data_storage.arr_delay_new = Float.parseFloat(columns[18]);
  171.  
  172.                     }
  173.                     //cancelled
  174.                     if ((float)1 == Float.parseFloat(columns[19])) {
  175.  
  176.                         flight_data_storage.cancelled = (float)1;
  177.                         //777 = dummy
  178.                         flight_data_storage.arr_delay_new = (float)777;
  179.  
  180.                     }
  181.                 }
  182.  
  183.                 return new Tuple2<Tuple2<Integer, Integer>, FlightSerializable>
  184.                         (new Tuple2<Integer, Integer>(origin_ap, dest_ap),
  185.                                 flight_data_storage);
  186.  
  187.             }
  188.         });
  189.  
  190.     //pairs example:
  191.     //((12478,12892),SparkLab$FlightSerializable@7aec3716)
  192.     //((12478,12892),SparkLab$FlightSerializable@47d88d7b)
  193.     //((12478,12892),SparkLab$FlightSerializable@5a30ab16)
  194.  
  195.  
  196.  
  197.  
  198.         JavaPairRDD<Tuple2<Integer, Integer>, Iterable<FlightSerializable>> f_groupped =
  199.                 f_paired.groupByKey();
  200.  
  201.  
  202.  
  203.  
  204.  
  205.     //simplify iterable of Serialized objects to string in same cortage:
  206.     //      KEY: cortege of: <origin_airport_id, dest_airport_id>
  207.     //      VALUE: String based on data from all serialized objects that were groupped by that KEY
  208.         JavaPairRDD<Tuple2<Integer, Integer>, String> f_mapped = f_groupped.mapToPair(
  209.                 new PairFunction<Tuple2<Tuple2<Integer, Integer>, Iterable<FlightSerializable>>, Tuple2<Integer, Integer>, String>() {
  210.             @Override
  211.             public Tuple2<Tuple2<Integer, Integer>, String> call(Tuple2<Tuple2<Integer, Integer>, Iterable<FlightSerializable>> value) throws Exception {
  212.  
  213.                 Tuple2<Integer, Integer> airport_bundle = value._1;
  214.                 Iterable<FlightSerializable> data_containers_bundle = value._2;
  215.  
  216.                 String ret = "";
  217.                 int flights_quantity = 0;
  218.  
  219.  
  220.                 int num_of_positive_delays = 0;
  221.                 int num_of_cancellations = 0;
  222.                 float max_delay = (float)0;
  223.  
  224.  
  225.                 for (FlightSerializable t : data_containers_bundle) {
  226.  
  227.                     if ((float)0 == t.cancelled) {
  228.  
  229.                         if ((float)0 < t.arr_delay_new) {
  230.                             if (t.arr_delay_new > max_delay) {
  231.                                 max_delay = t.arr_delay_new;
  232.                             }
  233.                             num_of_positive_delays++;
  234.                         }
  235.                     }
  236.  
  237.                     if ((float)1 == t.cancelled) {
  238.                         num_of_cancellations++;
  239.                     }
  240.  
  241.                     flights_quantity++;
  242.                 }
  243.  
  244.                 ret += "Flights Qty:" + flights_quantity + ", Stats:";
  245.  
  246.                 float delay_percentage = (float)num_of_positive_delays / (float)flights_quantity * (float)100;
  247.                 float cancellation_percentage = (float)num_of_cancellations / (float)flights_quantity * (float)100;
  248.  
  249.                 ret += "Cancelled:" + num_of_cancellations + "/" + flights_quantity + "[" + cancellation_percentage + "%], ";
  250.  
  251.                 if ((float)0 == max_delay) {
  252.                     ret += "Delayed: - [0.0%], Max_Delay: - [0.0]";
  253.                 } else {
  254.                     ret += "Delayed:" + num_of_positive_delays + "/" + flights_quantity + "[" + delay_percentage + "%], ";
  255.                     ret += "Max_Delay:" + max_delay;
  256.                 }
  257.  
  258.                 return new Tuple2<Tuple2<Integer, Integer>, String>(airport_bundle, ret);
  259.             }
  260.         });
  261.  
  262.     //example:
  263.     //((10299,10926),Flights Qty:8, Stats:Cancelled:0/8[0.0%], Delayed:2/8[25.0%], Max_Delay:151.0)
  264.     //((14107,13891),Flights Qty:21, Stats:Cancelled:1/21[4.7619047%], Delayed:9/21[42.857143%], Max_Delay:67.0)
  265.     //((12266,10721),Flights Qty:1, Stats:Cancelled:0/1[0.0%], Delayed: - [0.0%], Max_Delay: - [0.0])
  266.  
  267.  
  268.  
  269.     //get airport names from previous broadcasted Map and concatenate all data to same string
  270.         JavaRDD<String> final_flight_dataset = f_mapped.map(new Function<Tuple2<Tuple2<Integer, Integer>, String>, String>() {
  271.             @Override
  272.             public String call(Tuple2<Tuple2<Integer, Integer>, String> value) throws Exception {
  273.  
  274.                 Map<Integer, String> a_names_broadcasted_map = airports_broadcasted.value();
  275.                 Tuple2<Integer, Integer> airport_code_bundle = value._1;
  276.                 String airport_bundle_stats = value._2;
  277.  
  278.                 String origin_ap_name = a_names_broadcasted_map.get(airport_code_bundle._1);
  279.                 String dest_ap_name = a_names_broadcasted_map.get(airport_code_bundle._2);
  280.  
  281.                 return "F: " + origin_ap_name + " ->> " + dest_ap_name + ", \t" + airport_bundle_stats;
  282.  
  283.             }
  284.         });
  285.  
  286.         final_flight_dataset.saveAsTextFile("hdfs://localhost:9000/user/anthony/spar");
  287.  
  288.  
  289.  
  290.         System.out.println("\nSUCCESSFULLY EXECUTED:" +
  291.                 ", in(flights):" + f_paired.count() +
  292.                 ", out(bundles):" + final_flight_dataset.count() + "\n");
  293.     }
  294.  
  295. }
  296.  
  297.  
  298.  
  299.  
  300.  
  301. //Output example:
  302. F: Honolulu, HI: Honolulu International ->> Dallas/Fort Worth, TX: Dallas/Fort Worth International,     Flights Qty:7, Stats:Cancelled:0/7[0.0%], Delayed:5/7[71.42857%], Max_Delay:70.0
  303. F: Midland/Odessa, TX: Midland International ->> Dallas, TX: Dallas Love Field,     Flights Qty:11, Stats:Cancelled:0/11[0.0%], Delayed:4/11[36.363636%], Max_Delay:29.0
  304. F: Orlando, FL: Orlando International ->> Chicago, IL: Chicago O'Hare International,    Flights Qty:41, Stats:Cancelled:0/41[0.0%], Delayed:18/41[43.90244%], Max_Delay:94.0
  305. F: Detroit, MI: Detroit Metro Wayne County ->> Chicago, IL: Chicago Midway International,   Flights Qty:23, Stats:Cancelled:1/23[4.347826%], Delayed:12/23[52.173912%], Max_Delay:78.0
  306. F: Denver, CO: Denver International ->> San Antonio, TX: San Antonio International,     Flights Qty:10, Stats:Cancelled:0/10[0.0%], Delayed:7/10[70.0%], Max_Delay:167.0
  307. F: Charlotte, NC: Charlotte Douglas International ->> Phoenix, AZ: Phoenix Sky Harbor International,    Flights Qty:30, Stats:Cancelled:0/30[0.0%], Delayed:14/30[46.666668%], Max_Delay:220.0
  308. F: Chicago, IL: Chicago O'Hare International ->> Baltimore, MD: Baltimore/Washington International Thurgood Marshall,   Flights Qty:22, Stats:Cancelled:1/22[4.5454545%], Delayed:13/22[59.090908%], Max_Delay:475.0
  309. F: Las Vegas, NV: McCarran International ->> Wichita, KS: Wichita Dwight D Eisenhower National,     Flights Qty:4, Stats:Cancelled:0/4[0.0%], Delayed:1/4[25.0%], Max_Delay:31.0
  310. F: Chicago, IL: Chicago Midway International ->> Los Angeles, CA: Los Angeles International,    Flights Qty:13, Stats:Cancelled:0/13[0.0%], Delayed:6/13[46.153847%], Max_Delay:93.0
  311. F: Miami, FL: Miami International ->> Washington, DC: Ronald Reagan Washington National,    Flights Qty:28, Stats:Cancelled:1/28[3.5714288%], Delayed:12/28[42.857143%], Max_Delay:44.0
  312. F: San Francisco, CA: San Francisco International ->> Kona, HI: Kona International Airport at Keahole,  Flights Qty:8, Stats:Cancelled:0/8[0.0%], Delayed:3/8[37.5%], Max_Delay:28.0
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement