Skip to content

Instantly share code, notes, and snippets.

@mvervuurt
Created February 22, 2017 13:54
Show Gist options
  • Save mvervuurt/34830cb798dd4426a0a687479ece4941 to your computer and use it in GitHub Desktop.
Loading Excel file using Spark and saving its contents into a Hive table.
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
/** Loads an Excel workbook with the HadoopOffice Spark datasource, derives a
  * string-typed schema from the header row, flattens the cell structure, and
  * appends the result to a Hive table (`spark_excel_hive_v1`).
  *
  * Note: an explicit `main` is used instead of extending `App` to avoid the
  * delayed-initialization pitfalls of the `App` trait.
  */
object TrySparkExcel {

  def main(args: Array[String]): Unit = {
    // Create the Spark session; Hive support is required for saveAsTable
    // to target the Hive metastore.
    val ss = SparkSession
      .builder()
      .master("local[*]")
      .appName("Spark Application")
      .enableHiveSupport()
      .getOrCreate()

    // The datasource returns each sheet row as a single column holding a
    // Seq of cell Rows (one Row per cell).
    val excelDf = ss.read
      .format("org.zuinnote.spark.office.excel")
      .option("read.locale.bcp47", "de")
      .load("src/test/resources/TEST.XLS")

    // Forces a full read of the workbook — doubles as a sanity check that
    // the file is loadable. (Was previously computed but never used.)
    val totalCount = excelDf.count
    println(s"Total rows (including header): $totalCount")

    println("\n ***************Generate Spark Schema******************")

    // First row always contains the Excel header.
    // BUGFIX: println on an Array prints its identity hash; use mkString
    // so the actual row contents are shown.
    println(excelDf.take(1).mkString(", "))

    val headerRow: Seq[Any] = excelDf.take(1)(0)(0).asInstanceOf[Seq[Any]]
    val headerCell: Row = headerRow.head.asInstanceOf[Row]
    println(headerCell(0))

    // Build an all-StringType schema from the header cells. Non-word
    // characters are stripped because Spark/Hive column names may not
    // contain spaces or punctuation.
    val headerSchemaFields = headerRow.map { cell =>
      val rowCell: Row = cell.asInstanceOf[Row]
      val columnName: String = rowCell(0).asInstanceOf[String]
      val cleanColumnName = columnName.replaceAll("\\W", "")
      StructField(cleanColumnName, StringType, nullable = true)
    }
    val headerSchema = StructType(headerSchemaFields)
    println(headerSchema)
    println(headerSchema.length)

    println("\n ***************Lets start transforming******************")

    // Encoder so Dataset.map can emit generic Rows under the derived schema.
    implicit val rowEncoder = RowEncoder(headerSchema)

    // Flatten each row's Seq of cell Rows into a plain Row of cell values.
    // NOTE(review): the header row itself is also mapped here and therefore
    // written to the Hive table as a data row — confirm this is intended.
    val excelRowDf = excelDf.map { row =>
      val cellRows: Seq[Row] = row(0).asInstanceOf[Seq[Row]]
      val cellValues = cellRows.map(cellRow => cellRow(0))
      Row(cellValues: _*)
    }
    excelRowDf.printSchema()
    excelRowDf.show()

    excelRowDf.createOrReplaceTempView("spark_excel_tmp")
    excelRowDf.cache()

    // Save to a Hive table (default parquet format, snappy compression).
    ss.table("spark_excel_tmp").write.mode("append").saveAsTable("spark_excel_hive_v1")

    // Read back a sample to verify the write succeeded.
    val resultsHiveDF = ss.sql("SELECT * FROM spark_excel_hive_v1 LIMIT 10")
    resultsHiveDF.show(10, truncate = false)
    resultsHiveDF.printSchema()

    // Release the cached DataFrame and stop the session.
    excelRowDf.unpersist()
    ss.stop()
  }
}
@awwsmm
Copy link

awwsmm commented Nov 19, 2018

Thanks! I've been trying to get this to work for about two days and your code really helped.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment