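// ETL lab script for spark-shell: loads regional accident CSVs, builds distance,
// weather, and geo dimensions, and aggregates everything into a facts table.
// Assumes the spark-shell `spark` session is in scope for spark.implicits._ below.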
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.sql.Timestamp

// DISTANCE dimension: load the four regional files and deduplicate by ID
val mainDataCentral_DS = spark.read.
  option("header", true).option("inferSchema", true).
  csv("/user/gedin8/labs/spark/mainDataCentral.csv").
  cache()
val mainDataEastern_DS = spark.read.
  option("header", true).option("inferSchema", true).
  csv("/user/gedin8/labs/spark/mainDataEastern.csv").
  cache()
val mainDataMountain_DS = spark.read.
  option("header", true).option("inferSchema", true).
  csv("/user/gedin8/labs/spark/mainDataMountain.csv").
  cache()
val mainDataPacific_DS = spark.read.
  option("header", true).option("inferSchema", true).
  csv("/user/gedin8/labs/spark/mainDataPacific.csv").
  cache()
val allMainData_DS = mainDataCentral_DS.
  union(mainDataEastern_DS).
  union(mainDataMountain_DS).
  union(mainDataPacific_DS).
  dropDuplicates(Array("ID"))
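// NOTE: union() matches columns by position, not by name, so the four regional
// files must share an identical schema; dropDuplicates on "ID" removes records
// that appear in more than one regional extract.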

val uniqueDistances = allMainData_DS.dropDuplicates(Array("Distance(mi)")).select("Distance(mi)")

// Bucket a distance into a category id; Double matches the type
// that inferSchema assigns to the "Distance(mi)" column
val getId = (dystans: Double) => {
  if (dystans == 0) 0
  else if (dystans < 1) 1
  else if (dystans < 10) 2
  else 3
}
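
// Sanity check of the bucketing on a few hypothetical values (not from the data):
assert(getId(0.0) == 0 && getId(0.5) == 1 && getId(5.0) == 2 && getId(25.0) == 3)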

val getIdUDF = udf(getId)

// Distance category dimension; labels are Polish: "Ponizej 1 mili" = "Below 1 mile",
// "Ponizej 10 mil" = "Below 10 miles", "Powyzej 10 mil" = "Over 10 miles"
val kategorie_DS = Seq((0, "0"), (1, "Ponizej 1 mili"), (2, "Ponizej 10 mil"), (3, "Powyzej 10 mil")).toDF("id_k", "kategoria")

val distances_DS_temp = uniqueDistances.
  withColumnRenamed("Distance(mi)", "dystans").
  withColumn("id", getIdUDF($"dystans")).
  select("id", "dystans")

val distances_DS = distances_DS_temp.
  join(kategorie_DS, distances_DS_temp("id") === kategorie_DS("id_k"), "leftouter").
  select("id", "dystans", "kategoria")

//////////////////////////// WEATHER ////////////////////////////
val weather_DS = spark.read.
  option("header", false).option("inferSchema", true).
  csv("/user/gedin8/labs/spark/weather.txt").cache()

val regexWeather = """(?<=Weather Condition: ).+""".r
val regexTimestamp = """[0-9]{4}-(0[1-9]|1[0-2])-(0[1-9]|[1-2][0-9]|3[0-1]) (2[0-3]|[01][0-9]):[0-5][0-9]:[0-5][0-9]""".r
val regexCode = """(?<=airport )[A-Z0-9]{4}""".r

// NOTE: .get throws NoSuchElementException on any line where the pattern is absent
val GetWeather: String => String = regexWeather.findFirstIn(_).get
val GetDate: String => String = regexTimestamp.findFirstIn(_).get
val GetCode: String => String = regexCode.findFirstIn(_).get

val getWeatherUDF = udf(GetWeather)
val getDateUDF = udf(GetDate)
val getCodeUDF = udf(GetCode)
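
// Quick check of the extractors on a sample line; the exact layout of weather.txt
// is an assumption here, inferred only from the three regexes above:
val sampleLine = "2020-01-20 14:53:00 observed at airport KORD, Weather Condition: Light Rain"
assert(GetDate(sampleLine) == "2020-01-20 14:53:00")
assert(GetCode(sampleLine) == "KORD")
assert(GetWeather(sampleLine) == "Light Rain")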

val weather_data = weather_DS.withColumnRenamed("_c8", "weather").
  withColumnRenamed("_c0", "dateCol").
  withColumn("date", getDateUDF($"dateCol").cast("timestamp")).
  withColumn("Code", getCodeUDF($"dateCol")).
  withColumn("weather", getWeatherUDF($"weather")).
  select("weather", "date", "Code")

val weatherTab_DS = weather_DS.withColumnRenamed("_c8", "weather").
  withColumn("weather", getWeatherUDF($"weather")).
  select("weather").dropDuplicates("weather")

val w = Window.orderBy("weather")
val weather_with_id = weatherTab_DS.withColumn("id", row_number().over(w))
val weather_wymiar = weather_with_id.withColumnRenamed("weather", "nazwa").select("nazwa", "id")
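// NOTE: a Window with orderBy but no partitionBy pulls all rows into a single
// partition to number them; acceptable here because the weather dimension is small.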

////////////////////////// GEO /////////////////////////////

val geoDataCentral_DS = spark.read.option("header", true).option("inferSchema", true).csv("/user/gedin8/labs/spark/geoDataCentral.csv").cache
val geoDataEastern_DS = spark.read.option("header", true).option("inferSchema", true).csv("/user/gedin8/labs/spark/geoDataEastern.csv").cache
val geoDataMountain_DS = spark.read.option("header", true).option("inferSchema", true).csv("/user/gedin8/labs/spark/geoDataMountain.csv").cache
val geoDataPacific_DS = spark.read.option("header", true).option("inferSchema", true).csv("/user/gedin8/labs/spark/geoDataPacific.csv").cache
// Only the Central region is used for now; re-enable the unions to include all regions
val allGeoData_DS = geoDataCentral_DS
// .union(geoDataEastern_DS).union(geoDataMountain_DS).union(geoDataPacific_DS)

//////////////////////// FACTS DATA //////////////////////////

val main_Central_DS = spark.read.
  option("header", true).
  option("inferSchema", true).
  option("multiLine", true).
  csv("/user/gedin8/labs/spark/mainDataCentral.csv").cache().select(
    substring($"Start_Time", 1, 10).as("Start_Date"),  // first 10 chars = yyyy-MM-dd date part
    $"Start_Time",
    $"End_Time",
    $"Severity",
    $"Distance(mi)",
    $"Zipcode",
    $"Airport_Code"
  )

main_Central_DS.show()

val main_Eastern_DS = spark.read.
  option("header", true).
  option("inferSchema", true).
  option("multiLine", true).
  csv("/user/gedin8/labs/spark/mainDataEastern.csv").cache().select(
    substring($"Start_Time", 1, 10).as("Start_Date"),
    $"Start_Time",
    $"End_Time",
    $"Severity",
    $"Distance(mi)",
    $"Zipcode",
    $"Airport_Code"
  )

val main_Mountain_DS = spark.read.
  option("header", true).
  option("inferSchema", true).
  option("multiLine", true).
  csv("/user/gedin8/labs/spark/mainDataMountain.csv").cache().select(
    substring($"Start_Time", 1, 10).as("Start_Date"),
    $"Start_Time",
    $"End_Time",
    $"Severity",
    $"Distance(mi)",
    $"Zipcode",
    $"Airport_Code"
  )

val main_Pacific_DS = spark.read.
  option("header", true).
  option("inferSchema", true).
  option("multiLine", true).
  csv("/user/gedin8/labs/spark/mainDataPacific.csv").cache().select(
    substring($"Start_Time", 1, 10).as("Start_Date"),
    $"Start_Time",
    $"End_Time",
    $"Severity",
    $"Distance(mi)",
    $"Zipcode",
    $"Airport_Code"
  )

val main_All_DS = main_Central_DS
// .union(main_Eastern_DS).union(main_Mountain_DS).union(main_Pacific_DS)

//////////////////////////////// TEMPORARY_TABLES /////////////////////////

val pom_dist_DS = distances_DS.withColumnRenamed("id", "id_dist").select("id_dist", "dystans", "kategoria")

val pom_geo_DS = allGeoData_DS.withColumnRenamed("Zipcode", "geoZipcode").select("geoZipcode", "State")

val pom_weather_wymiar = weather_wymiar.withColumnRenamed("id", "id_weather").withColumnRenamed("nazwa", "nazwa_pogody").select("id_weather", "nazwa_pogody")

val pom_weather_DS = weather_data.
  withColumnRenamed("Code", "w_code").
  withColumnRenamed("date", "w_date").
  join(pom_weather_wymiar, $"weather" === $"nazwa_pogody").
  select("w_code", "w_date", "weather", "id_weather")

///////////////////////// CREATE FACTS //////////////////////

val all_Facts = main_All_DS.
  join(pom_dist_DS, $"Distance(mi)" === $"dystans").
  join(pom_geo_DS, $"Zipcode" === $"geoZipcode").
  join(pom_weather_DS,
    unix_timestamp($"w_date") >= unix_timestamp($"Start_Time") &&
      unix_timestamp($"w_date") <= unix_timestamp($"End_Time") &&
      $"w_code" === $"Airport_Code",
    "leftouter").
  dropDuplicates("Start_Time").
  select(
    $"Start_Date".as("Date"),
    $"Start_Time",
    $"End_Time",
    $"Severity",
    $"Distance(mi)".as("Distance"),
    $"id_dist".as("Distance_Category"),
    $"id_weather",
    $"State").
  withColumn("Length_in_sec", unix_timestamp($"End_Time") - unix_timestamp($"Start_Time")).
  groupBy(
    $"Date",
    $"State",
    $"Severity",
    $"Distance_Category",
    $"id_weather").
  agg(
    count($"Severity").as("Accidents_Count"),
    sum("Distance").as("sum_Distance"),
    sum("Length_in_sec").as("sum_Duration_Time")).
  // NOTE: this drop is effectively a no-op; groupBy/agg already removed these columns
  drop("Start_Time",
    "geoZipcode",
    "Zipcode",
    "End_Time",
    "Distance",
    "Length_in_sec",
    "Airport_Code")
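
// The weather join above is a range join: each accident row is matched to weather
// readings taken at its airport (w_code === Airport_Code) with a timestamp inside
// the [Start_Time, End_Time] interval, and dropDuplicates("Start_Time") then keeps
// one (arbitrary) matching reading per accident. Inequality conditions like this
// cannot use a hash join, so expect this step to be slow on large inputs.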

all_Facts.show()
// all_Facts.write.insertInto("etl_hd.fact")