Created
April 25, 2017 17:56
-
-
Save ottomata/33b2086319c17b234968fc5dc214cc8f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // // TODO: instead of the following, what if we took the original outputDf and | |
| // // added null columns, cast to the table's types, for fields not present in outputDf | |
| // // to outputDf's schema? Then we shouldn't need to re-read the data with the Hive table's schema. | |
| // val table = hiveContext.table(tableName) | |
| // | |
| // // This is kind of working? Maybe all we need is: | |
| //// val emptyTableDf = table.where("1=0") | |
| //// val finalDf = emptyTableDf.unionAll(outputDf) | |
| // | |
| // // make sure the output df has table's fields | |
| // | |
| // def mergeDataFrameSchema(df: DataFrame, schema: StructType): DataFrame = { | |
| // | |
| // val schemaNormalized = schema.normalize(false) | |
| // // schema is uber schema and has at least all fields in df. | |
| // // we want to iterate on any fields in schema but not in df, and add them. | |
| // val missingFieldsDf = schemaNormalized.diff(df.schema.normalize()) | |
| // missingFieldsDf.foldLeft(df) { | |
| // case (currentDf, field) => { | |
| // val currentFieldsNormalized = currentDf.schema.normalize().fieldNames | |
| // | |
| // // If this field is not present in df at all, then we add a null column for it. | |
| // if (!currentFieldsNormalized.contains(field.name)) { | |
| // println(s"Adding column with nulls for ${field.name}") | |
| // currentDf.withColumn(field.name, lit(null).cast(schema(field.name).dataType)) | |
| // } | |
| // // else, if this field is a struct, we need to recursively merge the dataframe | |
| // else if (field.isStructType) { | |
| // | |
| // // get the DataFrame for this struct | |
| // val fieldStructDf = currentDf.select(s"${field.name}.*") | |
| // // This is the schema that this struct field SHOULD have. | |
| // val shouldBeSchema = schema(field.name).dataType.asInstanceOf[StructType] | |
| // | |
| // println(s"Merging ${field.name}") | |
| // val mergedFieldDf = mergeDataFrameSchema(fieldStructDf, shouldBeSchema) | |
| // | |
| // | |
| // // mergedFieldDf holds the subfields; they need to be converted back into a single struct field | |
| // val tmpFieldName = field.name + "__TMP__" | |
| // println(s"Adding column $tmpFieldName to replace ${field.name}") | |
| // currentDf | |
| // .withColumn(tmpFieldName, mergedFieldDf.col("*")) | |
| // .drop(field.name) | |
| // .withColumnRenamed(tmpFieldName, field.name) | |
| // } | |
| // // baaaad | |
| // else { | |
| // throw new IllegalStateException(s"NOOO ${field.name}") | |
| // currentDf | |
| // } | |
| // // reorder columns so that they are in schema's order | |
| // .select(schemaNormalized.fieldNames.head, schemaNormalized.fieldNames.tail: _*) | |
| // } | |
| // } | |
| // } | |
| //// val outputFieldsInTableOrder = table.columns.intersect(outputDf.columns.map(_.toLowerCase)).toSeq | |
| // | |
| // val finalDf = table.schema.diff(outputDf.schema.normalize).foldLeft(outputDf) { | |
| // case (currentDf, field) => { | |
| // if (field.isStructType && currentDf.columns.contains(field.name)) { | |
| // val tmpFieldName = field.name + "__TMP__" | |
| // println(s"Adding column $tmpFieldName to replace ${field.name}") | |
| // | |
| // // order the table's struct fields in the same | |
| // | |
| // // order the struct DF in the same order as table | |
| // val tableStructColumns = table.select(s"$field.*").columns | |
| // val structDf = currentDf.select(s"$field.*").select(tableStructColumns.head, tableStructColumns.tail:_*) | |
| // | |
| // val structSchema = currentDf.select(field.name).schema(field.name).dataType.asInstanceOf[StructType] | |
| // .merge(table.schema(field.name).dataType.asInstanceOf[StructType]) | |
| // | |
| // | |
| // | |
| // currentDf.withColumn( | |
| // tmpFieldName, | |
| // currentDf(field.name).cast(structSchema) | |
| // ) | |
| // | |
| // | |
| // .drop(field.name).withColumnRenamed(tmpFieldName, field.name) | |
| // } | |
| // else { | |
| // println(s"Adding column with nulls for ${field.name}") | |
| // currentDf.withColumn(field.name, lit(null).cast(table.schema(field.name).dataType)) | |
| // } | |
| // } | |
| // } | |
| // // reorder them so that they are in table order | |
| // .select(table.columns.head, table.columns.tail:_*) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment