Skip to content

Instantly share code, notes, and snippets.

@ryanmiville
Last active August 1, 2023 22:25
Show Gist options
  • Save ryanmiville/45f6dc31c08a4b8aa7f82e3cc782ea4d to your computer and use it in GitHub Desktop.
Save ryanmiville/45f6dc31c08a4b8aa7f82e3cc782ea4d to your computer and use it in GitHub Desktop.
Spark utility functions
//> using scala "2.12.17"
//> using dep "org.apache.spark::spark-sql:3.3.2"
import java.net._

import org.apache.hadoop.fs._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
object utils {
/**
 * Returns the child entry of the directory `path` whose name sorts last.
 *
 * In practice this is used to get the most recent partition of a table, which only works when
 * the partition directory names sort chronologically (e.g. zero-padded dates such as
 * `dt=2023-08-01`; `dt=2023-8-1` vs `dt=2023-10-1` would mis-sort).
 *
 * The listing is ordered explicitly by entry name rather than trusting the order in which the
 * file system returns entries: Hadoop's `listStatus` makes no ordering guarantee.
 *
 * Entries are not filtered, so bookkeeping files (e.g. `_SUCCESS`) are candidates too.
 *
 * @param path  directory to list, resolved against the session's Hadoop configuration
 * @param spark session whose Hadoop configuration selects the file system
 * @return the `toString` of the child path with the lexicographically greatest name
 * @throws NoSuchElementException if the directory has no entries; errors raised by the
 *         file system listing itself propagate unchanged
 */
def mostRecentPath(path: String)(implicit spark: SparkSession): String = {
  val conf = spark.sparkContext.hadoopConfiguration
  // Appending '/' keeps a bare `scheme://authority` (e.g. a bucket URI) naming its root directory.
  val dir = new Path(path + "/")
  val children = dir.getFileSystem(conf).listStatus(dir)
  if (children.isEmpty)
    throw new NoSuchElementException(s"No entries found under $path")
  // Siblings share the same parent, so comparing the last path component is sufficient.
  children.map(_.getPath).maxBy(_.getName).toString
}
/**
 * Returns the Spark SQL schema produced by the [[Encoder]] for `A` found in implicit scope.
 *
 * @tparam A type whose encoder defines the schema
 */
def schemaFor[A](implicit encoder: Encoder[A]): StructType =
  encoder.schema
/**
 * Returns one [[Column]] reference per top-level field of `A`'s encoder schema, in schema order.
 *
 * The result can be passed to `Dataset.select(cols: Column*)` as `columnsFor[A]: _*`.
 *
 * Each field name is backtick-quoted before being handed to `col`. Unquoted, `col` treats `.`
 * as nested-field access, which is wrong for a top-level field whose name contains a dot.
 * Embedded backticks are escaped by doubling, as Spark's identifier quoting expects.
 *
 * @tparam A type whose encoder schema defines the columns
 */
def columnsFor[A: Encoder]: List[Column] =
  schemaFor[A].fieldNames.toList.map { name =>
    col(s"`${name.replace("`", "``")}`")
  }
/**
 * Extension syntax that adds `.debug(prefix)` to every [[Dataset]].
 *
 * `debug` wraps the dataset in a [[DebugDataSet]] labelled with `prefix`. A value class is
 * used so the wrapper adds no allocation of its own.
 */
implicit class DatasetOps[A](private val ds: Dataset[A]) extends AnyVal {
  /** Wraps this dataset for labelled debug output; `prefix` is printed before each output. */
  def debug(prefix: String): DebugDataSet[A] =
    new DebugDataSet[A](prefix, ds)
}
/**
 * Wraps a [[Dataset]] so that `show`/`printSchema` first print a label line, run the
 * corresponding `Dataset` method, and then return the wrapped dataset. This lets debug output
 * be spliced into the middle of a transformation chain.
 *
 * Each method delegates to the `Dataset` method with the same parameters.
 *
 * NOTE(review): `show` triggers a Spark action, and the value returned is the same `Dataset`
 * (not a cached copy). Unless the caller has cached it, downstream actions recompute it.
 *
 * @param prefix label printed on its own line before each output
 * @param ds     the dataset to inspect and return
 */
final class DebugDataSet[A](private val prefix: String, private val ds: Dataset[A]) {

  /** Prints `prefix`, evaluates `output`, then hands back the wrapped dataset. */
  private def labelled(output: => Unit): Dataset[A] = {
    println(prefix)
    output
    ds
  }

  /** Prints the full schema tree (all nesting levels). */
  def printSchema(): Dataset[A] = printSchema(Int.MaxValue)

  /** Prints the schema tree down to `level` nesting levels. */
  def printSchema(level: Int): Dataset[A] = labelled(ds.printSchema(level))

  /** Shows the first 20 rows with truncated cells. */
  def show(): Dataset[A] = show(20)

  /** Shows the first `numRows` rows with truncated cells. */
  def show(numRows: Int): Dataset[A] = show(numRows, truncate = true)

  /** Shows the first 20 rows, truncating long cells only when `truncate` is true. */
  def show(truncate: Boolean): Dataset[A] = show(20, truncate)

  /** Shows the first `numRows` rows, truncating long cells only when `truncate` is true. */
  def show(numRows: Int, truncate: Boolean): Dataset[A] =
    labelled(ds.show(numRows, truncate))

  /** Shows the first `numRows` rows, passing `truncate` through to `Dataset.show` as a cell width. */
  def show(numRows: Int, truncate: Int): Dataset[A] =
    labelled(ds.show(numRows, truncate))

  /**
   * Shows the first `numRows` rows with `Dataset.show`'s integer `truncate` width.
   * Rows are printed one field per line when `vertical` is true.
   */
  def show(numRows: Int, truncate: Int, vertical: Boolean): Dataset[A] =
    labelled(ds.show(numRows, truncate, vertical))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment