@ottomata
Created April 25, 2017 17:56
// // TODO: instead of the following, what if we took the original outputDf and
// // added null columns, cast to the proper types, to outputDf's schema for any
// // fields not present in outputDf? Then we shouldn't need to re-read the data with the
// val table = hiveContext.table(tableName)
//
// // This is kinda working? Maybe we need
//// val emptyTableDf = table.where("1=0")
//// val finalDf = emptyTableDf.unionAll(outputDf)
//
// // Make sure the output df has all of the table's fields.
//
// def mergeDataFrameSchema(df: DataFrame, schema: StructType): DataFrame = {
//
//   val schemaNormalized = schema.normalize(false)
//   // schema is the uber schema and has at least all of the fields in df.
//   // We want to iterate over any fields that are in schema but not in df, and add them.
//   val missingFields = schemaNormalized.diff(df.schema.normalize())
//   missingFields.foldLeft(df) {
//     case (currentDf, field) => {
//       val currentFieldsNormalized = currentDf.schema.normalize().fieldNames
//
//       // If this field is not present in df at all, then we add a null column for it.
//       if (!currentFieldsNormalized.contains(field.name)) {
//         println(s"Adding column with nulls for ${field.name}")
//         currentDf.withColumn(field.name, lit(null).cast(schema(field.name).dataType))
//       }
//       // Else, if this field is a struct, we need to recursively merge its DataFrame.
//       else if (field.isStructType) {
//
//         // Get the DataFrame for this struct.
//         val fieldStructDf = currentDf.select(s"${field.name}.*")
//         // This is the schema that this struct field SHOULD have.
//         val shouldBeSchema = schema(field.name).dataType.asInstanceOf[StructType]
//
//         println(s"Merging ${field.name}")
//         val mergedFieldDf = mergeDataFrameSchema(fieldStructDf, shouldBeSchema)
//
//         // mergedFieldDf has the struct's subfields as top-level columns; we need to
//         // convert them back into a single struct column.
//         // NOTE: withColumn takes a single Column, and referencing another DataFrame's
//         // columns here is problematic; building the struct explicitly with struct()
//         // over column expressions (see the standalone sketch below) is probably what
//         // is needed instead of mergedFieldDf.col("*").
//         val tmpFieldName = field.name + "__TMP__"
//         println(s"Adding column $tmpFieldName to replace ${field.name}")
//         currentDf
//           .withColumn(tmpFieldName, mergedFieldDf.col("*"))
//           .drop(field.name)
//           .withColumnRenamed(tmpFieldName, field.name)
//       }
//       // A field that is present in df but is not a struct shouldn't end up here.
//       else {
//         throw new IllegalStateException(
//           s"Cannot merge non-struct field ${field.name} whose type differs from the target schema")
//       }
//     }
//   }
//   // Reorder columns so that they are in schema's order.
//   .select(schemaNormalized.fieldNames.head, schemaNormalized.fieldNames.tail: _*)
// }
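
// A minimal standalone sketch of the same idea as mergeDataFrameSchema above, using only
// standard Spark SQL APIs (lit, struct, col) rather than the custom normalize/merge/
// isStructType helpers assumed by this gist. For every field in the target (table) schema
// that is missing from the DataFrame it adds a typed null column; for struct fields it
// rebuilds the struct so that missing subfields also become nulls; finally it selects the
// columns in the target schema's order. The object and method names here (SchemaConform,
// conformToSchema) are illustrative, not from the gist, and field matching assumes Spark's
// default case-insensitive column resolution.

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, lit, struct}
import org.apache.spark.sql.types.{DataType, StructType}

object SchemaConform {

  // Build a Column named `fieldName` that conforms to `targetType`, reading from the
  // existing struct (if any) found at `prefix` in the source DataFrame.
  private def conformColumn(
    existing: Option[StructType],
    fieldName: String,
    targetType: DataType,
    prefix: String
  ): Column = {
    val path = if (prefix.isEmpty) fieldName else s"$prefix.$fieldName"
    val existingField = existing.flatMap(_.fields.find(_.name.equalsIgnoreCase(fieldName)))
    (targetType, existingField) match {
      // Field is missing entirely: add a null column cast to the target type.
      case (_, None) =>
        lit(null).cast(targetType).alias(fieldName)
      // Struct field is present: recurse so that missing subfields become nulls too.
      case (targetStruct: StructType, Some(f)) =>
        val existingStruct = f.dataType match {
          case s: StructType => Some(s)
          case _             => None
        }
        struct(targetStruct.fields.map { sub =>
          conformColumn(existingStruct, sub.name, sub.dataType, path)
        }: _*).alias(fieldName)
      // Non-struct field is present: keep it as-is.
      case (_, Some(_)) =>
        col(path).alias(fieldName)
    }
  }

  // Return df with exactly the target schema's top-level fields, in the target's order.
  def conformToSchema(df: DataFrame, targetSchema: StructType): DataFrame = {
    val columns = targetSchema.fields.map { f =>
      conformColumn(Some(df.schema), f.name, f.dataType, prefix = "")
    }
    df.select(columns: _*)
  }
}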
//// val outputFieldsInTableOrder = table.columns.intersect(outputDf.columns.map(_.toLowerCase)).toSeq
//
// val finalDf = table.schema.diff(outputDf.schema.normalize).foldLeft(outputDf) {
//   case (currentDf, field) => {
//     if (field.isStructType && currentDf.columns.contains(field.name)) {
//       val tmpFieldName = field.name + "__TMP__"
//       println(s"Adding column $tmpFieldName to replace ${field.name}")
//
//       // Order the struct DataFrame's fields in the same order as the table's struct fields.
//       val tableStructColumns = table.select(s"${field.name}.*").columns
//       val structDf = currentDf.select(s"${field.name}.*")
//         .select(tableStructColumns.head, tableStructColumns.tail: _*)
//
//       val structSchema = currentDf.select(field.name).schema(field.name).dataType.asInstanceOf[StructType]
//         .merge(table.schema(field.name).dataType.asInstanceOf[StructType])
//
//       currentDf
//         .withColumn(tmpFieldName, currentDf(field.name).cast(structSchema))
//         .drop(field.name)
//         .withColumnRenamed(tmpFieldName, field.name)
//     }
//     else {
//       println(s"Adding column with nulls for ${field.name}")
//       currentDf.withColumn(field.name, lit(null).cast(table.schema(field.name).dataType))
//     }
//   }
// }
// // Reorder the columns so that they are in the table's order.
// .select(table.columns.head, table.columns.tail: _*)
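
// A hedged usage sketch of the SchemaConform helper above, reusing this gist's hiveContext,
// tableName, and outputDf. insertInto matches columns by position, so conforming outputDf
// to the table's schema (types, missing fields, and column order) first is what makes the
// positional write line up.
//
// val table   = hiveContext.table(tableName)
// val finalDf = SchemaConform.conformToSchema(outputDf, table.schema)
// finalDf.write.mode("append").insertInto(tableName)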