import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{ArrayType, StructType}
object FlattenJson extends App {

  val conf = new SparkConf().setMaster("local[*]").setAppName("JSON Flattener")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  val inputJson =
    """|{
       |  "name": "John",
       |  "age": 30,
       |  "bike": {
       |    "name": "Bajaj", "models": ["Dominor", "Pulsar"]
       |  },
       |  "cars": [
       |    { "name": "Ford", "models": [ "Fiesta", "Focus", "Mustang" ] },
       |    { "name": "BMW",  "models": [ "320", "X3", "X5" ] },
       |    { "name": "Fiat", "models": [ "500", "Panda" ] }
       |  ]
       |}""".stripMargin

  println(inputJson)
  // Create an RDD holding the JSON string as its single record
  val jsonRDD = sc.parallelize(inputJson :: Nil)

  // Create a DataFrame from the JSON RDD
  val jsonDF = sqlContext.read.json(jsonRDD)

  // Schema of the JSON DataFrame before flattening
  jsonDF.printSchema()

  // Output DataFrame before flattening
  jsonDF.show(false)
  // Function for exploding Array- and Struct-type columns
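  // The function walks df.schema recursively: every ArrayType column is exploded
  // with explode_outer (so rows whose array is null or empty are preserved), and
  // every StructType column is replaced by its child fields renamed to
  // parent_child. It keeps calling itself until no Array or Struct columns remain.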
  def flattenDataframe(df: DataFrame): DataFrame = {
    val fields = df.schema.fields
    val fieldNames = fields.map(_.name)

    for (field <- fields) {
      val fieldName = field.name
      field.dataType match {
        case _: ArrayType =>
          // Explode the array column, keeping every other column as it is
          val fieldNamesExcludingArray = fieldNames.filter(_ != fieldName)
          val fieldNamesAndExplode =
            fieldNamesExcludingArray ++ Array(s"explode_outer($fieldName) as $fieldName")
          val explodedDf = df.selectExpr(fieldNamesAndExplode: _*)
          return flattenDataframe(explodedDf)

        case structType: StructType =>
          // Replace the struct column with its children, renamed to parent_child
          val childFieldNames = structType.fieldNames.map(childName => s"$fieldName.$childName")
          val newFieldNames = fieldNames.filter(_ != fieldName) ++ childFieldNames
          val renamedCols = newFieldNames.map(x => col(x).as(x.replace(".", "_")))
          val explodedDf = df.select(renamedCols: _*)
          return flattenDataframe(explodedDf)

        case _ => // primitive column, nothing to flatten
      }
    }
    df
  }
  val flattenedJSON = flattenDataframe(jsonDF)

  // Schema of the JSON DataFrame after flattening
  flattenedJSON.printSchema()

  // Output DataFrame after flattening
  flattenedJSON.show(false)
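  // For the sample JSON above, the flattened columns should be roughly
  //   age, name, bike_name, bike_models, cars_name, cars_models
  // with one row per (bike model, car model) combination.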
}
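The gist uses the older SparkContext/SQLContext entry points; on Spark 2.x and later, SQLContext and reading JSON from an RDD[String] are deprecated in favour of SparkSession and Dataset[String]. Below is a minimal sketch of the equivalent setup; the object name FlattenJsonWithSparkSession and the shortened payload are illustrative only, and it assumes the flattenDataframe method of the FlattenJson object above is on the classpath.

import org.apache.spark.sql.SparkSession

object FlattenJsonWithSparkSession extends App {
  // SparkSession replaces the SparkContext + SQLContext pair
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("JSON Flattener")
    .getOrCreate()
  import spark.implicits._

  // A small JSON payload; the full inputJson from the gist above works the same way
  val inputJson = """{ "name": "John", "age": 30, "bike": { "name": "Bajaj", "models": ["Dominor", "Pulsar"] } }"""

  // Spark 2.2+: read a JSON string through a Dataset[String] instead of an RDD
  val jsonDF = spark.read.json(Seq(inputJson).toDS())
  jsonDF.printSchema()

  // Reuse the recursive flattener defined in FlattenJson above
  val flattened = FlattenJson.flattenDataframe(jsonDF)
  flattened.show(false)
}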
@SaiKemp Can I get some help? I would need the Java code for the above. Please help me.