Created
March 6, 2017 20:51
-
-
Save mvervuurt/2216ba678adcfb755c7c6fc721f463a3 to your computer and use it in GitHub Desktop.
Spark Excel loading utils to transform the loaded DataFrame into a DataFrame that can be saved as regular rows and columns in Hive
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.sql.catalyst.encoders.RowEncoder | |
import org.apache.spark.sql.types.{StringType, StructField, StructType} | |
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} | |
/**
 * Spark Excel loading utilities to transform the nested Dataset produced by
 * the HadoopOffice Excel data source ("org.zuinnote.spark.office.excel")
 * into a flat, tabular DataFrame that can be saved as regular rows and
 * columns in Hive.
 */
object SparkExcelLoadingUtils {

  /**
   * Load an Excel data file into a Spark Dataset or DataFrame.
   * It assumes the header is available and infers the Spark schema from it.
   *
   * @param sparkSession active SparkSession
   * @param excelPath full path of the Excel file to load
   * @param locale BCP 47 locale tag used by the data source to interpret
   *               cell values (defaults to "de" for backward compatibility)
   * @return loaded Excel file as a Spark Dataset or DataFrame
   */
  def loadExcelData(sparkSession: SparkSession,
                    excelPath: String,
                    locale: String = "de"): DataFrame = {
    sparkSession.read
      .format("org.zuinnote.spark.office.excel")
      .option("read.locale.bcp47", locale)
      .load(excelPath)
  }

  /**
   * Automatically create a Spark schema from the Excel header row.
   * Every column is typed as nullable StringType; column names are
   * sanitized via [[cleanString]] so they are usable as Hive identifiers.
   *
   * @param excelDf Spark Excel Dataset or DataFrame
   * @tparam T row type, for example Spark SQL Row
   * @return Spark SQL schema derived from the header row
   * @throws IllegalArgumentException if the dataset is empty
   */
  def createExcelSparkSchema[T](excelDf: Dataset[T]): StructType = {
    // First row always contains the Excel header.
    // Fail fast with a clear message instead of an ArrayIndexOutOfBoundsException
    // when the Excel file yielded no rows at all.
    val firstRows = excelDf.take(1).asInstanceOf[Array[Row]]
    require(firstRows.nonEmpty, "Cannot infer schema: the Excel dataset is empty")
    // The data source wraps each Excel row in a single column holding a
    // sequence of cell structs; each cell struct's first field is its value.
    val headerRow = firstRows(0)(0).asInstanceOf[Seq[Any]]
    // Generate the Spark DataFrame schema, one StructField per header cell.
    val headerSchemaFields = headerRow.map { cell =>
      val rowCell = cell.asInstanceOf[Row]
      val columnName = rowCell(0).asInstanceOf[String]
      StructField(cleanString(columnName), StringType, nullable = true)
    }
    StructType(headerSchemaFields)
  }

  /**
   * Flatten rows of a Spark Dataset created by loading an Excel file.
   * Each source row holds a sequence of cell structs; only each cell's
   * first field (its value) is kept, producing one flat column per cell.
   *
   * @param sparkExcelSchema Spark SQL schema to encode the flattened rows with
   * @param excelDf Spark Dataset created by loading an Excel file
   * @return flattened, tabular Dataset[Row] which can be written to Hive
   */
  def flattenExcelRows(sparkExcelSchema: StructType, excelDf: Dataset[Row]): Dataset[Row] = {
    // Encoder used at runtime to create the Dataset[Row] with the flat schema.
    implicit val rowEncoder = RowEncoder(sparkExcelSchema)
    // Transform the nested cell structs into a simple tabular format.
    val excelRowDf = excelDf.map { row =>
      val rowSeq = row(0).asInstanceOf[Seq[Row]]
      val values = rowSeq.map(rowCell => rowCell(0))
      Row(values: _*)
    }
    excelRowDf
  }

  /**
   * Clean a string by removing punctuation characters
   * and replacing each whitespace character with an underscore,
   * so the result is safe to use as a Hive column name.
   *
   * @param str input string
   * @return cleaned string
   */
  def cleanString(str: String): String = {
    str.replaceAll("\\p{Punct}", "").replaceAll("\\s", "_")
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment