Data partitioned by the id column to prevent row-grouping.
+---+----------+---------------+-------------------+-------+------------+----------+--------+---------+--------+------+--------------+
| id|string_col|string_null_col| timestamp_col|int_col|int_null_col|double_col|long_col|float_col|bool_col|nested|dotted.int_col|
+---+----------+---------------+-------------------+-------+------------+----------+--------+---------+--------+------+--------------+
| 1| event 1| event 1|2016-12-31 19:00:00| 1| 1| 1.0| 1| 1.0| true| [1]| 1|
| 2| event 2| event 2|2017-01-31 19:00:00| 2| 2| 2.0| 2| 2.0| true| [2]| 2|
| 3| event 3| null|2017-02-28 19:00:00| 3| null| 3.0| 3| 3.0| false| [3]| 3|
+---+----------+---------------+-------------------+-------+------------+----------+--------+---------+--------+------+--------------+
And the test scenarios:
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.LongType

val scenarios = Seq(
Scenario("No filter", identity),
Scenario("Filter on INT", _.filter(col("int_col") >= 2)),
Scenario("Filter on non null INT", _.filter(col("int_null_col").isNotNull)),
Scenario("Filter on BOOL", _.filter(col("bool_col") =!= false)),
Scenario("Filter on == LONG", _.filter(col("long_col") === 1)),
Scenario("Filter on <= LONG", _.filter(col("long_col") <= 2)),
Scenario("Filter on == DOUBLE", _.filter(col("double_col") === 1.0)),
Scenario("Filter on == FLOAT", _.filter(col("float_col") === 1.0.asInstanceOf[Float])),
Scenario("Filter on STRING", _.filter(col("string_col") === "event 1")),
Scenario("Filter isIn STRING", _.filter(col("string_col").isin("event 1", "event 2"))),
Scenario("Filter == OR STRING", _.filter(col("string_col") === "event 1" || col("string_col") === "event 2")),
Scenario("Filter on non null STRING", _.filter(col("string_null_col").isNotNull)),
Scenario("Filter on >= TS", _.filter(col("timestamp_col") >= Feb)),
Scenario("Filter on == TS", _.filter(col("timestamp_col") === Feb)),
Scenario("Filter on == casted TS", _.filter(col("timestamp_col").cast(LongType) === Feb.getTime)),
Scenario("Filter on missing column", _.withColumn("doesntexist", lit(0)).filter(col("doesntexist") === 2)),
Scenario("Filter on nested col INT", _.filter(col("nested.int_col") === 2)),
Scenario("Filter on dotted col INT", _.filter(col("`dotted.int_col`") === 2)),
Scenario("Column projection", _.select(col("int_col"))),
Scenario("Select constant", _.select(lit("foo").as("bar")))
)
If we don't force 1.0.asInstanceOf[Float], the literal is inferred as a Double and Spark adds a cast to the filter, blocking the pushdown.
TEST RESULTS:
----------------------------------+-----------+------------------+------------------+-----------------
Name | #rows | duration (ms) | #records read | bytes read
----------------------------------+-----------+------------------+------------------+-----------------
No filter | 3 | 888 | 3 | 7281
-> Filter on INT | 2 | 577 | 2 | 7281
-> Filter on non null INT | 2 | 369 | 2 | 7281
-> Filter on BOOL | 2 | 425 | 2 | 7281
-> Filter on == LONG | 1 | 492 | 1 | 7281
-> Filter on <= LONG | 2 | 431 | 2 | 7281
-> Filter on == DOUBLE | 1 | 514 | 1 | 7281
-> Filter on == FLOAT | 1 | 564 | 1 | 7281
Filter on STRING | 1 | 425 | 3 | 7281
Filter isIn STRING | 2 | 343 | 3 | 7281
Filter == OR STRING | 2 | 235 | 3 | 7281
Filter on non null STRING | 2 | 358 | 3 | 7281
Filter on >= TS | 2 | 244 | 3 | 7281
Filter on == TS | 1 | 244 | 3 | 7281
Filter on == casted TS | 0 | 205 | 3 | 7281
-> Filter on missing column | 0 | 171 | 0 | 0
Filter on nested col INT | 1 | 189 | 3 | 7281
Column projection | 3 | 220 | 3 | 7281
Select constant | 3 | 179 | 3 | 7281
----------------------------------+-----------+------------------+------------------+-----------------
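The zero rows for "Filter on == casted TS" are no accident: in Spark, casting a timestamp to LongType yields epoch seconds (assumed Spark cast semantics here), while java.sql.Timestamp.getTime returns epoch milliseconds, so the two sides of the equality can never match. A minimal, Spark-free illustration:

```scala
import java.sql.Timestamp

// cast(timestamp as long) produces epoch *seconds* (assumed Spark semantics),
// while Timestamp.getTime returns epoch *milliseconds*.
val feb: Timestamp = Timestamp.valueOf("2017-02-28 19:00:00")
val millis  = feb.getTime    // the literal side of the filter (Feb.getTime)
val seconds = millis / 1000L // what cast(LongType) yields on the column side

println(seconds == millis) // false: the filter can never match
```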
TEST RESULTS:
----------------------------------+-----------+------------------+------------------+-----------------
Name | #rows | duration (ms) | #records read | bytes read
----------------------------------+-----------+------------------+------------------+-----------------
No filter | 3 | 1197 | 3 | 15309
-> Filter on INT | 2 | 1297 | 2 | 12731
-> Filter on non null INT | 2 | 398 | 2 | 12806
-> Filter on BOOL | 2 | 563 | 2 | 12806
-> Filter on == LONG | 1 | 406 | 1 | 10228
-> Filter on <= LONG | 2 | 419 | 2 | 12806
-> Filter on == DOUBLE | 1 | 444 | 1 | 10228
-> Filter on == FLOAT | 1 | 275 | 1 | 10228
-> Filter on STRING | 1 | 336 | 1 | 15309
Filter isIn STRING | 2 | 340 | 3 | 15309
-> Filter == OR STRING | 2 | 252 | 2 | 15309
-> Filter on non null STRING | 2 | 387 | 2 | 15309
Filter on >= TS | 2 | 346 | 3 | 15309
Filter on == TS | 1 | 215 | 3 | 15309
Filter on == casted TS | 0 | 243 | 3 | 15309
-> Filter on missing column | 0 | 241 | 0 | 0
Filter on nested col INT | 1 | 220 | 3 | 15309
Filter on dotted col INT | 1 | 199 | 3 | 15309
Column projection | 3 | 219 | 3 | 15333
Select constant | 3 | 216 | 3 | 7734
----------------------------------+-----------+------------------+------------------+-----------------
Three column types: Int, String, Timestamp.
Three "meaningful" rows + 1M random rows.
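The shape of that dataset can be sketched as follows (hypothetical generation code, not the post's actual snippet, which builds Spark Rows):

```scala
import scala.util.Random

// Hypothetical sketch: 3 "meaningful" rows targeted by the filters,
// drowned in 1M rows of random noise.
val meaningful = Seq(1, 2, 3).map(i => (i, s"event $i"))
val noise = (0 until 1000000).map { _ =>
  (Random.nextInt(), Random.alphanumeric.take(8).mkString)
}
val rows = meaningful ++ noise

println(rows.size) // 1000003 — the "#rows" of the "No filter" scenario
```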
TEST RESULTS:
----------------------------------+-----------+------------------+------------------+-----------------
Name | #rows | duration (ms) | #records read | bytes read
----------------------------------+-----------+------------------+------------------+-----------------
No filter | 1000003 | 9038 | 1000003 | 51870633
-> Filter on INT | 2 | 1281 | 380 | 51870633
Filter on STRING | 2 | 855 | 1000003 | 51870633
Filter isIn STRING | 3 | 727 | 1000003 | 51870633
Filter on == TS | 1 | 573 | 1000003 | 51870633
-> Filter on missing column | 0 | 160 | 0 | 0
Column projection | 1000003 | 1081 | 1000003 | 51870633
Select constant | 1000003 | 649 | 1000003 | 51870633
----------------------------------+-----------+------------------+------------------+-----------------
Size on disk:
▶ du -hs tests-big-files-2.1.0
50M tests-big-files-2.1.0
Parquet-reader:
▶ preader tests-big-files-2.1.0/part-00000-52408e5c-0c49-43c0-8e11-99f3ca9ba679.snappy.parquet | head -n 20
File Name: tests-big-files-2.1.0/part-00000-52408e5c-0c49-43c0-8e11-99f3ca9ba679.snappy.parquet
Version: 0
Created By: parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)
Total rows: 500001
Number of RowGroups: 2632
Number of Real Columns: 3
Number of Columns: 3
Number of Selected Columns: 3
Column 0: int_col (INT32)
Column 1: string_col (BYTE_ARRAY)
Column 2: timestamp_col (INT96)
[...]
TEST RESULTS:
----------------------------------+-----------+------------------+------------------+-----------------
Name | #rows | duration (ms) | #records read | bytes read
----------------------------------+-----------+------------------+------------------+-----------------
No filter | 1000003 | 8354 | 1000003 | 52540350
-> Filter on INT | 2 | 1624 | 380 | 1692606
Filter on STRING | 2 | 942 | 1000003 | 52540350
Filter isIn STRING | 3 | 881 | 1000003 | 52540350
Filter on == TS | 1 | 697 | 1000003 | 52540350
-> Filter on missing column | 0 | 239 | 0 | 0
Column projection | 1000003 | 881 | 1000003 | 51221438
Select constant | 1000003 | 778 | 1000003 | 1553342
----------------------------------+-----------+------------------+------------------+-----------------
Size on disk:
▶ du -hs tests-big-files-2.2.0
50M tests-big-files-2.2.0
Parquet-reader:
▶ preader tests-big-files-2.2.0/part-00000-b4424991-a564-4e80-908c-1bb1fa5837a4-c000.snappy.parquet | head -n 20
File Name: tests-big-files-2.2.0/part-00000-b4424991-a564-4e80-908c-1bb1fa5837a4-c000.snappy.parquet
Version: 0
Created By: parquet-mr version 1.8.2 (build c6522788629e590a53eb79874b95f6c3ff11f16c)
Total rows: 500001
Number of RowGroups: 2632
Number of Real Columns: 3
Number of Columns: 3
Number of Selected Columns: 3
Column 0: int_col (INT32)
Column 1: string_col (BYTE_ARRAY)
Column 2: timestamp_col (INT96)
[...]
500000 records with 50 columns of random Ints => selecting a varying number of columns, with and without filters.
import scala.util.Random

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val schema = StructType((0 until 50).map(i => StructField(s"col$i", IntegerType)))
val rows = (0 until 500000).map{ _ => Row( (0 until 50).map(_ => Random.nextInt()): _*) }
Tests:
val scenarios = Seq(
Scenario("No filter", identity),
Scenario("filter col32 >= 100000", _.filter(col("col32") >= 100000)),
Scenario("col0", _.select("col0")),
Scenario("col0, col1", _.select("col0", "col1")),
Scenario("col0, col1, col2", _.select("col0", "col1", "col2")),
Scenario("col0, col1, col2 + filter col32 >= 100000", _.filter(col("col32") >= 100000).select("col0", "col1", "col2")),
Scenario("col0..col9", _.select( (0 until 10).map(i => col(s"col$i")): _*) ),
Scenario("col20..col29", _.select( (20 until 30).map(i => col(s"col$i")): _*) ),
Scenario("col0..col25", _.select( (0 until 26).map(i => col(s"col$i")): _*) ),
Scenario("col0..col25 + filter col32 >= 100000", _.filter(col("col32") >= 100000).select( (0 until 26).map(i => col(s"col$i")): _*) ),
Scenario("col0..col45", _.select( (0 until 46).map(i => col(s"col$i")): _*) ),
Scenario("Filter on missing column", _.withColumn("doesntexist", lit(0)).filter(col("doesntexist") === 2)),
Scenario("Select constant", _.select(lit("foo").as("bar")))
)
File header
▶ preader test-big-columns-a1ca0b3a-bf28-408a-acfc-ca05bbe54c9d/part-00000-928aaf55-7600-4214-b0f0-620826280fd4-c000.snappy.parquet | head
File Name: test-big-columns-a1ca0b3a-bf28-408a-acfc-ca05bbe54c9d/part-00000-928aaf55-7600-4214-b0f0-620826280fd4-c000.snappy.parquet
Version: 0
Created By: parquet-mr version 1.8.2 (build c6522788629e590a53eb79874b95f6c3ff11f16c)
Total rows: 125000
Number of RowGroups: 491
Number of Real Columns: 50
Number of Columns: 50
Number of Selected Columns: 50
Column 0: col0 (INT32)
Column 1: col1 (INT32)
Column 2: col2 (INT32)
Column 3: col3 (INT32)
Column 4: col4 (INT32)
Column 5: col5 (INT32)
Column 6: col6 (INT32)
Column 7: col7 (INT32)
Column 8: col8 (INT32)
Column 9: col9 (INT32)
Column 10: col10 (INT32)
Column 11: col11 (INT32)
...
TEST RESULTS:
----------------------------------+-----------+------------------+------------------+-----------------------------
Name | #rows | duration (ms) | #records read | bytes read
----------------------------------+-----------+------------------+------------------+-----------------------------
No filter | 500000 | 35397 | 500000 | 110123732
filter col32 >= 100000 | 249848 | 7325 | 500000 | 110123732
col0 | 500000 | 962 | 500000 | 110123732
col0, col1 | 500000 | 1002 | 500000 | 110123732
col0, col1, col2 | 500000 | 890 | 500000 | 110123732
col0, col1, col2 + filter col32 >= 100000 | 249848 | 620 | 500000 | 110123732
col0..col9 | 500000 | 1313 | 500000 | 110123732
col20..col29 | 500000 | 1659 | 500000 | 110123732
col0..col25 | 500000 | 2979 | 500000 | 110123732
col0..col25 + filter col32 >= 100000 | 249848 | 2538 | 500000 | 110123732
col0..col45 | 500000 | 5365 | 500000 | 110123732
Filter on missing column | 0 | 296 | 0 | 0
Select constant | 500000 | 735 | 500000 | 110123732
----------------------------------+-----------+------------------+------------------+-----------------------------
TEST RESULTS:
----------------------------------+-----------+------------------+------------------+-----------------------------
Name | #rows | duration (ms) | #records read | bytes read
----------------------------------+-----------+------------------+------------------+-----------------------------
No filter | 500000 | 20388 | 500000 | 112039560
filter col32 >= 100000 | 249890 | 5719 | 500000 | 112039560
col0 | 500000 | 1773 | 500000 | 71669384
col0, col1 | 500000 | 1150 | 500000 | 71669384
col0, col1, col2 | 500000 | 1388 | 500000 | 71669384
col0, col1, col2 + filter col32 >= 100000 | 249890 | 1046 | 500000 | 92903048
col0..col9 | 500000 | 1736 | 500000 | 71669384
col20..col29 | 500000 | 1591 | 500000 | 71653000
col0..col25 | 500000 | 2708 | 500000 | 103650952
col0..col25 + filter col32 >= 100000 | 249890 | 2816 | 500000 | 103650952
col0..col45 | 500000 | 5261 | 500000 | 110204552
Filter on missing column | 0 | 582 | 0 | 0
Select constant | 500000 | 849 | 500000 | 6379144
----------------------------------+-----------+------------------+------------------+-----------------------------
Pre-sorting the data by col32 has a major impact on filtering.
TEST RESULTS:
----------------------------------+-----------+------------------+------------------+-----------------------------
Name | #rows | duration (ms) | #records read | bytes read
----------------------------------+-----------+------------------+------------------+-----------------------------
No filter | 500000 | 9123 | 500000 | 112039520
filter col32 >= 100000 | 250042 | 10256 | 250050 | 59278944
col0 | 500000 | 934 | 500000 | 71669344
col0, col1 | 500000 | 1030 | 500000 | 71669344
col0, col1, col2 | 500000 | 1198 | 500000 | 71669344
col0, col1, col2 + filter col32 >= 100000 | 250042 | 773 | 250050 | 49710688
col0..col9 | 500000 | 1193 | 500000 | 71669344
col20..col29 | 500000 | 1305 | 500000 | 71652960
col0..col25 | 500000 | 2382 | 500000 | 103650912
col0..col25 + filter col32 >= 100000 | 250042 | 1536 | 250050 | 55084640
col0..col45 | 500000 | 5014 | 500000 | 110204512
Filter on missing column | 0 | 250 | 0 | 0
Select constant | 500000 | 697 | 500000 | 6379104
----------------------------------+-----------+------------------+------------------+-----------------------------
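The intuition behind the sorting effect: Parquet keeps min/max statistics per row group, and a pushed-down filter can skip a whole row group whose [min, max] range cannot match. On random data every row group spans nearly the full Int range, so nothing gets skipped; once sorted, the ranges become disjoint. A self-contained model of that skipping logic (a simplification for illustration, not the actual Parquet code):

```scala
import scala.util.Random

// Simplified model of row-group skipping via min/max statistics.
val values    = Seq.fill(100000)(Random.nextInt())
val groupSize = 10000

// For "v >= threshold", a row group must be read only if its max can match.
def groupsRead(vs: Seq[Int], threshold: Int): Int =
  vs.grouped(groupSize).count(g => g.max >= threshold)

val unsorted = groupsRead(values, 100000)
val sorted   = groupsRead(values.sorted, 100000)

println(s"row groups read: unsorted=$unsorted, sorted=$sorted")
```

With random data every group almost surely contains a value above the threshold, so all groups are read; after sorting, only the tail groups are.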
It would be cool to rerun the tests on a newer version of Spark, which now ships parquet-mr 1.10.1.