Basic file formats - such as CSV, JSON or other text formats - can be useful when exchanging data between applications. When it comes to storing intermediate data between steps of an application, Parquet can provide more advanced capabilities:
- Support for complex types, as opposed to string-based types (CSV) or a limited type system (JSON only supports strings, basic numbers, booleans).
- Columnar storage - more efficient when not all the columns are used or when filtering the data.
- Partitioning - the output is written as one file per partition out of the box.
- Compression - pages can be compressed with Snappy or Gzip, and this preserves the partitioning; a short write sketch follows this list.
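As a quick illustration, here is a minimal sketch of such a write with Spark; the DataFrame df and the S3 path are placeholders, not part of the tests described below:

// df is a placeholder DataFrame; writing produces one part-file per Spark partition out of the box
df.write
  .option("compression", "snappy")   // compress the pages; the partitioned layout is preserved
  .parquet("s3a://some-bucket/some-dataset.parquet")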
The tests here are performed with Spark 2.0.1 on a cluster with 3 workers (c4.4xlarge, 16 vCPUs and 30 GB of RAM each).
Notice that the measured times can vary from one run to another by +/- 5% and should therefore not be compared strictly.
Before comparing the performance between text files and Parquet files, we need to actually convert the text files to Parquet.
Here, we analyze the results when using the 3 compression methods:
- none: no compression.
- snappy: provides a good balance between compression and speed. The documentation says: "It does not aim for maximum compression, or compatibility with any other compression library; instead, it aims for very high speeds and reasonable compression."
- gzip: higher compression than Snappy but very CPU intensive.
Files are read from the local drive and saved to S3.
Here is the code:
val codecs = Array("none", "snappy", "gzip")

for (codec <- codecs) {
  df.write.format("parquet")
    .option("compression", codec)
    .save(s"${rootPath}.parquet-${codec}")
}
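Note that df and rootPath are not defined in the snippet above; a plausible setup, with hypothetical paths, could look like this:

// Assumed setup for the snippet above (paths are hypothetical):
// df is the text file loaded as a DataFrame, rootPath is the S3 prefix for the Parquet copies
val df = spark.read.json("file:///data/mock_accounts_10M.dat")   // e.g. the accounts file
val rootPath = "s3a://my-bucket/mock_accounts_10M"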
Compression | File size | Time |
---|---|---|
none | 170 MB | 156.6 s |
snappy | 72 MB | 157.8 s |
gzip | 44 MB | 152.2 s |
Compression | File size | Time |
---|---|---|
none | 914 MB | 637.6 s |
snappy | 300 MB | 614.8 s |
gzip | 178 MB | 595.6 s |
The baseline for testing performance is a join between 2 files:
- mock_accounts_10M.dat: a JSON file with 10 million accounts, 4.2 GB.
- mock_transactions_40M_10M.dat: a CSV file with 40 million transactions, 14.4 GB.
The application leverages the DataFrames API of Spark. Here is the code:
import spark.implicits._   // required for .toDF()

val accountsDF = spark.read.json(accountsFile)
val transactionsDF = spark.sparkContext.textFile(transactionsFile)
  .map(Transaction(_))   // Transaction parses one CSV line (see the sketch below)
  .toDF()
val joinDF = accountsDF.join(transactionsDF, accountsDF("systemOfRecordAcctId") === transactionsDF("accountId"))
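The Transaction class is not shown here; it is presumably a case class whose companion object parses one line of the CSV file. A minimal sketch, in which every field except accountId is hypothetical, could look like this:

// Minimal sketch of the Transaction case class assumed above.
// Only accountId is implied by the join condition; the other fields are made up for illustration.
case class Transaction(accountId: String, amount: Double, description: String)

object Transaction {
  def apply(line: String): Transaction = {
    val fields = line.split(',')
    Transaction(fields(0), fields(1).toDouble, fields(2))
  }
}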
Then, either the results are counted:
val count = joinDF.count()
Or they are written to disk as a CSV file:
joinDF.write.csv(resultsFile)
The application behaves as follows:
- The accounts file is read as 134 partitions (coalesced to 48 partitions by the JSON reader).
- The transactions file is read as 460 partitions.
- The join is performed with 200 partitions (the default value of spark.sql.shuffle.partitions); see the check after this list.
- The join yields 39,998,919 results.
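These numbers can be checked directly in the Spark shell; a minimal sketch using the DataFrames defined above:

// Inspect the partitioning of the inputs and the shuffle setting used by the join
println(accountsDF.rdd.getNumPartitions)       // 48 once coalesced by the JSON reader
println(transactionsDF.rdd.getNumPartitions)   // 460
println(spark.conf.get("spark.sql.shuffle.partitions"))   // "200" by default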
Results are as follows:
# of partitions | Time (join + count only) | Time (join + write to CSV only) |
---|---|---|
200 | 70.53 s | 129.7 s |
In this test, we use the Parquet files compressed with Snappy because:
- Snappy provides a good compression ratio without requiring too much CPU.
- Snappy is the default compression method when writing Parquet files with Spark; see the configuration sketch after this list.
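For reference, the default codec is exposed through the spark.sql.parquet.compression.codec setting; a minimal sketch of checking and overriding it session-wide:

// The session-wide default for Parquet output ("snappy" in Spark 2.x)
println(spark.conf.get("spark.sql.parquet.compression.codec"))

// Overriding it globally instead of per write
spark.conf.set("spark.sql.parquet.compression.codec", "gzip")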
The code being used here is very similar - we only changed the way the files are read:
val accountsDF = spark.read.parquet(accountsFile)
val transactionsDF = spark.read.parquet(transactionsFile)
val joinDF = accountsDF.join(transactionsDF, accountsDF("systemOfRecordAcctId") === transactionsDF("accountId"))
We run this test multiple times, adjusting the spark.sql.shuffle.partitions configuration parameter to see the impact of the number of partitions on the join.
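For reference, the parameter can be changed between runs either on the SparkSession or at submit time; the value 100 below is just one of the tested settings:

// Set the number of shuffle partitions before running the join
spark.conf.set("spark.sql.shuffle.partitions", "100")

// or, equivalently, when submitting the job:
// spark-submit --conf spark.sql.shuffle.partitions=100 ...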
Here are the results:
# of partitions | Time (join + count only) | Time (join + write to CSV only) |
---|---|---|
10 | 24.99 s | 92.64 s |
25 | 19.12 s | 74.41 s |
50 | 20.97 s | 55.54 s |
100 | 19.10 s | 50.67 s |
150 | 20.29 s | 48.97 s |
200 | 19.75 s | 48.87 s |
250 | 17.11 s | 50.30 s |
300 | 17.57 s | 48.27 s |
Here is what we can see:
- Counting the results is 3.5 times faster than with text files (19.75 s vs 70.53 s). This is because the files don't need to be parsed, and also thanks to the columnar storage: only the columns used in the join are read (a sketch of this pruning follows the list).
- Computing the results and writing them to CSV is 2.5 times faster (48.87 s vs 129.7 s). In this test, all the columns need to be read.
- The best performance is achieved when the join is performed with 100 partitions or more, that is, with at least 2 tasks per CPU.
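Although Spark prunes the columns automatically when reading Parquet, the effect in the count case can be made explicit by projecting the join keys before joining; a minimal sketch:

// Only the join keys are needed to count the results; this projection mirrors
// what the Parquet reader does automatically thanks to columnar storage
val accountKeys = accountsDF.select("systemOfRecordAcctId")
val transactionKeys = transactionsDF.select("accountId")
val keyCount = accountKeys
  .join(transactionKeys, accountKeys("systemOfRecordAcctId") === transactionKeys("accountId"))
  .count()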