backfill_mediawiki_revision_score.scala
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// We need this to convert the new schema, whose fields are out of order,
// to the new Hive table schema.
// It is also used to drop columns that aren't in the new Hive table schema.
import org.wikimedia.analytics.refinery.spark.sql.HiveExtensions._
// Get the new desired field schemas
val mediawiki_revision_score_2 = spark.table("event.mediawiki_revision_score")
val scoreMapFieldSchema = mediawiki_revision_score_2.schema("scores").dataType
val errorMapFieldSchema = mediawiki_revision_score_2.schema("errors").dataType
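// For reference, a sketch of the shape change (the v1 inner field names are
// assumptions inferred from the positional Row accessors below, not confirmed
// against the actual schema):
//   v1: scores: array<struct<model_name: string, model_version: string,
//                            prediction: array<string>,
//                            probability: array<struct<name: string, value: double>>>>
//   v2: scores: map<string, struct<model_name: string, model_version: string,
//                                  prediction: array<string>,
//                                  probability: map<string, double>>>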
// Function to convert the scores array to a scores map
val scoreArrayRowStructToMap = (scoresArray: Seq[Row]) => {
  // We need to return an Option to avoid NullPointerExceptions if values are null
  if (scoresArray == null || scoresArray.isEmpty) {
    None
  }
  else {
    // Convert the array of score structs to an array of scores with probability maps
    val scoresWithMapProbability = scoresArray.map(scoreWithStructProbability => {
      val model_name = scoreWithStructProbability.getString(0)
      val model_version = scoreWithStructProbability.getString(1)
      val prediction = scoreWithStructProbability.getSeq[String](2)
      val probabilityMap = scoreWithStructProbability.getSeq[Row](3).map(p => p.getString(0) -> p.getDouble(1)).toMap
      Row(model_name, model_version, prediction, probabilityMap)
    })
    // Convert the array of score structs with probability maps to
    // a map of model_name -> score struct
    Some(scoresWithMapProbability.map(r => r.getString(0) -> r).toMap)
  }
}
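// Quick sanity check of the conversion function with a hypothetical v1 row
// (the field values are made up; the field order matches the accessors above):
//   val sampleScore = Row("damaging", "0.5.0", Seq("false"), Seq(Row("false", 0.9), Row("true", 0.1)))
//   scoreArrayRowStructToMap(Seq(sampleScore))
//   // => Some(Map("damaging" -> Row("damaging", "0.5.0", Seq("false"), Map("false" -> 0.9, "true" -> 0.1))))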
// Make a UDF, giving it the target map schema as its return type
val scoreArrayRowStructToMapUdf = udf(scoreArrayRowStructToMap, scoreMapFieldSchema)
// Function to convert the errors array to an errors map
val errorArrayRowStructToMap = (errorsArray: Seq[Row]) => {
  if (errorsArray == null || errorsArray.isEmpty) {
    None
  }
  else {
    Some(errorsArray.map(errorStruct => errorStruct.getString(0) -> errorStruct).toMap)
  }
}
val errorArrayRowStructToMapUdf = udf(errorArrayRowStructToMap, errorMapFieldSchema)
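// Same idea as scores, but the whole error struct is kept as the map value,
// keyed by its first field (presumably model_name). E.g. (hypothetical values):
//   errorArrayRowStructToMap(Seq(Row("damaging", "TimeoutError")))
//   // => Some(Map("damaging" -> Row("damaging", "TimeoutError")))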
def convertRevisionScore1to2(revScore1Df: DataFrame) = {
  // Use the UDFs to add the new map columns, then drop the old array ones.
  revScore1Df
    .withColumn("scores_map", scoreArrayRowStructToMapUdf(col("scores")))
    .drop("scores")
    .withColumnRenamed("scores_map", "scores")
    .withColumn("errors_map", errorArrayRowStructToMapUdf(col("errors")))
    .drop("errors")
    .withColumnRenamed("errors_map", "errors")
    // Now all of the fields should be the same; we just need the field order to match.
    // Good thing we have HiveExtensions' convertToSchema!
    // Note: this also drops 2 unused columns: meta.schema_uri and meta.topic
    .convertToSchema(mediawiki_revision_score_2.schema)
}
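// Optional verification (a sketch): after conversion, the DataFrame's schema
// should line up with the v2 table's. Something like:
//   convertRevisionScore1to2(spark.table("otto.mediawiki_revision_score_1").limit(0)).printSchema()
//   mediawiki_revision_score_2.printSchema()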
def convertAndWriteRevisionScore1to2(revScore1Df: DataFrame, outputBasePath: String) = {
  // I can't seem to insert this DataFrame directly into event.mediawiki_revision_score.
  // I get ParquetEncodingException: empty fields are illegal, the field should be omitted completely instead.
  // I've tried modifying the UDF functions above to return Options for any map type inside (like probability),
  // but it doesn't help. Without the Options, I get NullPointerExceptions.
  // So! We write this to its own NEW Hive table location, and will switch to Hive directly
  // to insert into event.mediawiki_revision_score.
  convertRevisionScore1to2(revScore1Df)
    .write
    .partitionBy("datacenter", "year", "month", "day", "hour")
    .mode("append")
    .parquet(outputBasePath)
}
val months = Seq(
  ("2018", "12"),
  ("2019", "1"),
  ("2019", "2"),
  ("2019", "3"),
  ("2019", "4"),
  ("2019", "5"),
  ("2019", "6"),
  ("2019", "7"),
  ("2019", "8"),
  ("2019", "9")
)
val mediawiki_revision_score_1 = spark.table("otto.mediawiki_revision_score_1")

months.foreach { case (year, month) =>
  println(s"------ BEGIN Transforming ${year} ${month} day < 15")
  convertAndWriteRevisionScore1to2(
    mediawiki_revision_score_1.where(s"year=${year} and month=${month} and day < 15"),
    "/user/otto/mediawiki_revision_score_1_backfill/backfill0"
  )
  println(s"------ DONE Transforming ${year} ${month} day < 15\n\n\n")

  println(s"------ BEGIN Transforming ${year} ${month} day >= 15")
  convertAndWriteRevisionScore1to2(
    mediawiki_revision_score_1.where(s"year=${year} and month=${month} and day >= 15"),
    "/user/otto/mediawiki_revision_score_1_backfill/backfill0"
  )
  println(s"------ DONE Transforming ${year} ${month} day >= 15\n\n\n")
}
// --- TODO ---
// Move data dirs out of /user/otto/mediawiki_revision_score_1_backfill/backfill0 into event/mediawiki_revision_score
// MSCK REPAIR TABLE event.mediawiki_revision_score
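// A sketch of those final steps (the target path is an assumption based on the
// table name; adjust to the actual event.mediawiki_revision_score location, and
// run these from a shell, not this script):
//   hdfs dfs -mv /user/otto/mediawiki_revision_score_1_backfill/backfill0/datacenter=* \
//     <event.mediawiki_revision_score table location>/
//   hive -e 'MSCK REPAIR TABLE event.mediawiki_revision_score;'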