Skip to content

Instantly share code, notes, and snippets.

@tecmaverick
Last active December 19, 2022 09:36
Show Gist options
  • Save tecmaverick/06b849f599cac7932b7c81bd54951be8 to your computer and use it in GitHub Desktop.
Save tecmaverick/06b849f599cac7932b7c81bd54951be8 to your computer and use it in GitHub Desktop.
Spark DataFrame Scratchpad
import org.apache.spark.sql.Row
// Build a small two-row test DataFrame: (id, colors)
val df = Seq((1,"Red Green"),(2,"Blue White")).toDF("id","colors")
df.show
// Project just the colors column via a typed map
df.map(row => row.getAs[String]("colors")).show
// Rebuild each row as an (id, colors) tuple
df.map(row => (row.getAs[Int]("id"), row.getAs[String]("colors"))).show
// Create a new case class with three fields.
// Fixed naming: case classes are UpperCamelCase and fields lowerCamelCase by
// Scala convention (the original `newRecord` also contradicted the comment
// below, which already said "NewRecord").
case class NewRecord(id: Int, colors: String, status: String)
// Map the data from two fields to three fields of type NewRecord
df.map{ case Row(id: Int, colors: String) => NewRecord(id, colors, "NA") }.show
// Access the first field (id) positionally with a typed getter
df.map(row => row.getInt(0)).show
// Access the first field (id) and second field (colors) via getAs by position
df.map(row => (row.getAs[Int](0), row.getAs[String](1))).show
// Access the fields by field name instead of position
df.map(row => (row.getAs[Int]("id"), row.getAs[String]("colors"))).show
// Build a new tuple from positional typed getters
df.map(row => (row.getInt(0), row.getString(1))).show
//Convert both fields to string of type DataSet
// NOTE(review): this relies on Spark permitting an up-cast from int to string
// inside `as[...]` — confirm against the Spark version in use, as unsafe
// up-casts raise AnalysisException ("Cannot up cast ...").
val df1 = df.as[(String,String)]
// Schema should now report both columns; getClass shows the Dataset runtime class
df1.printSchema
df1.getClass
// Extract both columns inside map and perform any per-row calculation
df.map { row =>
  (row.getAs[Int](0), row.getAs[String](1))
}.show
// Apply custom logic inside map and return a new result set:
// double the id when it equals 1, otherwise pass it through unchanged
df.map { row =>
  val id = row.getAs[Int](0)
  val adjusted = if (id == 1) id * 2 else id
  (adjusted, row.getAs[String](1))
}.show
case class Record(ID: Int, Colors: String)
// If ID is 1, change that row's Colors to "Black White"; all other rows pass through as-is
df.as[Record].map { rec =>
  if (rec.ID == 1) rec.copy(Colors = "Black White") else rec
}.show
// Split the colors field on spaces and explode: one output row per colour word
df.selectExpr("id","explode(split(colors,' ')) as colors").show
// Same flattening done with a typed flatMap over the second column
df.flatMap(row => row.getString(1).split(" ")).show
// ============================================================
// Generate a test Dataset with 2 rows.
// Fixed: the original built a plain DataFrame yet named it `ds` and called it
// a Dataset; adding .as[Record] makes it a true Dataset[Record].
// (Define the case class first so its encoder is available to .as[...].)
case class Record(ID: Int, Colors: String)
val ds = Seq((1,"Red Green"),(2,"Blue White")).toDF("ID","Colors").as[Record]
ds.map(rec => (rec.ID, rec.Colors)).show
// ============================================================
// Display all the rows in the DF (println runs wherever the tasks execute)
val df = Seq((1,"Red Green"),(2,"Blue White")).toDF("ID","Colors")
df.foreach { row => println(row) }
// ============================================================
// Build a two-row test Dataset straight from a case-class collection
case class TestData(id: Int, desc: String)
val records = Seq(TestData(1, "abc"), TestData(2, "xyz"))
val df = spark.createDataset(records)
// ============================================================
// Read a CSV file that has a header row; inferSchema makes Spark take an
// extra pass over the data to deduce column types.
// Fixed: option values were inconsistently typed (String "true" vs Boolean
// true) — the DataFrameReader.option(String, Boolean) overload handles both.
val data = spark.read.option("header", true)
.option("inferSchema", true)
.csv("file:///Users/abe/Personal/Apache Spark/Scripts/colors.csv")
// ============================================================
// Get the runtime class of a variable
val df = Seq((1,"abc"),(2,"xyz")).toDF("id","name")
df.getClass
// Get the static type of a variable in the Scala REPL.
// NOTE: ":type" is a REPL directive, not Scala syntax — this line only works
// interactively and will not compile as part of a source file.
:type df
//==============================================================
// Create a DataFrame from Rows plus an explicit schema.
// StructField's nullable flag defaults to true, so the null cells below are legal.
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import scala.collection.JavaConverters._
val schema = StructType(Seq(
  StructField("a1", StringType),
  StructField("a2", IntegerType),
  StructField("a3", IntegerType),
  StructField("a4", IntegerType),
  StructField("a5", IntegerType)
))
val data = Seq(
  Row("001", 1, 1, 1, 1),
  Row("001", 2, 2, 2, 2),
  Row("001", null, 1, 2, 2),
  Row("002", 1, 2, 3, 4),
  Row("002", 1, 4, null, 3),
  Row("002", 4, null, 1, 5),
  Row("003", 5, 1, 0, 2)
)
// createDataFrame expects a java.util.List[Row]; asJava converts the Seq
val df = spark.createDataFrame(data.asJava, schema)
//==============================================================
// Using encoders: supply an explicit RowEncoder so map can return Row objects.
// https://stackoverflow.com/questions/36648128/how-to-store-custom-objects-in-dataset
// https://towardsdatascience.com/apache-spark-dataset-encoders-demystified-4a3026900d63
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val df = Seq((1,"Red Green"),(2,"Blue White")).toDF("ID","Colors")
val schema = StructType(Seq(
  StructField("ID", IntegerType),
  StructField("Colors", StringType)
))
val encoder = RowEncoder(schema)
// Multiply ID by 10 only for the row where ID == 1; pass other rows through
df.map {
  case Row(id: Int, colors: String) if id == 1 => Row(id * 10, colors)
  case row => row
}(encoder).show
// Same idea with explicit getters.
// Fixed two bugs from the original:
//  1) `data = df.map(...)...show` reassigned a val and captured Unit (the
//     result of .show); the assignment is dropped.
//  2) the new ID was computed from the literal 1 (1*10 / 1*100) instead of
//     the row's actual ID value.
df.map { row =>
  val idVal = row.getAs[Int](0)
  val newIdVal = if (idVal == 1) idVal * 10 else idVal * 100
  val colors = row.getAs[String](1)
  Row(newIdVal, colors)
}(encoder).show
//==============================================================
// View the schema fields of a DataFrame.
// Fixed: the original inspected `df1`, which is not defined in this section —
// it meant the `df` created on the previous line.
val df = Seq((1,"abc"),(2,"xyz")).toDF("id","name")
df.schema.fields
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment