Skip to content

Instantly share code, notes, and snippets.

@rajvermacas
Last active November 13, 2024 07:42
Show Gist options
  • Save rajvermacas/3f0b897c151854560db2865b64217efb to your computer and use it in GitHub Desktop.
Save rajvermacas/3f0b897c151854560db2865b64217efb to your computer and use it in GitHub Desktop.
Scala spark json file
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
object ScalaListEquivalents {

  /** Array-of-struct type of the `process_details` JSON array: each element has `field1` and `field2`. */
  private val ProcessDetailsSchema: ArrayType = ArrayType(StructType(Array(
    StructField("field1", StringType, nullable = true),
    StructField("field2", IntegerType, nullable = true)
  )))

  /** Schema of each raw JSON document: one top-level `process_details` key holding the array above. */
  private val DocumentSchema: StructType = StructType(Seq(
    StructField("process_details", ProcessDetailsSchema, nullable = true)
  ))

  /**
   * Demo entry point.
   *
   * Steps:
   *  1. Parse JSON strings into a typed array-of-struct column.
   *  2. Print the parsed DataFrame and its schema.
   *  3. Append one hand-built row and print the combined DataFrame.
   *
   * The SparkSession is always stopped before returning, including when a step throws.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Initialize Spark session
    val spark = SparkSession.builder
      .appName("CSV Loader")
      .config("spark.master", "local")
      .config("spark.driver.host", "localhost")
      .getOrCreate()

    try run(spark)
    finally spark.stop() // release the session even if the demo fails
  }

  /** Runs the parse-and-append demo against an already-created session. */
  private def run(spark: SparkSession): Unit = {
    import spark.implicits._

    // Sample DataFrame: each row holds one raw JSON document as a string.
    val jsonData = Seq(
      """{"process_details": [{"field1": "value1", "field2": 2}, {"field1": "value2", "field2": 3}]}""",
      """{"process_details": [{"field1": "value3", "field2": 4}, {"field1": "value4", "field2": 5}]}"""
    ).toDF("process_details")

    val parsedJsonDF = parseProcessDetails(jsonData)
    parsedJsonDF.show(false)
    parsedJsonDF.printSchema()

    // One new row whose single column is an array of two (field1, field2) structs.
    val newRow = Row(Seq(
      Row("newValue1", 6),
      Row("newValue2", 7)
    ))

    // Reuse the parsed schema: `union` matches columns by position, so both sides must line up.
    val newDF = spark.createDataFrame(
      spark.sparkContext.parallelize(Seq(newRow)),
      parsedJsonDF.schema
    )

    val updatedDF = parsedJsonDF.union(newDF)
    updatedDF.show(false)
  }

  /**
   * Parses the JSON string column `process_details` with [[DocumentSchema]].
   *
   * The column is then flattened, so the result has a single `process_details` column of
   * array<struct<field1, field2>>.
   *
   * @param raw DataFrame with a string column named `process_details`
   * @return DataFrame containing the flattened, typed `process_details` column
   */
  private def parseProcessDetails(raw: DataFrame): DataFrame =
    raw
      .withColumn("process_details", from_json(col("process_details"), DocumentSchema))
      .select(col("process_details.*")) // unwrap the top-level struct
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment