Created December 13, 2014 01:37
-
-
Save huangjs/683a4c85ae14e9ae205b to your computer and use it in GitHub Desktop.
createParquetTable
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Registers a Parquet file/directory as an external Hive table, then computes table statistics.
 *
 * Any existing table called `name` is dropped first. The column list is derived from the Parquet
 * file's own schema. The Hive 0.13 DDL (`STORED AS PARQUET`) is attempted first; if it fails, the
 * Hive 0.12 DDL (explicit parquet-hive SerDe / input / output formats) is attempted.
 *
 * @param name table name; interpolated into the SQL statements unquoted
 * @param file Parquet path; read via `parquetFile` and used as the table's LOCATION
 * @param sqlc Hive context used to execute every statement
 * @throws Throwable the Hive 0.12 CREATE error (with the Hive 0.13 error attached as a suppressed
 *                   exception) when neither CREATE statement succeeds
 */
def createParquetTable(name: String, file: String)(implicit sqlc: HiveContext): Unit = {
  import scala.util.{Failure, Try}
  import org.apache.spark.sql.hive.HiveMetastoreTypes
  import sqlc._

  val rdd = parquetFile(file)

  // One "`column` hiveType" entry per Parquet field. Backticks keep names such as Pig's
  // "relation::field" legal inside the DDL.
  val schema = rdd.schema.fields
    .map(f => s"`${f.name}` ${HiveMetastoreTypes.toMetastoreType(f.dataType)}")
    .mkString(",\n")

  val cleanup = s"""
    |DROP TABLE IF EXISTS $name
    """.stripMargin

  // HIVE 0.12
  val ddl12 = s"""
    |CREATE EXTERNAL TABLE $name (
    | $schema
    |)
    |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
    |STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
    |OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
    |LOCATION '$file'
    """.stripMargin

  // HIVE 0.13
  val ddl13 = s"""
    |CREATE EXTERNAL TABLE $name (
    | $schema
    |)
    |STORED AS PARQUET
    |LOCATION '$file'
    """.stripMargin

  // val analyze = s"ANALYZE TABLE $name COMPUTE STATISTICS noscan"
  // FIXME: Hive 0.13.1 failed to get statistics from Parquet files with noscan
  val analyze = s"ANALYZE TABLE $name COMPUTE STATISTICS"

  // NOTE(review): DROP TABLE IF EXISTS should already be a no-op for a missing table; this guard
  // presumably works around a Spark version where it was not. Confirm before removing.
  if (Try(table(name)).isSuccess) {
    sql(cleanup)
  }

  //HACK: latest Spark failed doing broadcast join optimization, will remove when the optimization is back
  // Prefer the Hive 0.13 syntax and fall back to the Hive 0.12 SerDe syntax. If both fail, rethrow
  // rather than continuing: ANALYZE would otherwise fail on the missing table with a misleading error.
  Try(sql(ddl13)).recoverWith { case hive13Error =>
    Try(sql(ddl12)).recoverWith { case hive12Error =>
      hive12Error.addSuppressed(hive13Error)
      Failure(hive12Error)
    }
  }.get

  //FIXME: won't work
  // //NOTE: fix schema, removing Pig identifiers (e.g. sorted::...)
  // val t = table(name)
  // val newSchema = StructType(t.schema.fields.map(s => s.copy(name = s.name.replaceAll(".*?::", ""))))
  // //HACK: will throws an exception, removing it also works...
  // sql(s"drop table if exists $name")
  // applySchema(t, newSchema).saveAsTable(name)

  //FIXME: won't work, values become NULL. https://issues.apache.org/jira/browse/SPARK-4781
  // Intended to rename Pig-prefixed columns (e.g. "sorted::x" -> "x") in place:
  // val alter = rdd.schema.fields.filter(f => f.name.contains("::")).map { f =>
  //   s"ALTER TABLE $name CHANGE `${f.name}` ${f.name.replaceAll(".*?::", "")} ${HiveMetastoreTypes.toMetastoreType(f.dataType)}"
  // }
  // if (alter.length > 0) {
  //   // FIXME: change to multi statement when SparkSQL supports it (currently cannot contain semicolon in the statement)
  //   alter.foreach(sql)
  // }

  sql(analyze)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.