package com.example.bigdata

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object fakty_f {

  def main(args: Array[String]): Unit = {
    // args(0) is the base directory containing the input CSV/TXT files.
    val path = args(0)

    val conf: SparkConf = new SparkConf().setAppName("fakty_f")

    val spark: SparkSession = SparkSession.builder().
      config(conf).
      enableHiveSupport().
      getOrCreate()
    import spark.implicits._

    // DISTANCE dimension
    val mainDataCentral_DS = spark.read.
      option("header", true).option("inferSchema", true).
      csv(path + "/mainDataCentral.csv").
      cache()
    val mainDataEastern_DS = spark.read.
      option("header", true).option("inferSchema", true).
      csv(path + "/mainDataEastern.csv").
      cache()
    val mainDataMountain_DS = spark.read.
      option("header", true).option("inferSchema", true).
      csv(path + "/mainDataMountain.csv").
      cache()
    val mainDataPacific_DS = spark.read.
      option("header", true).option("inferSchema", true).
      csv(path + "/mainDataPacific.csv").
      cache()
    val allMainData_DS = mainDataCentral_DS.
      union(mainDataEastern_DS).
      union(mainDataMountain_DS).
      union(mainDataPacific_DS).
      dropDuplicates(Array("ID"))

    val uniqueDistances = allMainData_DS.dropDuplicates(Array("Distance(mi)")).select("Distance(mi)")

    // Bucket a distance (in miles) into a category id:
    // 0 = zero, 1 = below 1 mile, 2 = below 10 miles, 3 = 10 miles or more.
    // Double matches the type that inferSchema assigns to Distance(mi).
    def getId = (dystans: Double) => {
      if (dystans == 0) 0
      else if (dystans < 1) 1
      else if (dystans < 10) 2
      else 3
    }

    val getIdUDF = udf(getId)

    // Category labels (in Polish): "Ponizej 1 mili" = below 1 mile,
    // "Ponizej 10 mil" = below 10 miles, "Powyzej 10 mil" = above 10 miles.
    val kategorie_DS = Seq((0, "0"), (1, "Ponizej 1 mili"), (2, "Ponizej 10 mil"), (3, "Powyzej 10 mil")).toDF("id_k", "kategoria")

    val distances_DS_temp = uniqueDistances.
      withColumnRenamed("Distance(mi)", "dystans").
      withColumn("id", getIdUDF($"dystans")).
      select("id", "dystans")

    val distances_DS = distances_DS_temp.
      join(kategorie_DS, distances_DS_temp("id").equalTo(kategorie_DS("id_k")), "leftouter").
      select("id", "dystans", "kategoria")

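    // A UDF-free alternative (sketch only; nothing below uses it): the same
    // bucketing can be expressed with the built-in when/otherwise expressions,
    // which Catalyst can optimise. `distanceBucketExpr` is an illustrative name
    // that does not appear elsewhere in this job.
    val distanceBucketExpr =
      when($"dystans" === 0, 0).
        when($"dystans" < 1, 1).
        when($"dystans" < 10, 2).
        otherwise(3)
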
    //////////////////////////// WEATHER ///////////////////////////
    val weather_DS = spark.read.
      option("header", false).option("inferSchema", true).
      csv(path + "/weather.txt").cache()

    // Pull the weather condition, observation timestamp and airport code out of
    // the free-text weather records.
    val regexWeather = """(?<=Weather Condition: ).+""".r
    val regexTimestamp = """[0-9]{4}-(0[1-9]|1[0-2])-(0[1-9]|[1-2][0-9]|3[0-1]) (2[0-3]|[01][0-9]):[0-5][0-9]:[0-5][0-9]""".r
    val regexCode = """(?<=airport )[A-Z0-9]{4}""".r

    val GetWeather: String => String = regexWeather.findFirstIn(_).get
    val GetDate: String => String = regexTimestamp.findFirstIn(_).get
    val GetCode: String => String = regexCode.findFirstIn(_).get

    val getWeatherUDF = udf(GetWeather)
    val getDateUDF = udf(GetDate)
    val getCodeUDF = udf(GetCode)

    val weather_data = weather_DS.withColumnRenamed("_c8", "weather").
      withColumnRenamed("_c0", "dateCol").
      withColumn("date", getDateUDF($"dateCol").cast("timestamp")).
      withColumn("Code", getCodeUDF($"dateCol")).
      withColumn("weather", getWeatherUDF($"weather")).
      select("weather", "date", "Code")

    // Distinct weather conditions form the weather dimension; row_number over a
    // global ordering assigns the surrogate key (single-partition window, which
    // is acceptable for a small dimension).
    val weatherTab_DS = weather_DS.withColumnRenamed("_c8", "weather").
      withColumn("weather", getWeatherUDF($"weather")).
      select("weather").dropDuplicates("weather")

    val w = Window.orderBy("weather")
    val weather_with_id = weatherTab_DS.withColumn("id", row_number().over(w))
    val weather_wymiar = weather_with_id.withColumnRenamed("weather", "nazwa").select("nazwa", "id")

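    // Alternative extraction (sketch only; the UDF-based version above is the one
    // used by this job): the built-in regexp_extract covers the same fields and
    // yields an empty string instead of throwing when a record does not match.
    // `weatherViaRegexpExtract` is a hypothetical name.
    val weatherViaRegexpExtract = weather_DS.select(
      regexp_extract($"_c8", "Weather Condition: (.+)", 1).as("weather"),
      regexp_extract($"_c0", "([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2})", 1).cast("timestamp").as("date"),
      regexp_extract($"_c0", "airport ([A-Z0-9]{4})", 1).as("Code"))
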
    ////////////////////////// GEO /////////////////////////////

    val geoDataCentral_DS = spark.read.option("header", true).option("inferSchema", true).csv(path + "/geoDataCentral.csv").cache
    val geoDataEastern_DS = spark.read.option("header", true).option("inferSchema", true).csv(path + "/geoDataEastern.csv").cache
    val geoDataMountain_DS = spark.read.option("header", true).option("inferSchema", true).csv(path + "/geoDataMountain.csv").cache
    val geoDataPacific_DS = spark.read.option("header", true).option("inferSchema", true).csv(path + "/geoDataPacific.csv").cache
    val allGeoData_DS = geoDataCentral_DS
      .union(geoDataEastern_DS).union(geoDataMountain_DS).union(geoDataPacific_DS)

    //////////////////////// FACTS DATA //////////////////////////

    // Re-read the regional files, keeping only the columns needed for the fact
    // table; Start_Date is the calendar-day prefix of Start_Time.
    val main_Central_DS = spark.read.
      option("header", true).
      option("inferSchema", true).
      option("multiline", value = true).
      csv(path + "/mainDataCentral.csv").cache().select(
      substring($"Start_Time", 0, 10).as("Start_Date"),
      $"Start_Time",
      $"End_Time",
      $"Severity",
      $"Distance(mi)",
      $"Zipcode",
      $"Airport_Code"
    )

    main_Central_DS.show()

    val main_Eastern_DS = spark.read.
      option("header", true).
      option("inferSchema", true).
      option("multiline", value = true).
      csv(path + "/mainDataEastern.csv").cache().select(
      substring($"Start_Time", 0, 10).as("Start_Date"),
      $"Start_Time",
      $"End_Time",
      $"Severity",
      $"Distance(mi)",
      $"Zipcode",
      $"Airport_Code"
    )

    val main_Mountain_DS = spark.read.
      option("header", true).
      option("inferSchema", true).
      option("multiline", value = true).
      csv(path + "/mainDataMountain.csv").cache().select(
      substring($"Start_Time", 0, 10).as("Start_Date"),
      $"Start_Time",
      $"End_Time",
      $"Severity",
      $"Distance(mi)",
      $"Zipcode",
      $"Airport_Code"
    )

    val main_Pacific_DS = spark.read.
      option("header", true).
      option("inferSchema", true).
      option("multiline", value = true).
      csv(path + "/mainDataPacific.csv").cache().select(
      substring($"Start_Time", 0, 10).as("Start_Date"),
      $"Start_Time",
      $"End_Time",
      $"Severity",
      $"Distance(mi)",
      $"Zipcode",
      $"Airport_Code"
    )

    val main_All_DS = main_Central_DS
      .union(main_Eastern_DS).union(main_Mountain_DS).union(main_Pacific_DS)

    //////////////////////////////// TEMPORARY TABLES /////////////////////////

    // Rename key columns so the joins below have unambiguous names.
    val pom_dist_DS = distances_DS.withColumnRenamed("id", "id_dist").select("id_dist", "dystans", "kategoria")

    val pom_geo_DS = allGeoData_DS.withColumnRenamed("Zipcode", "geoZipcode").select("geoZipcode", "State")

    val pom_weather_wymiar = weather_wymiar.withColumnRenamed("id", "weather_id").withColumnRenamed("nazwa", "nazwa_pogody").select("weather_id", "nazwa_pogody")

    val pom_weather_DS = weather_data.
      withColumnRenamed("Code", "w_code").
      withColumnRenamed("date", "w_date").
      join(pom_weather_wymiar, $"weather" === $"nazwa_pogody").
      select("w_code", "w_date", "weather", "weather_id")

    ///////////////////////// CREATE FACTS //////////////////////

    // Build the fact table: each accident is joined to its distance category and
    // state, and (left outer) to a weather observation from the matching airport
    // taken within one hour of the accident's start/end window.
    val all_Facts = main_All_DS.
      join(pom_dist_DS, $"Distance(mi)" === $"dystans").
      join(pom_geo_DS, $"Zipcode" === $"geoZipcode").
      join(pom_weather_DS,
        unix_timestamp($"w_date") >= unix_timestamp($"Start_Time") - 3600 &&
          unix_timestamp($"w_date") <= unix_timestamp($"End_Time") + 3600 &&
          $"w_code" === $"Airport_Code",
        "leftouter").
      dropDuplicates("Start_Time").
      select(
        $"Start_Date".as("date"),
        $"State".as("state"),
        $"Start_Time",
        $"End_Time",
        $"Severity".as("severity"),
        $"Distance(mi)".as("Distance"),
        $"id_dist".as("distance_id"),
        $"weather_id").
      withColumn("Length_in_sec", unix_timestamp($"End_Time") - unix_timestamp($"Start_Time")).
      groupBy(
        $"date",
        $"state",
        $"severity",
        $"distance_id",
        $"weather_id").
      agg(
        count($"severity").as("accidents_count"),
        sum("Distance").as("distance_sum"),
        sum("Length_in_sec").as("duration_sum"))

    all_Facts.show()

    // Append the aggregated facts to the existing Hive table etl_hd.fakty_f.
    spark.sql("use etl_hd")
    all_Facts.write.insertInto("fakty_f")

    spark.stop()
  }

}