Advertisement
Guest User

Untitled

a guest
Nov 18th, 2017
75
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 5.41 KB | None | 0 0
  1. package lab4;
  2.  
  3. import org.apache.spark.SparkConf;
  4. import org.apache.spark.api.java.JavaPairRDD;
  5. import org.apache.spark.api.java.JavaRDD;
  6. import org.apache.spark.api.java.JavaSparkContext;
  7. import org.apache.spark.broadcast.Broadcast;
  8. import scala.Tuple2;
  9.  
  10. import java.util.Map;
  11.  
  12.  
  13. public class InitializeSpark {
  14.  
  15.     private static String getPersent(int nFlights, int nCancel, int nDelay, double maxTimeDelay){
  16.  
  17.         int percent = (int) (((double) (nCancel + nDelay)/(double) nFlights)*100);
  18.  
  19.         return "maxDelay: " + maxTimeDelay + ", " + "%(cancel + delay): " + percent + "%";
  20.     }
  21.  
  22.     public static void main(String[] args){
  23.  
  24.         if (args.length != 3) {
  25.             System.err.println("Usage: InitializeSpark <input airports path> <input flights path> <output path>");
  26.             System.exit(-1);
  27.         }
  28.  
  29.         SparkConf conf = new SparkConf().setAppName("lab4");
  30.         JavaSparkContext sc = new JavaSparkContext(conf);
  31.  
  32.         JavaRDD<String> airports = sc.textFile(args[0]);
  33.         airports = airports.map(s -> s.replace("\"", "")).filter(s -> !s.contains("Code"));
  34.  
  35.         JavaPairRDD<Integer, String> pairedAirports = airports.mapToPair(s -> new ParseAirport(s).getTuple2());
  36.  
  37.         final Broadcast<Map<Integer, String>> airportsBroadcasted = sc.broadcast(pairedAirports.collectAsMap());
  38.  
  39.  
  40.         JavaRDD<String> flights = sc.textFile(args[1]);
  41.         flights = flights.map(s -> s.replace("\"", "")).filter(s -> !s.contains("YEAR"));
  42.  
  43.         JavaRDD<ParseFlight> tableFlight = flights.map(s -> (new ParseFlight(s, airportsBroadcasted.value())));
  44.  
  45.         JavaPairRDD<Tuple2<String, String>,ParseFlight> pairedFlights = tableFlight.mapToPair(f -> new Tuple2<>(f.getPairAirport(),f));
  46.  
  47.         JavaPairRDD<Tuple2<String, String>, ParseFlight> collectedFlight = pairedFlights.reduceByKey(ParseFlight::count);
  48.  
  49.         JavaPairRDD<Tuple2<String, String>, String> result = collectedFlight.
  50.                 mapToPair(s ->  new Tuple2<>(s._1, getPersent(s._2.getnFlights(), s._2.getnCancelled(), s._2.getnDelay(), s._2.getTimeDelay())));
  51.  
  52.  
  53.         result.saveAsTextFile(args[2]);
  54.  
  55.         System.out.println("\nSTATUS: COMPLETE SUCCESSFUL");
  56.     }
  57. }
  58.  
  59.  
  60. package lab4;
  61.  
  62.  
  63. import scala.Tuple2;
  64.  
  65. import java.io.Serializable;
  66. import java.util.Map;
  67.  
  68. public class ParseFlight implements Serializable{
  69.  
  70.     final int ORIGIN_AIRPORT = 11;
  71.     final int DEST_AIRPORT = 14;
  72.     final int ARR_NEW_DELAY = 18;
  73.     final int CANCELLED = 19;
  74.  
  75.     private int nFlights;
  76.     private int nCancelled;
  77.     private int nDelay;
  78.     private double timeDelay;
  79.     private Tuple2<String, String> pairAirport;
  80.  
  81.     public ParseFlight(String string, Map<Integer, String> mapAirportName){
  82.  
  83.         String[] columns = string.split(",");
  84.  
  85.         String idTo = mapAirportName.get(Integer.parseInt(columns[ORIGIN_AIRPORT]));
  86.         String  idFrom = mapAirportName.get(Integer.parseInt(columns[DEST_AIRPORT]));
  87.         this.pairAirport = new Tuple2<>(idFrom,idTo);
  88.         String cancel = columns[CANCELLED];
  89.         String delays = columns[ARR_NEW_DELAY];
  90.         this.nFlights = 1;
  91.  
  92.         if (!cancel.equals("")){
  93.             double cancelled = Double.parseDouble(cancel);
  94.             if (cancelled == 1){
  95.                 this.nCancelled = 1;
  96.             } else if (!delays.equals("")){
  97.                 double delay = Double.parseDouble(delays);
  98.                 if (delay > 0){
  99.                     this.nDelay = 1;
  100.                     this.timeDelay = delay;
  101.                 }
  102.             }
  103.         }
  104.     }
  105.  
  106.     public ParseFlight(Tuple2 key, int nFlights, double timeDelay, int nCancelled, int nDelay){
  107.  
  108.         this.pairAirport = key;
  109.         this.nFlights = nFlights;
  110.         this.timeDelay = timeDelay;
  111.         this.nCancelled = nCancelled;
  112.         this.nDelay = nDelay;
  113.  
  114.     }
  115.  
  116.  
  117.     public static ParseFlight count(ParseFlight first, ParseFlight second){
  118.         return new ParseFlight(first.getPairAirport(),
  119.                 first.getnFlights() + second.getnFlights(),
  120.                 first.getTimeDelay() > second.getTimeDelay() ? first.getTimeDelay(): second.getTimeDelay(),
  121.                 first.getnCancelled() + second.getnCancelled(),
  122.                 first.getnDelay() + second.getnDelay());
  123.     }
  124.  
  125.     public Tuple2<String, String> getPairAirport() {
  126.         return pairAirport;
  127.     }
  128.  
  129.     public double getTimeDelay() {
  130.         return timeDelay;
  131.     }
  132.  
  133.     public int getnFlights() {
  134.         return nFlights;
  135.     }
  136.  
  137.     public int getnCancelled() {
  138.         return nCancelled;
  139.     }
  140.  
  141.     public int getnDelay() {
  142.         return nDelay;
  143.     }
  144. }
  145.  
  146.  
  147. package lab4;
  148.  
  149.  
  150. import scala.Tuple2;
  151.  
  152. import java.io.Serializable;
  153.  
  154. public class ParseAirport implements Serializable{
  155.  
  156.     final int CODE = 0;
  157.     final int DESCRIPTION = 1;
  158.  
  159.     private int airportCode;
  160.     private String airportDescription;
  161.     private Tuple2<Integer, String> tuple2;
  162.  
  163.     public ParseAirport(String string) {
  164.  
  165.         String[] columns = string.split(",");
  166.  
  167.         String codeStr = columns[CODE];
  168.         String description = columns[DESCRIPTION];
  169.  
  170.         if (!codeStr.equals("") && !description.equals("")) {
  171.             airportCode = Integer.parseInt(codeStr);
  172.             airportDescription = description;
  173.             tuple2 = new Tuple2<>(airportCode, airportDescription);
  174.         }
  175.     }
  176.  
  177.     public Tuple2<Integer, String> getTuple2() {
  178.         return tuple2;
  179.     }
  180. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement