Loading an Excel file with Spark and saving its contents into a Hive table.
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder

object TrySparkExcel extends App {

  // Create a Spark session with Hive support so we can write to Hive tables
  val ss = SparkSession
    .builder()
    .master("local[*]")
    .appName("Spark Application")
    .enableHiveSupport()
    .getOrCreate()
  // Read the Excel file via the HadoopOffice Spark datasource;
  // "read.locale.bcp47" controls how cell values are formatted (German here)
  val excelDf = ss.read
    .format("org.zuinnote.spark.office.excel")
    .option("read.locale.bcp47", "de")
    .load("src/test/resources/TEST.XLS")

  val totalCount = excelDf.count
  println(s"Total rows (including the header row): $totalCount")
println("\n ***************Generate Spark Schema******************") | |
//First row always contains Excel header | |
println(excelDf.take(1)) | |
val headerRow : Seq[Any] = excelDf.take(1)(0)(0).asInstanceOf[Seq[Any]] | |
val headerCell : Row = headerRow(0).asInstanceOf[Row] | |
println(headerCell(0)) | |
  // Generate the DataFrame schema from the header cells: the first field of
  // each cell Row is its formatted value, which becomes the column name after
  // stripping non-word characters
  val headerSchemaFields = headerRow.map(cell => {
    val rowCell: Row = cell.asInstanceOf[Row]
    val columnName: String = rowCell(0).asInstanceOf[String]
    val cleanColumnName = columnName.replaceAll("\\W", "")
    StructField(cleanColumnName, StringType, nullable = true)
  })
  val headerSchema = StructType(headerSchemaFields)
  println(headerSchema)
  println(headerSchema.length)
println("\n ***************Lets start transforming******************") | |
implicit val rowEncoder = RowEncoder(headerSchema) | |
val excelRowDf = excelDf.map(row => { | |
val rowSeq : Seq[Row] = row(0).asInstanceOf[Seq[Row]] | |
val values = rowSeq.map(rowCell => rowCell(0)) | |
Row(values: _*) | |
}) | |
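  // Note: the header row itself still flows through the map above and ends up
  // as a data row. A minimal, optional sketch for dropping it, assuming no
  // real data row repeats the first header cell's text in its first column:
  // val dataRowDf = excelRowDf.filter(row => row.get(0) != headerCell.get(0))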
  excelRowDf.printSchema()
  excelRowDf.show()

  // Register a temporary view and cache it, since it is read again below
  excelRowDf.createOrReplaceTempView("spark_excel_tmp")
  excelRowDf.cache()

  // Save to a Hive table; with Spark 2.x defaults this is written as Parquet
  // compressed with Snappy
  ss.table("spark_excel_tmp").write.mode("append").saveAsTable("spark_excel_hive_v1")
  // Read the table back to verify the write
  val resultsHiveDF = ss.sql("SELECT * FROM spark_excel_hive_v1 LIMIT 10")
  resultsHiveDF.show(10, false)
  resultsHiveDF.printSchema()

  // Release the cache and stop the Spark session
  excelRowDf.unpersist()
  ss.stop()
}
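To build and run this, the HadoopOffice Spark datasource has to be on the classpath. A minimal build.sbt sketch, assuming Spark 2.x on Scala 2.11; the version numbers below are assumptions and should match your cluster:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"  % "2.1.0" % "provided",
  "org.apache.spark" %% "spark-hive" % "2.1.0" % "provided",
  "com.github.zuinnote" %% "spark-hadoopoffice-ds" % "1.0.4" // version is an assumption
)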
Thanks! I've been trying to get this to work for about two days and your code really helped.