@huangjs
Created December 13, 2014 01:37
createParquetTable
def createParquetTable(name: String, file: String)(implicit sqlc: HiveContext): Unit = {
  import org.apache.spark.sql.hive.HiveMetastoreTypes
  import scala.util.Try
  import sqlc._

  // Infer the Hive column definitions from the Parquet file's schema.
  val rdd = parquetFile(file)
  val schema = rdd.schema.fields.map(f => s"`${f.name}` ${HiveMetastoreTypes.toMetastoreType(f.dataType)}").mkString(",\n")

  val cleanup = s"""
    |DROP TABLE IF EXISTS $name
    """.stripMargin

  // Hive 0.12: the Parquet SerDe and input/output formats must be spelled out explicitly.
  val ddl_12 = s"""
    |CREATE EXTERNAL TABLE $name (
    |  $schema
    |)
    |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
    |STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
    |OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
    |LOCATION '$file'
    """.stripMargin

  // Hive 0.13: native Parquet support via STORED AS PARQUET.
  val ddl_13 = s"""
    |CREATE EXTERNAL TABLE $name (
    |  $schema
    |)
    |STORED AS PARQUET
    |LOCATION '$file'
    """.stripMargin

  // Rename columns carrying Pig-style prefixes (e.g. `sorted::foo` becomes `foo`).
  val alter = rdd.schema.fields.filter(f => f.name.contains("::")).map { f =>
    s"ALTER TABLE $name CHANGE `${f.name}` ${f.name.replaceAll(".*?::", "")} ${HiveMetastoreTypes.toMetastoreType(f.dataType)}"
  }

  // val analyze = s"ANALYZE TABLE $name COMPUTE STATISTICS noscan"
  // FIXME: Hive 0.13.1 fails to collect statistics from Parquet files with noscan
  val analyze = s"ANALYZE TABLE $name COMPUTE STATISTICS"

  // Drop the table first if it already exists.
  if (Try(table(name)).isSuccess) {
    sql(cleanup)
  }

  // HACK: the latest Spark fails to apply the broadcast join optimization; remove once the optimization is back.
  // Prefer the Hive 0.13 DDL and fall back to the Hive 0.12 variant.
  Try(sql(ddl_13)).orElse(Try(sql(ddl_12)))

  // FIXME: won't work
  // // NOTE: fix schema, removing Pig identifiers (e.g. sorted::...)
  // val t = table(name)
  // val newSchema = StructType(t.schema.fields.map(s => s.copy(name = s.name.replaceAll(".*?::", ""))))
  // // HACK: this throws an exception, but removing it also works...
  // sql(s"drop table if exists $name")
  // applySchema(t, newSchema).saveAsTable(name)

  // FIXME: won't work, values become NULL. https://issues.apache.org/jira/browse/SPARK-4781
  // if (alter.length > 0) {
  //   // FIXME: change to a multi-statement call when Spark SQL supports it (a statement currently cannot contain a semicolon)
  //   alter.foreach(sql)
  // }

  sql(analyze)
}
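
A minimal usage sketch, assuming Spark 1.x with Hive support and a spark-shell (or similar) where a SparkContext named sc already exists; the table name events and the HDFS path are hypothetical:

import org.apache.spark.sql.hive.HiveContext

// Hypothetical example: expose an existing Parquet directory as a Hive external table named "events".
implicit val sqlc: HiveContext = new HiveContext(sc)
createParquetTable("events", "hdfs:///data/events.parquet")

// The table can now be queried through the Hive metastore.
sqlc.sql("SELECT COUNT(*) FROM events").collect().foreach(println)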