import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// We need this to convert the out-of-order new schema to the new Hive table schema.
// It is also used to drop columns that aren't in the new Hive table schema.
import org.wikimedia.analytics.refinery.spark.sql.HiveExtensions._

// Get the new desired field schemas
val mediawiki_revision_score_2 = spark.table("event.mediawiki_revision_score")
val scoreMapFieldSchema = mediawiki_revision_score_2.schema("scores").dataType
val errorMapFieldSchema = mediawiki_revision_score_2.schema("errors").dataType
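// (These dataTypes are the target table's map types for scores and errors;
// they are used below as the declared return types of the conversion UDFs.)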


// Function to convert scores array to scores map
val scoreArrayRowStructToMap = (scoresArray: Seq[Row]) => {
    // We need to return an Option to avoid NullPointerExceptions if values are null
    if (scoresArray == null || scoresArray.length == 0) {
        None
    }
    else {
        // Convert the array of score structs to an array of scores with probability maps
        val scoresWithMapProbability = scoresArray.map(scoreWithStructProbability => {
            val model_name = scoreWithStructProbability.getString(0)
            val model_version = scoreWithStructProbability.getString(1)
            val prediction = scoreWithStructProbability.getSeq[String](2)
            val probabilityMap = scoreWithStructProbability.getSeq[Row](3).map(p => p.getString(0) -> p.getDouble(1)).toMap
            Row(model_name, model_version, prediction, probabilityMap)
        })
        // Convert the array of score structs with probability maps to
        // a map of model_name -> score struct
        Some(scoresWithMapProbability.map(r => r.getString(0) -> r).toMap)
    }
}
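// Illustrative shape change performed above (field positions come from the getters;
// the exact v1/v2 field names are assumptions here):
//   v1 scores: array<struct<model_name, model_version, prediction, probability: array<struct<name, value>>>>
//   v2 scores: map<model_name -> struct<model_name, model_version, prediction, probability: map<string, double>>>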
// Make a udf
val scoreArrayRowStructToMapUdf = udf(scoreArrayRowStructToMap, scoreMapFieldSchema)


// Function to convert errors array to errors map
val errorArrayRowStructToMap = (errorsArray: Seq[Row]) => {
    if (errorsArray == null || errorsArray.length == 0) {
        None
    }
    else {
        Some(errorsArray.map(errorStruct => errorStruct.getString(0) -> errorStruct).toMap)
    }
}
val errorArrayRowStructToMapUdf = udf(errorArrayRowStructToMap, errorMapFieldSchema)


def convertRevisionScore1to2(revScore1Df: DataFrame) = {
    // Use the UDFs to convert and add the new map columns, and then drop the old array ones.
    revScore1Df
        .withColumn("scores_map", scoreArrayRowStructToMapUdf(col("scores"))).drop("scores").withColumnRenamed("scores_map", "scores")
        .withColumn("errors_map", errorArrayRowStructToMapUdf(col("errors"))).drop("errors").withColumnRenamed("errors_map", "errors")
        // Now all of the fields should be the same; we just need the field order to match.
        // Good thing we have HiveExtensions' convertToSchema!
        // Note: this also drops 2 unused columns: meta.schema_uri and meta.topic
        .convertToSchema(mediawiki_revision_score_2.schema)
}
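// (Optional sanity check, a minimal sketch: compare the converted schema against the
// target table's schema on a small sample before converting everything. Note that
// StructType equality also compares nullability, so this may be stricter than
// whatever convertToSchema guarantees.)
// val sampleDf = convertRevisionScore1to2(spark.table("otto.mediawiki_revision_score_1").limit(10))
// assert(sampleDf.schema == mediawiki_revision_score_2.schema)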

def convertAndWriteRevisionScore1to2(revScore1Df: DataFrame, outputBasePath: String) = {
    // I can't seem to insert this DataFrame directly into event.mediawiki_revision_score.
    // I get ParquetEncodingException: empty fields are illegal, the field should be omitted completely instead.
    // I've tried modifying the UDF functions above to return Options for any map type inside (like probability),
    // but it doesn't help. Without the Options, I get NullPointerExceptions.
    // So! We write this to its own NEW Hive table, and will switch to Hive directly
    // to insert into event.mediawiki_revision_score.
    convertRevisionScore1to2(revScore1Df)
        .write
        .partitionBy("datacenter", "year", "month", "day", "hour")
        .mode("append")
        .parquet(outputBasePath)
}


val months = Seq(
    ("2018", "12"),
    ("2019", "1"),
    ("2019", "2"),
    ("2019", "3"),
    ("2019", "4"),
    ("2019", "5"),
    ("2019", "6"),
    ("2019", "7"),
    ("2019", "8"),
    ("2019", "9")
)


val mediawiki_revision_score_1 = spark.table("otto.mediawiki_revision_score_1")

months.foreach { case (year, month) =>
    println(s"------ BEGIN Transforming ${year} ${month} day < 15")
    convertAndWriteRevisionScore1to2(
        mediawiki_revision_score_1.where(s"year=${year} and month=${month} and day < 15"),
        "/user/otto/mediawiki_revision_score_1_backfill/backfill0"
    )
    println(s"------ DONE Transforming ${year} ${month} day < 15\n\n\n")

    println(s"------ BEGIN Transforming ${year} ${month} day >= 15")
    convertAndWriteRevisionScore1to2(
        mediawiki_revision_score_1.where(s"year=${year} and month=${month} and day >= 15"),
        "/user/otto/mediawiki_revision_score_1_backfill/backfill0"
    )
    println(s"------ DONE Transforming ${year} ${month} day >= 15\n\n\n")
}

// --- TODO ---
// Move data dirs out of /user/otto/mediawiki_revision_score_1_backfill/backfill0 into event/mediawiki_revision_score
// MSCK REPAIR TABLE event.mediawiki_revision_score
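// A minimal sketch of that TODO (illustrative only; the real data directory of
// event.mediawiki_revision_score should be checked first, e.g. via
// SHOW CREATE TABLE event.mediawiki_revision_score):
//
//   hdfs dfs -mv /user/otto/mediawiki_revision_score_1_backfill/backfill0/datacenter=* \
//       <event.mediawiki_revision_score table LOCATION>/
//   hive -e 'MSCK REPAIR TABLE event.mediawiki_revision_score;'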