Last active
December 19, 2022 09:36
-
-
Save tecmaverick/06b849f599cac7932b7c81bd54951be8 to your computer and use it in GitHub Desktop.
Spark DataFrame Scratchpad
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.sql.Row

// Generate a test DataFrame with 2 rows
val df = Seq((1, "Red Green"), (2, "Blue White")).toDF("id", "colors")
df.show

// Show ONLY the colors
df.map { case Row(id: Int, colors: String) => colors }.show

// Create a row with both fields
df.map { case Row(id: Int, colors: String) => (id, colors) }.show

// Create a new case class with three fields.
// FIX: renamed `newRecord`/`Status` to `NewRecord`/`status` — Scala convention is
// UpperCamelCase for types, lowerCamelCase for fields (the original comment below
// already said "NewRecord").
case class NewRecord(id: Int, colors: String, status: String)

// Map the data from two fields to three fields of type NewRecord
df.map { case Row(id: Int, colors: String) => NewRecord(id, colors, "NA") }.show

// Access the first field (id)
df.map(x => x.getInt(0)).show

// Access the first field (id) and second field (colors)
df.map(x => (x.getAs[Int](0), x.getAs[String](1))).show

// Access the fields by field name
df.map(x => (x.getAs[Int]("id"), x.getAs[String]("colors"))).show

// Create a new tuple based on index position of fields
df.map(x => (x.getInt(0), x.getString(1))).show

// Convert both fields to string as a typed Dataset.
// FIX: `df.as[(String, String)]` fails analysis with "Cannot up cast `id` from
// int to string" — Spark does not upcast Int to String implicitly, so cast first.
val df1 = df.selectExpr("cast(id as string) as id", "colors").as[(String, String)]
df1.printSchema
df1.getClass

// Map the values to Row type and do any calculations
df.map {
  case Row(id: Int, colors: String) => (id, colors)
}.show

// Do custom logic within map and return a new resultset
df.map {
  case Row(id: Int, colors: String) => (if (id == 1) id * 2 else id, colors)
}.show

case class Record(ID: Int, Colors: String)
// If ID is 1, change Colors for that row to "Black White"; keep other rows as-is
df.as[Record].map {
  case myrow if myrow.ID == 1 => myrow.copy(Colors = "Black White")
  case rec => rec
}.show

// Split and explode the colors field (one output row per colour word)
df.selectExpr("id", "explode(split(colors,' ')) as colors").show

// flatMap colors data
df.flatMap(x => x.getString(1).split(" ")).show
// ============================================================
// Generate a test Dataset with 2 rows and map it through a case class
val ds = Seq((1, "Red Green"), (2, "Blue White")).toDF("ID", "Colors")
case class Record(ID: Int, Colors: String)
ds.as[Record].map(r => (r.ID, r.Colors)).show
// ============================================================
// Display all the rows in the DataFrame on the driver console.
// FIX: a bare `df.foreach(println)` executes on the executors, so in cluster
// mode the output goes to executor logs, not the driver — collect first.
val df = Seq((1, "Red Green"), (2, "Blue White")).toDF("ID", "Colors")
df.collect().foreach(println)
// ============================================================
// Generate test data with 2 rows from a case class
case class TestData(id: Int, desc: String)
val testRows = List(TestData(1, "abc"), TestData(2, "xyz"))
val df = spark.createDataset(testRows)
// ============================================================
// Read a CSV file with a header row, letting Spark infer the column types
val data = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("file:///Users/abe/Personal/Apache Spark/Scripts/colors.csv")
// ============================================================
// Get the runtime class of a variable
val df = Seq((1, "abc"), (2, "xyz")).toDF("id", "name")
df.getClass
// Get the static type of a variable — `:type` is a Scala REPL command,
// not Scala syntax; it only works at the REPL prompt.
:type df
// ==============================================================
// Create a DataFrame with an explicit schema from a list of Rows
import org.apache.spark.sql._
import org.apache.spark.sql.types._
// NOTE(review): JavaConverters is deprecated as of Scala 2.13 in favour of
// scala.jdk.CollectionConverters — keep it here assuming a 2.12 build; confirm.
import scala.collection.JavaConverters._

// One string key column plus four nullable integer columns
// (StructField's nullable flag defaults to true).
val schema = StructType(Seq(
  StructField("a1", StringType),
  StructField("a2", IntegerType),
  StructField("a3", IntegerType),
  StructField("a4", IntegerType),
  StructField("a5", IntegerType)
))

val data = List(
  Row("001", 1, 1, 1, 1),
  Row("001", 2, 2, 2, 2),
  Row("001", null, 1, 2, 2),
  Row("002", 1, 2, 3, 4),
  Row("002", 1, 4, null, 3),
  Row("002", 4, null, 1, 5),
  Row("003", 5, 1, 0, 2)
)

// createDataFrame wants a java.util.List of Rows, hence .asJava
val df = spark.createDataFrame(data.asJava, schema)
// ==============================================================
// Using encoders to map a DataFrame of Rows to new Rows
// https://stackoverflow.com/questions/36648128/how-to-store-custom-objects-in-dataset
// https://towardsdatascience.com/apache-spark-dataset-encoders-demystified-4a3026900d63
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val df = Seq((1, "Red Green"), (2, "Blue White")).toDF("ID", "Colors")

// Schema of the rows produced by the maps below; an explicit RowEncoder is
// required because Row carries no compile-time type information.
val schema = StructType(Seq(
  StructField("ID", IntegerType),
  StructField("Colors", StringType)
))
val encoder = RowEncoder(schema)

case class Record(id: Int, colors: String)

// Multiply ID by 10 for the row whose ID is 1; pass other rows through unchanged
df.map {
  case Row(id: Int, colors: String) if id == 1 => Row(id * 10, colors)
  case row => row
}(encoder).show

// Same idea with positional getters.
// FIX: the original wrote `data = df.map(...)` — reassigning a val does not
// compile — and computed `1*10`/`1*100` with the literal 1 instead of idVal.
df.map { row =>
  val idVal = row.getAs[Int](0)
  val newIdVal = if (idVal == 1) idVal * 10 else idVal * 100
  val colors = row.getAs[String](1)
  Row(newIdVal, colors)
}(encoder).show
// ==============================================================
// View the fields of a DataFrame's schema
val df = Seq((1, "abc"), (2, "xyz")).toDF("id", "name")
// FIX: the original inspected `df1`, which is not defined in this section —
// clearly meant the `df` created on the previous line.
df.schema.fields
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment