Skip to content

Instantly share code, notes, and snippets.

@mvervuurt
Created March 6, 2017 20:51
Show Gist options
  • Save mvervuurt/2216ba678adcfb755c7c6fc721f463a3 to your computer and use it in GitHub Desktop.
Save mvervuurt/2216ba678adcfb755c7c6fc721f463a3 to your computer and use it in GitHub Desktop.
Spark Excel Loading Utils to Transform the DataFrame into a DataFrame * that can be saved as regular rows and columns in Hive
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/**
* Spark Excel Loading Utils to Transform the DataFrame into DateFrame
* that can be saved regular rows and columns in Hive
*/
object SparkExcelLoadingUtils {
/**
* Load Excel Data File into Spark Dataset or Dataframe
* It assumes the Header is available and infers the Spark Schema from it
* @param sparkSession SparkSession
* @param excelPath Full path of the Excel file to load
* @return Loaded Excel file as a Spark Dataset or Dataframe
*/
def loadExcelData(sparkSession: SparkSession, excelPath: String) : DataFrame = {
val excelDf = sparkSession.read
.format("org.zuinnote.spark.office.excel")
.option("read.locale.bcp47", "de")
.load(excelPath)
excelDf
}
/**
* Automatically create Spark schema from Excel header row
* @param excelDf Spark Excel Dataset or Dataframe
* @tparam T type like for example Spark SQL Row
* @return Spark SQL Schema
*/
def createExcelSparkSchema[T](excelDf : Dataset[T]) : StructType = {
//First row always contains Excel header
val firstRow = excelDf.take(1).asInstanceOf[Array[Row]]
val headerRow = firstRow(0)(0).asInstanceOf[Seq[Any]]
//Generate Spark Dataframe Schema
val headerSchemaFields = headerRow.map(cell => {
val rowCell = cell.asInstanceOf[Row]
val columnName = rowCell(0).asInstanceOf[String]
val cleanColumnName = cleanString(columnName)
StructField(cleanColumnName, StringType, nullable = true)
})
StructType(headerSchemaFields)
}
/**
* Flatten rows of Spark Dataset created by loading an Excel file
* @param sparkExcelSchema Spark SQL Schema
* @param excelDf Spark Dataset created by loading an Excel file
* @return flattened and tabular Dataset<Row> which can be written to Hive
*/
def flattenExcelRows(sparkExcelSchema: StructType, excelDf: Dataset[Row]) = {
//Encoder used at runtime to create Spark Dataset<Row>
implicit val rowEncoder = RowEncoder(sparkExcelSchema)
//Transform to simple tabular format supported by Spotfire
val excelRowDf = excelDf.map(row => {
val rowSeq = row(0).asInstanceOf[Seq[Row]]
val values = rowSeq.map(rowCell => rowCell(0))
Row(values: _*)
})
excelRowDf
}
/**
* Clean String by removing punctuation characters
* and replacing white space characters with an _
* @param str input string
* @return clean string
*/
def cleanString(str: String): String ={
str.replaceAll("\\p{Punct}","").replaceAll("\\s","_")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment