@mvonthron
Created November 8, 2017 15:37
Predicate pushdown tests
import java.sql.Timestamp
import java.util.UUID

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.joda.time.DateTime

import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

case class Scenario(name: String, filter: DataFrame => DataFrame)

case class TestResult(name: String, resultSize: Long, elapsedTime: Long, recordsRead: Long, bytesRead: Long)

// Accumulates input metrics across stages, so each scenario can report how many
// records and bytes were actually read from the Parquet files.
class TestListener() extends SparkListener {
  var totalRecordRead: Long = 0
  var totalBytesRead: Long = 0

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val metrics = stageCompleted.stageInfo.taskMetrics.inputMetrics
    totalRecordRead += metrics.recordsRead
    totalBytesRead += metrics.bytesRead
  }

  def clear(): Unit = {
    totalRecordRead = 0
    totalBytesRead = 0
  }
}

object Main extends App {
  val id = UUID.randomUUID

  val Jan = new Timestamp(DateTime.parse("2017-01-01T00:00:00Z").getMillis)
  val Feb = new Timestamp(DateTime.parse("2017-02-01T00:00:00Z").getMillis)
  val Mar = new Timestamp(DateTime.parse("2017-03-01T00:00:00Z").getMillis)
  // val Jan = DateTime.parse("2017-01-01T00:00:00Z").getMillis
  // val Feb = DateTime.parse("2017-02-01T00:00:00Z").getMillis
  // val Mar = DateTime.parse("2017-03-01T00:00:00Z").getMillis

  val scenarios = Seq(
    Scenario("No filter", identity),
    Scenario("Filter on INT", _.filter(col("int_col") >= 2)),
    Scenario("Filter on non null INT", _.filter(col("int_null_col").isNotNull)),
    Scenario("Filter on BOOL", _.filter(col("bool_col") =!= false)),
    Scenario("Filter on == LONG", _.filter(col("long_col") === 1)),
    Scenario("Filter on <= LONG", _.filter(col("long_col") <= 2)),
    Scenario("Filter on == DOUBLE", _.filter(col("double_col") === 1.0)),
    Scenario("Filter on == FLOAT", _.filter(col("float_col") === 1.0.asInstanceOf[Float])),
    Scenario("Filter on STRING", _.filter(col("string_col") === "event 1")),
    Scenario("Filter isIn STRING", _.filter(col("string_col").isin("event 1", "event 2"))),
    Scenario("Filter == OR STRING ", _.filter(col("string_col") === "event 1" || col("string_col") === "event 2")),
    Scenario("Filter on non null STRING", _.filter(col("string_null_col").isNotNull)),
    Scenario("Filter on >= TS", _.filter(col("timestamp_col") >= Feb)),
    Scenario("Filter on == TS", _.filter(col("timestamp_col") === Feb)),
    // note: cast(LongType) yields seconds since the epoch while getTime is in
    // milliseconds, so this scenario matches no row
    Scenario("Filter on == casted TS", _.filter(col("timestamp_col").cast(LongType) === Feb.getTime)),
    Scenario("Filter on missing column", _.withColumn("doesntexist", lit(0)).filter(col("doesntexist") === 2)),
    Scenario("Filter on nested col INT", _.filter(col("nested.int_col") === 2)),
    Scenario("Filter on dotted col INT", _.filter(col("`dotted.int_col`") === 2)),
    Scenario("Column projection", _.select(col("int_col"))),
    Scenario("Select constant", _.select(lit("foo").as("bar")))
  )

  val conf = new SparkConf()
    .set("spark.master", "local[4]")
    .set("spark.app.name", "PredicatePushdownTest")
    .set("spark.sql.parquet.filterPushdown", "true")
    .set("spark.sql.parquet.mergeSchema", "false")
    // .set("spark.sql.parquet.int96AsTimestamp", "false")
    // .set("spark.sql.parquet.int64AsTimestampMillis", "true") // <- when set to true, statistics are filled but still not used
    .set("parquet.filter.statistics.enabled", "true")
    .set("parquet.filter.dictionary.enabled", "true")

  val sparkSession = SparkSession.builder().config(conf).getOrCreate()
  sparkSession.sparkContext.setLogLevel("WARN")

  println(s"TESTING SPARK VERSION ${sparkSession.version}")

  val schema = StructType(
    Seq(
      StructField("id", IntegerType),
      StructField("string_col", StringType),
      StructField("string_null_col", StringType),
      StructField("timestamp_col", DataTypes.TimestampType),
      StructField("int_col", IntegerType),
      StructField("int_null_col", IntegerType),
      StructField("double_col", DoubleType),
      StructField("long_col", LongType),
      StructField("float_col", DataTypes.FloatType),
      StructField("bool_col", BooleanType),
      StructField("nested", StructType(Seq(StructField("int_col", IntegerType)))),
      StructField("dotted.int_col", IntegerType)
    )
  )

  val rows = Seq(
    Row(1, "event 1", "event 1", Jan, 1, 1, 1.0, 1L, 1.0f, true, Row(1), 1),
    Row(2, "event 2", "event 2", Feb, 2, 2, 2.0, 2L, 2.0f, true, Row(2), 2),
    Row(3, "event 3", null, Mar, 3, null, 3.0, 3L, 3.0f, false, Row(3), 3)
  )

  val initDf = sparkSession.createDataFrame(rows.asJava, schema)
  initDf.show()

  // partition by a non-tested column to force one record per file (to avoid row-grouping)
  initDf.write.partitionBy("id").parquet(s"/tmp/restitution/test-$id")

  var testResults = Seq.empty[TestResult]
  val listener = new TestListener()
  sparkSession.sparkContext.addSparkListener(listener)

  for (sc <- scenarios) {
    sparkSession.sqlContext.clearCache()
    listener.clear()

    val start = System.currentTimeMillis()
    val in = sparkSession.read.parquet(s"/tmp/restitution/test-$id")

    Try {
      val df = sc.filter(in)
      df.explain()
      df.collect().length
    } match {
      case Success(n) =>
        val stop = System.currentTimeMillis()
        testResults :+= TestResult(sc.name, n, stop - start, listener.totalRecordRead, listener.totalBytesRead)
      case Failure(e) =>
        println(e)
    }
    // scala.io.StdIn.readLine()
  }

  printResults(testResults)
  println(s"END TESTS SPARK ${sparkSession.version}")
  sparkSession.stop()

  def printResults(testResults: Seq[TestResult]) = {
    println("TEST RESULTS:")
    println("----------------------------------+-----------+------------------+------------------+-----------------")
    printf(" %-30s | %8s | %14s | %14s | %14s \n", "Name", "#rows", "duration (ms)", "#records read", "bytes read")
    println("----------------------------------+-----------+------------------+------------------+-----------------")
    testResults.foreach { r =>
      // "->" marks scenarios that read fewer records than the dataset holds,
      // i.e. where the filter was actually pushed down to the reader
      printf("%s %-30s | %8s | %14s | %14s | %14s \n",
        if (rows.size != r.recordsRead) "->" else "  ",
        r.name, r.resultSize.toString, r.elapsedTime.toString, r.recordsRead.toString, r.bytesRead.toString)
    }
    println("----------------------------------+-----------+------------------+------------------+-----------------")
  }
}

Parquet predicate pushdown tests

Tests with few rows, many types

Data is partitioned by the id column to prevent row-grouping: id is unique per record, so each record lands in its own file and row group, and the Parquet statistics apply per record.

+---+----------+---------------+-------------------+-------+------------+----------+--------+---------+--------+------+--------------+
| id|string_col|string_null_col|      timestamp_col|int_col|int_null_col|double_col|long_col|float_col|bool_col|nested|dotted.int_col|
+---+----------+---------------+-------------------+-------+------------+----------+--------+---------+--------+------+--------------+
|  1|   event 1|        event 1|2016-12-31 19:00:00|      1|           1|       1.0|       1|      1.0|    true|   [1]|             1|
|  2|   event 2|        event 2|2017-01-31 19:00:00|      2|           2|       2.0|       2|      2.0|    true|   [2]|             2|
|  3|   event 3|           null|2017-02-28 19:00:00|      3|        null|       3.0|       3|      3.0|   false|   [3]|             3|
+---+----------+---------------+-------------------+-------+------------+----------+--------+---------+--------+------+--------------+

And the test scenarios:

  val scenarios = Seq(
    Scenario("No filter", identity),
    Scenario("Filter on INT", _.filter(col("int_col") >= 2)),
    Scenario("Filter on non null INT", _.filter(col("int_null_col").isNotNull)),
    Scenario("Filter on BOOL", _.filter(col("bool_col") =!= false)),
    Scenario("Filter on == LONG", _.filter(col("long_col") === 1)),
    Scenario("Filter on <= LONG", _.filter(col("long_col") <= 2)),
    Scenario("Filter on == DOUBLE", _.filter(col("double_col") === 1.0)),
    Scenario("Filter on == FLOAT", _.filter(col("float_col") === 1.0.asInstanceOf[Float])),
    Scenario("Filter on STRING", _.filter(col("string_col") === "event 1")),
    Scenario("Filter isIn STRING", _.filter(col("string_col").isin("event 1", "event 2"))),
    Scenario("Filter == OR STRING ", _.filter( col("string_col") === "event 1" || col("string_col") === "event 2") ),
    Scenario("Filter on non null STRING", _.filter(col("string_null_col").isNotNull)),
    Scenario("Filter on >= TS", _.filter(col("timestamp_col") >= Feb)),
    Scenario("Filter on == TS", _.filter(col("timestamp_col") === Feb)),
    Scenario("Filter on == casted TS", _.filter(col("timestamp_col").cast(LongType) === Feb.getTime)),
    Scenario("Filter on missing column", _.withColumn("doesntexist", lit(0)).filter(col("doesntexist") === 2)),
    Scenario("Filter on nested col INT", _.filter(col("nested.int_col") === 2)),
    Scenario("Filter on dotted col INT", _.filter(col("`dotted.int_col`") === 2)),
    Scenario("Column projection", _.select(col("int_col"))),
    Scenario("Select constant", _.select(lit("foo").as("bar")))
  )

If we don't force 1.0.asInstanceOf[Float], the literal is taken as a Double and Spark adds a cast to the filter, blocking the pushdown. Timestamp filters fare no better: Spark stores timestamps as Parquet INT96 (see the preader output below), whose statistics cannot be used for pushdown; as the commented int64AsTimestampMillis setting above notes, even when statistics are filled they are still not used.
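
The difference is visible in the physical plan. A minimal sketch (not part of the original gist) that reuses the dataset written above:

  val in = sparkSession.read.parquet(s"/tmp/restitution/test-$id")

  // Double literal: the plan wraps the column in cast(float_col as double)
  // and PushedFilters stays empty
  in.filter(col("float_col") === 1.0).explain()

  // Float literal (equivalent to 1.0.asInstanceOf[Float]): the predicate
  // shows up under PushedFilters on the FileScan node
  in.filter(col("float_col") === 1.0f).explain()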

Spark 2.1.0 / Parquet 1.8.1

TEST RESULTS:
----------------------------------+-----------+------------------+------------------+-----------------
   Name                           |     #rows |    duration (ms) |    #records read |       bytes read
----------------------------------+-----------+------------------+------------------+-----------------
   No filter                      |         3 |              888 |                3 |             7281
-> Filter on INT                  |         2 |              577 |                2 |             7281
-> Filter on non null INT         |         2 |              369 |                2 |             7281
-> Filter on BOOL                 |         2 |              425 |                2 |             7281
-> Filter on == LONG              |         1 |              492 |                1 |             7281
-> Filter on <= LONG              |         2 |              431 |                2 |             7281
-> Filter on == DOUBLE            |         1 |              514 |                1 |             7281
-> Filter on == FLOAT             |         1 |              564 |                1 |             7281
   Filter on STRING               |         1 |              425 |                3 |             7281
   Filter isIn STRING             |         2 |              343 |                3 |             7281
   Filter == OR STRING            |         2 |              235 |                3 |             7281
   Filter on non null STRING      |         2 |              358 |                3 |             7281
   Filter on >= TS                |         2 |              244 |                3 |             7281
   Filter on == TS                |         1 |              244 |                3 |             7281
   Filter on == casted TS         |         0 |              205 |                3 |             7281
-> Filter on missing column       |         0 |              171 |                0 |                0
   Filter on nested col INT       |         1 |              189 |                3 |             7281
   Column projection              |         3 |              220 |                3 |             7281
   Select constant                |         3 |              179 |                3 |             7281
----------------------------------+-----------+------------------+------------------+-----------------

(The "Filter on dotted col INT" scenario is missing from this table: it fails with an exception on Spark 2.1.0, so it only hits the Failure branch of the test loop.)

Spark 2.2.0 / Parquet 1.8.2

Most string filters are now pushed down too, likely because Parquet 1.8.2 fixes the corrupt binary min/max statistics of 1.8.1 (PARQUET-251); isin and timestamp filters still are not.

TEST RESULTS:
----------------------------------+-----------+------------------+------------------+-----------------
   Name                           |     #rows |    duration (ms) |    #records read |       bytes read
----------------------------------+-----------+------------------+------------------+-----------------
   No filter                      |         3 |             1197 |                3 |            15309
-> Filter on INT                  |         2 |             1297 |                2 |            12731
-> Filter on non null INT         |         2 |              398 |                2 |            12806
-> Filter on BOOL                 |         2 |              563 |                2 |            12806
-> Filter on == LONG              |         1 |              406 |                1 |            10228
-> Filter on <= LONG              |         2 |              419 |                2 |            12806
-> Filter on == DOUBLE            |         1 |              444 |                1 |            10228
-> Filter on == FLOAT             |         1 |              275 |                1 |            10228
-> Filter on STRING               |         1 |              336 |                1 |            15309
   Filter isIn STRING             |         2 |              340 |                3 |            15309
-> Filter == OR STRING            |         2 |              252 |                2 |            15309
-> Filter on non null STRING      |         2 |              387 |                2 |            15309
   Filter on >= TS                |         2 |              346 |                3 |            15309
   Filter on == TS                |         1 |              215 |                3 |            15309
   Filter on == casted TS         |         0 |              243 |                3 |            15309
-> Filter on missing column       |         0 |              241 |                0 |                0
   Filter on nested col INT       |         1 |              220 |                3 |            15309
   Filter on dotted col INT       |         1 |              199 |                3 |            15309
   Column projection              |         3 |              219 |                3 |            15333
   Select constant                |         3 |              216 |                3 |             7734
----------------------------------+-----------+------------------+------------------+-----------------

Tests with few types, many rows

Three types:

  • Int
  • String
  • Timestamp

Three "meaningful" rows + 1M random rows

Spark 2.1.0 / Parquet 1.8.1

TEST RESULTS:
----------------------------------+-----------+------------------+------------------+-----------------
   Name                           |     #rows |    duration (ms) |    #records read |       bytes read
----------------------------------+-----------+------------------+------------------+-----------------
   No filter                      |   1000003 |             9038 |          1000003 |         51870633
-> Filter on INT                  |         2 |             1281 |              380 |         51870633
   Filter on STRING               |         2 |              855 |          1000003 |         51870633
   Filter isIn STRING             |         3 |              727 |          1000003 |         51870633
   Filter on == TS                |         1 |              573 |          1000003 |         51870633
-> Filter on missing column       |         0 |              160 |                0 |                0
   Column projection              |   1000003 |             1081 |          1000003 |         51870633
   Select constant                |   1000003 |              649 |          1000003 |         51870633
----------------------------------+-----------+------------------+------------------+-----------------

Size on disk:

▶ du -hs tests-big-files-2.1.0
 50M	tests-big-files-2.1.0

Parquet-reader:

▶ preader tests-big-files-2.1.0/part-00000-52408e5c-0c49-43c0-8e11-99f3ca9ba679.snappy.parquet | head -n 20
File Name: tests-big-files-2.1.0/part-00000-52408e5c-0c49-43c0-8e11-99f3ca9ba679.snappy.parquet
Version: 0
Created By: parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)
Total rows: 500001
Number of RowGroups: 2632
Number of Real Columns: 3
Number of Columns: 3
Number of Selected Columns: 3
Column 0: int_col (INT32)
Column 1: string_col (BYTE_ARRAY)
Column 2: timestamp_col (INT96)
[...]

Spark 2.2.0 / Parquet 1.8.2

TEST RESULTS:
----------------------------------+-----------+------------------+------------------+-----------------
   Name                           |     #rows |    duration (ms) |    #records read |       bytes read
----------------------------------+-----------+------------------+------------------+-----------------
   No filter                      |   1000003 |             8354 |          1000003 |         52540350
-> Filter on INT                  |         2 |             1624 |              380 |          1692606
   Filter on STRING               |         2 |              942 |          1000003 |         52540350
   Filter isIn STRING             |         3 |              881 |          1000003 |         52540350
   Filter on == TS                |         1 |              697 |          1000003 |         52540350
-> Filter on missing column       |         0 |              239 |                0 |                0
   Column projection              |   1000003 |              881 |          1000003 |         51221438
   Select constant                |   1000003 |              778 |          1000003 |          1553342
----------------------------------+-----------+------------------+------------------+-----------------

Size on disk:

▶ du -hs tests-big-files-2.2.0
 50M	tests-big-files-2.2.0

Parquet-reader:

▶ preader tests-big-files-2.2.0/part-00000-b4424991-a564-4e80-908c-1bb1fa5837a4-c000.snappy.parquet | head -n 20
File Name: tests-big-files-2.2.0/part-00000-b4424991-a564-4e80-908c-1bb1fa5837a4-c000.snappy.parquet
Version: 0
Created By: parquet-mr version 1.8.2 (build c6522788629e590a53eb79874b95f6c3ff11f16c)
Total rows: 500001
Number of RowGroups: 2632
Number of Real Columns: 3
Number of Columns: 3
Number of Selected Columns: 3
Column 0: int_col (INT32)
Column 1: string_col (BYTE_ARRAY)
Column 2: timestamp_col (INT96)
[...]

Column projection

500000 records with 50 columns of random Ints; we then select a varying number of columns, with and without filters.

  val schema = StructType((0 until 50).map(i => StructField(s"col$i", IntegerType)))
  val rows = (0 until 500000).map{ _ => Row( (0 until 50).map(_ => Random.nextInt()): _*) }

Tests:

  val scenarios = Seq(
    Scenario("No filter", identity),
    Scenario("filter col32 >= 100000", _.filter(col("col32") >= 100000)),
    Scenario("col0", _.select("col0")),
    Scenario("col0, col1", _.select("col0", "col1")),
    Scenario("col0, col1, col2", _.select("col0", "col1", "col2")),
    Scenario("col0, col1, col2 + filter col32 >= 100000", _.filter(col("col32") >= 100000).select("col0", "col1", "col2")),
    Scenario("col0..col9", _.select( (0 until 10).map(i => col(s"col$i")): _*) ),
    Scenario("col20..col29", _.select( (20 until 30).map(i => col(s"col$i")): _*) ),
    Scenario("col0..col25", _.select( (0 until 26).map(i => col(s"col$i")): _*) ),
    Scenario("col0..col25 + filter col32 >= 100000", _.filter(col("col32") >= 100000).select( (0 until 26).map(i => col(s"col$i")): _*) ),
    Scenario("col0..col45", _.select( (0 until 40).map(i => col(s"col$i")): _*) ),
    Scenario("Filter on missing column", _.withColumn("doesntexist", lit(0)).filter(col("doesntexist") === 2)),
    Scenario("Select constant", _.select(lit("foo").as("bar")))
  )
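
Column projection can also be checked in the physical plan; a quick sketch (not in the original gist): the FileScan's ReadSchema shrinks to the selected columns, which is what allows the reduced bytes read in the Spark 2.2.0 results below.

  // assumes the 50-column dataset was written to `path`
  val in = sparkSession.read.parquet(path)
  // explain() shows ReadSchema: struct<col0:int,col1:int,col2:int>
  in.select("col0", "col1", "col2").explain()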

File header:

▶ preader test-big-columns-a1ca0b3a-bf28-408a-acfc-ca05bbe54c9d/part-00000-928aaf55-7600-4214-b0f0-620826280fd4-c000.snappy.parquet | head 
File Name: test-big-columns-a1ca0b3a-bf28-408a-acfc-ca05bbe54c9d/part-00000-928aaf55-7600-4214-b0f0-620826280fd4-c000.snappy.parquet
Version: 0
Created By: parquet-mr version 1.8.2 (build c6522788629e590a53eb79874b95f6c3ff11f16c)
Total rows: 125000
Number of RowGroups: 491
Number of Real Columns: 50
Number of Columns: 50
Number of Selected Columns: 50
Column 0: col0 (INT32)
Column 1: col1 (INT32)
Column 2: col2 (INT32)
Column 3: col3 (INT32)
Column 4: col4 (INT32)
Column 5: col5 (INT32)
Column 6: col6 (INT32)
Column 7: col7 (INT32)
Column 8: col8 (INT32)
Column 9: col9 (INT32)
Column 10: col10 (INT32)
Column 11: col11 (INT32)
...

Spark 2.1.0 / Parquet 1.8.1

TEST RESULTS:
----------------------------------------------+-----------+------------------+------------------+-----------------
   Name                                       |     #rows |    duration (ms) |    #records read |       bytes read
----------------------------------------------+-----------+------------------+------------------+-----------------
   No filter                                  |    500000 |            35397 |           500000 |        110123732
   filter col32 >= 100000                     |    249848 |             7325 |           500000 |        110123732
   col0                                       |    500000 |              962 |           500000 |        110123732
   col0, col1                                 |    500000 |             1002 |           500000 |        110123732
   col0, col1, col2                           |    500000 |              890 |           500000 |        110123732
   col0, col1, col2 + filter col32 >= 100000  |    249848 |              620 |           500000 |        110123732
   col0..col9                                 |    500000 |             1313 |           500000 |        110123732
   col20..col29                               |    500000 |             1659 |           500000 |        110123732
   col0..col25                                |    500000 |             2979 |           500000 |        110123732
   col0..col25 + filter col32 >= 100000       |    249848 |             2538 |           500000 |        110123732
   col0..col45                                |    500000 |             5365 |           500000 |        110123732
   Filter on missing column                   |         0 |              296 |                0 |                0
   Select constant                            |    500000 |              735 |           500000 |        110123732
----------------------------------------------+-----------+------------------+------------------+-----------------

Spark 2.2.0 / Parquet 1.8.2

TEST RESULTS:
----------------------------------------------+-----------+------------------+------------------+-----------------
   Name                                       |     #rows |    duration (ms) |    #records read |       bytes read
----------------------------------------------+-----------+------------------+------------------+-----------------
   No filter                                  |    500000 |            20388 |           500000 |        112039560
   filter col32 >= 100000                     |    249890 |             5719 |           500000 |        112039560
   col0                                       |    500000 |             1773 |           500000 |         71669384
   col0, col1                                 |    500000 |             1150 |           500000 |         71669384
   col0, col1, col2                           |    500000 |             1388 |           500000 |         71669384
   col0, col1, col2 + filter col32 >= 100000  |    249890 |             1046 |           500000 |         92903048
   col0..col9                                 |    500000 |             1736 |           500000 |         71669384
   col20..col29                               |    500000 |             1591 |           500000 |         71653000
   col0..col25                                |    500000 |             2708 |           500000 |        103650952
   col0..col25 + filter col32 >= 100000       |    249890 |             2816 |           500000 |        103650952
   col0..col45                                |    500000 |             5261 |           500000 |        110204552
   Filter on missing column                   |         0 |              582 |                0 |                0
   Select constant                            |    500000 |              849 |           500000 |          6379144
----------------------------------------------+-----------+------------------+------------------+-----------------

Spark 2.2.0 / Parquet 1.8.2 with initial data sorted by col32

Pre-sorting by col32 has a major impact on filtering: the col32 scenarios now read 250050 records instead of 500000.
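
Sorting clusters similar col32 values into the same row groups, so each group's min/max statistics cover a narrow, disjoint range that the pushed-down filter can skip entirely. A sketch of the preparation step (the output path is illustrative):

  sparkSession.createDataFrame(rows.asJava, schema)
    .sort("col32")
    .write.parquet(s"/tmp/test-big-columns-sorted-$id")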

TEST RESULTS:
----------------------------------------------+-----------+------------------+------------------+-----------------
   Name                                       |     #rows |    duration (ms) |    #records read |       bytes read
----------------------------------------------+-----------+------------------+------------------+-----------------
   No filter                                  |    500000 |             9123 |           500000 |        112039520
   filter col32 >= 100000                     |    250042 |            10256 |           250050 |         59278944
   col0                                       |    500000 |              934 |           500000 |         71669344
   col0, col1                                 |    500000 |             1030 |           500000 |         71669344
   col0, col1, col2                           |    500000 |             1198 |           500000 |         71669344
   col0, col1, col2 + filter col32 >= 100000  |    250042 |              773 |           250050 |         49710688
   col0..col9                                 |    500000 |             1193 |           500000 |         71669344
   col20..col29                               |    500000 |             1305 |           500000 |         71652960
   col0..col25                                |    500000 |             2382 |           500000 |        103650912
   col0..col25 + filter col32 >= 100000       |    250042 |             1536 |           250050 |         55084640
   col0..col45                                |    500000 |             5014 |           500000 |        110204512
   Filter on missing column                   |         0 |              250 |                0 |                0
   Select constant                            |    500000 |              697 |           500000 |          6379104
----------------------------------------------+-----------+------------------+------------------+-----------------
@vontman

vontman commented May 31, 2020

It would be cool to rerun the tests on a newer version of Spark; it now supports Parquet 1.10.1.
