Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package lab4;
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.broadcast.Broadcast;
- import scala.Tuple2;
- import java.util.Map;
- public class InitializeSpark {
- private static String getPersent(int nFlights, int nCancel, int nDelay, double maxTimeDelay){
- int percent = (int) (((double) (nCancel + nDelay)/(double) nFlights)*100);
- return "maxDelay: " + maxTimeDelay + ", " + "%(cancel + delay): " + percent + "%";
- }
- public static void main(String[] args){
- if (args.length != 3) {
- System.err.println("Usage: InitializeSpark <input airports path> <input flights path> <output path>");
- System.exit(-1);
- }
- SparkConf conf = new SparkConf().setAppName("lab4");
- JavaSparkContext sc = new JavaSparkContext(conf);
- JavaRDD<String> airports = sc.textFile(args[0]);
- airports = airports.map(s -> s.replace("\"", "")).filter(s -> !s.contains("Code"));
- JavaPairRDD<Integer, String> pairedAirports = airports.mapToPair(s -> new ParseAirport(s).getTuple2());
- final Broadcast<Map<Integer, String>> airportsBroadcasted = sc.broadcast(pairedAirports.collectAsMap());
- JavaRDD<String> flights = sc.textFile(args[1]);
- flights = flights.map(s -> s.replace("\"", "")).filter(s -> !s.contains("YEAR"));
- JavaRDD<ParseFlight> tableFlight = flights.map(s -> (new ParseFlight(s, airportsBroadcasted.value())));
- JavaPairRDD<Tuple2<String, String>,ParseFlight> pairedFlights = tableFlight.mapToPair(f -> new Tuple2<>(f.getPairAirport(),f));
- JavaPairRDD<Tuple2<String, String>, ParseFlight> collectedFlight = pairedFlights.reduceByKey(ParseFlight::count);
- JavaPairRDD<Tuple2<String, String>, String> result = collectedFlight.
- mapToPair(s -> new Tuple2<>(s._1, getPersent(s._2.getnFlights(), s._2.getnCancelled(), s._2.getnDelay(), s._2.getTimeDelay())));
- result.saveAsTextFile(args[2]);
- System.out.println("\nSTATUS: COMPLETE SUCCESSFUL");
- }
- }
- package lab4;
- import scala.Tuple2;
- import java.io.Serializable;
- import java.util.Map;
- public class ParseFlight implements Serializable{
- final int ORIGIN_AIRPORT = 11;
- final int DEST_AIRPORT = 14;
- final int ARR_NEW_DELAY = 18;
- final int CANCELLED = 19;
- private int nFlights;
- private int nCancelled;
- private int nDelay;
- private double timeDelay;
- private Tuple2<String, String> pairAirport;
- public ParseFlight(String string, Map<Integer, String> mapAirportName){
- String[] columns = string.split(",");
- String idTo = mapAirportName.get(Integer.parseInt(columns[ORIGIN_AIRPORT]));
- String idFrom = mapAirportName.get(Integer.parseInt(columns[DEST_AIRPORT]));
- this.pairAirport = new Tuple2<>(idFrom,idTo);
- String cancel = columns[CANCELLED];
- String delays = columns[ARR_NEW_DELAY];
- this.nFlights = 1;
- if (!cancel.equals("")){
- double cancelled = Double.parseDouble(cancel);
- if (cancelled == 1){
- this.nCancelled = 1;
- } else if (!delays.equals("")){
- double delay = Double.parseDouble(delays);
- if (delay > 0){
- this.nDelay = 1;
- this.timeDelay = delay;
- }
- }
- }
- }
- public ParseFlight(Tuple2 key, int nFlights, double timeDelay, int nCancelled, int nDelay){
- this.pairAirport = key;
- this.nFlights = nFlights;
- this.timeDelay = timeDelay;
- this.nCancelled = nCancelled;
- this.nDelay = nDelay;
- }
- public static ParseFlight count(ParseFlight first, ParseFlight second){
- return new ParseFlight(first.getPairAirport(),
- first.getnFlights() + second.getnFlights(),
- first.getTimeDelay() > second.getTimeDelay() ? first.getTimeDelay(): second.getTimeDelay(),
- first.getnCancelled() + second.getnCancelled(),
- first.getnDelay() + second.getnDelay());
- }
- public Tuple2<String, String> getPairAirport() {
- return pairAirport;
- }
- public double getTimeDelay() {
- return timeDelay;
- }
- public int getnFlights() {
- return nFlights;
- }
- public int getnCancelled() {
- return nCancelled;
- }
- public int getnDelay() {
- return nDelay;
- }
- }
- package lab4;
- import scala.Tuple2;
- import java.io.Serializable;
- public class ParseAirport implements Serializable{
- final int CODE = 0;
- final int DESCRIPTION = 1;
- private int airportCode;
- private String airportDescription;
- private Tuple2<Integer, String> tuple2;
- public ParseAirport(String string) {
- String[] columns = string.split(",");
- String codeStr = columns[CODE];
- String description = columns[DESCRIPTION];
- if (!codeStr.equals("") && !description.equals("")) {
- airportCode = Integer.parseInt(codeStr);
- airportDescription = description;
- tuple2 = new Tuple2<>(airportCode, airportDescription);
- }
- }
- public Tuple2<Integer, String> getTuple2() {
- return tuple2;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement