Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import org.apache.spark.sql._
- import org.apache.spark.sql.types._
- import org.apache.spark.sql.functions._
- // We need this to convert the out of order new schema to the new hive table schema.
- // This also is used to drop columns that aren't in the new hive table schema.
- import org.wikimedia.analytics.refinery.spark.sql.HiveExtensions._
// Get the new desired field schemas.
// event.mediawiki_revision_score is the v2 target table; its "scores" and
// "errors" columns carry the map-typed schemas we convert the v1 arrays into.
val mediawiki_revision_score_2 = spark.table("event.mediawiki_revision_score")
// DataType of the v2 "scores" column — used below as the UDF return type.
val scoreMapFieldSchema = mediawiki_revision_score_2.schema("scores").dataType
// DataType of the v2 "errors" column — used below as the UDF return type.
val errorMapFieldSchema = mediawiki_revision_score_2.schema("errors").dataType
// Converts a v1 array of score structs into a map of model_name -> score
// struct, with each score's nested probability array likewise converted to a
// name -> value map.
// Returns an Option so that a null or empty input yields None instead of
// throwing a NullPointerException inside the UDF.
val scoreArrayRowStructToMap = (scoresArray: Seq[Row]) => {
  Option(scoresArray).filter(_.nonEmpty).map { scores =>
    scores.map { scoreStruct =>
      val modelName = scoreStruct.getString(0)
      val modelVersion = scoreStruct.getString(1)
      val prediction = scoreStruct.getSeq[String](2)
      // Each probability entry is a (name, value) struct; fold them into a Map.
      val probabilityMap = scoreStruct
        .getSeq[Row](3)
        .map(entry => entry.getString(0) -> entry.getDouble(1))
        .toMap
      // Key the converted score struct by its model name.
      modelName -> Row(modelName, modelVersion, prediction, probabilityMap)
    }.toMap
  }
}
// Wrap as a UDF, declaring the return type to match the v2 table's
// "scores" map column.
val scoreArrayRowStructToMapUdf = udf(scoreArrayRowStructToMap, scoreMapFieldSchema)
// Converts a v1 array of error structs into a map keyed by each struct's
// first string field. Returns None for null/empty input to avoid
// NullPointerExceptions inside the UDF.
val errorArrayRowStructToMap = (errorsArray: Seq[Row]) => {
  Option(errorsArray).filter(_.nonEmpty).map { errors =>
    errors.map(err => err.getString(0) -> err).toMap
  }
}
// Wrap as a UDF, declaring the return type to match the v2 table's
// "errors" map column.
val errorArrayRowStructToMapUdf = udf(errorArrayRowStructToMap, errorMapFieldSchema)
/**
 * Converts a v1 mediawiki_revision_score DataFrame (array-typed scores and
 * errors columns) to the v2 schema (map-typed scores and errors columns).
 *
 * @param revScore1Df DataFrame read from the v1 table
 * @return DataFrame conforming to the event.mediawiki_revision_score v2 schema
 */
def convertRevisionScore1to2(revScore1Df: DataFrame): DataFrame = {
  // Replace an array-typed column with the given map-typed expression while
  // keeping the original column name (add temp column, drop original, rename).
  def replaceWithMapColumn(df: DataFrame, name: String, mapColumn: Column): DataFrame = {
    df.withColumn(s"${name}_map", mapColumn)
      .drop(name)
      .withColumnRenamed(s"${name}_map", name)
  }

  val withScoresMap = replaceWithMapColumn(revScore1Df, "scores", scoreArrayRowStructToMapUdf(col("scores")))
  val withBothMaps = replaceWithMapColumn(withScoresMap, "errors", errorArrayRowStructToMapUdf(col("errors")))

  // Now all of the fields should be the same; we just need the field order to
  // match. HiveExtensions' convertToSchema handles that.
  // Note: this also drops 2 unused columns: meta.schema_uri and meta.topic.
  withBothMaps.convertToSchema(mediawiki_revision_score_2.schema)
}
/**
 * Converts a v1 revision score DataFrame to the v2 schema and appends it as
 * Parquet under outputBasePath, partitioned like the target Hive table.
 *
 * NOTE (original author): this DataFrame cannot be inserted directly into
 * event.mediawiki_revision_score — doing so fails with
 * ParquetEncodingException: "empty fields are illegal, the field should be
 * omitted completely instead". Returning Options for map types inside the
 * UDFs (like probability) did not help; without the Options we get
 * NullPointerExceptions. So: write to a NEW standalone Hive location, then
 * switch to Hive directly to insert into event.mediawiki_revision_score.
 *
 * @param revScore1Df    DataFrame read from the v1 table
 * @param outputBasePath HDFS base path for the converted Parquet output
 */
def convertAndWriteRevisionScore1to2(revScore1Df: DataFrame, outputBasePath: String): Unit = {
  convertRevisionScore1to2(revScore1Df)
    .write
    // Match the target table's partition layout so the output directories can
    // later be moved straight under event/mediawiki_revision_score.
    .partitionBy("datacenter", "year", "month", "day", "hour")
    .mode("append")
    .parquet(outputBasePath)
}
// Backfill range: (year, month) string pairs covering 2018-12 through 2019-09.
// NOTE(review): month values are not zero-padded ("1" .. "9") — assumes the
// partition predicate comparison below treats them numerically; confirm
// against the table's partition values.
val months = ("2018", "12") +: (1 to 9).map(m => ("2019", m.toString))
// Source table holding the v1 (array-based) revision score events.
val mediawiki_revision_score_1 = spark.table("otto.mediawiki_revision_score_1")
// Convert each month in two halves (day < 15 and day >= 15) — presumably to
// limit per-job data size; TODO confirm.
for {
  (year, month) <- months
  dayPredicate <- Seq("day < 15", "day >= 15")
} {
  println(s"------ BEGIN Transforming ${year} ${month} ${dayPredicate}")
  convertAndWriteRevisionScore1to2(
    mediawiki_revision_score_1.where(s"year=${year} and month=${month} and ${dayPredicate}"),
    "/user/otto/mediawiki_revision_score_1_backfill/backfill0"
  )
  println(s"------ DONE Transforming ${year} ${month} ${dayPredicate}\n\n\n")
}
- // --- TODO ---
- // Move data dirs out of /user/otto/mediawiki_revision_score_1_backfill/backfill0 into event/mediawiki_revision_score
- // MSCK REPAIR TABLE event.mediawiki_revision_score
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement