```
Spark context Web UI available at http://nirisvara:4040
Spark context available as 'sc' (master = local[*], app id = local-1677271782301).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.2
      /_/

Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.17)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :load SparkStreamingFromDirectory-S3A.scala
Loading SparkStreamingFromDirectory-S3A.scala...
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
defined object SparkStreamingFromDirectory

scala> SparkStreamingFromDirectory.main(Array(""))
23/02/25 02:14:14 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
root
 |-- RecordNumber: integer (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- LocationType: string (nullable = true)
 |-- Lat: string (nullable = true)
 |-- Long: string (nullable = true)
 |-- Xaxis: string (nullable = true)
 |-- Yaxis: string (nullable = true)
 |-- Zaxis: string (nullable = true)
 |-- WorldRegion: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- LocationText: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Decommisioned: string (nullable = true)

root
 |-- Zipcode: string (nullable = true)
 |-- count: long (nullable = false)

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+-----+
|Zipcode|count|
+-------+-----+
|76166  |2    |
|32564  |2    |
|85210  |2    |
|36275  |3    |
|709    |3    |
|35146  |3    |
|708    |2    |
|35585  |3    |
|32046  |2    |
|27203  |4    |
|34445  |2    |
|27007  |4    |
|704    |10   |
|27204  |4    |
|34487  |2    |
|85209  |2    |
|76177  |4    |
+-------+-----+
```
Number of S3 API calls, as reported by `mc support top api`:
```
~ mc support top api myminio/
API                         RX        TX         CALLS   ERRORS
s3.CopyObject               48 KiB    47 KiB     208     0
s3.DeleteMultipleObjects    146 KiB   47 KiB     417     0
s3.DeleteObject             32 KiB    0 B        211     0
s3.GetObject                168 B     1.3 KiB    1       0
s3.HeadObject               441 KiB   0 B        2950    0
s3.ListObjectsV2            408 KiB   1.4 MiB    2732    0
s3.PutObject                128 KiB   0 B        419     0

Summary:

Total: 6938 CALLS, 1.2 MiB RX, 1.5 MiB TX - in 72.36s
```
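S3 has no native rename, so the S3A connector emulates one by copying an object and then deleting the original, and it emulates directory semantics with HEAD and LIST requests. The Scala sketch below simply re-adds the CALLS column from the table above to show how the 6938 calls split between those two groups. The object and value names (`UnoptimizedRunCalls`, `renameRelated`, `metadataProbes`) are illustrative and not part of the benchmark code.

```scala
// Per-API CALLS column from the `mc support top api` output of the
// unoptimized run (values copied from the table above).
object UnoptimizedRunCalls {
  val calls: Map[String, Int] = Map(
    "s3.CopyObject"            -> 208,
    "s3.DeleteMultipleObjects" -> 417,
    "s3.DeleteObject"          -> 211,
    "s3.GetObject"             -> 1,
    "s3.HeadObject"            -> 2950,
    "s3.ListObjectsV2"         -> 2732,
    "s3.PutObject"             -> 419
  )

  // Requests S3A issues when it emulates a rename (copy, then delete).
  val renameRelated: Set[String] =
    Set("s3.CopyObject", "s3.DeleteObject", "s3.DeleteMultipleObjects")

  // Existence checks and listings used to emulate directory semantics.
  val metadataProbes: Set[String] = Set("s3.HeadObject", "s3.ListObjectsV2")

  // toSeq first, so equal call counts are not collapsed by Set.map.
  def sumOf(names: Set[String]): Int = names.toSeq.map(calls).sum

  val total: Int       = calls.values.sum        // should match the Summary line
  val renameCalls: Int = sumOf(renameRelated)
  val probeCalls: Int  = sumOf(metadataProbes)

  def main(args: Array[String]): Unit =
    println(s"total=$total rename-related=$renameCalls metadata-probes=$probeCalls")
}
```

Loaded into the same spark-shell with `:load`, `UnoptimizedRunCalls.main(Array())` prints the three totals: 836 rename-related calls and 5682 HEAD/LIST probes out of 6938.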
Number of object versions left behind by this behavior on a versioned bucket:
```
~ mc ls -r --versions myminio/process-runner/ | wc -l
1023
```
Of these, 614 are actual object versions (PUT):
```
~ mc ls -r --versions myminio/process-runner/ | grep PUT | wc -l
614
```
and 409 are delete markers (soft deletes):
```
~ mc ls -r --versions myminio/process-runner/ | grep DEL | wc -l
409
```
Objects visible in the namespace when listing without versions:
```
~ mc ls -r myminio/process-runner/ | wc -l
205
```
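The four counts above fit together, and they are where the "818 excess objects" figure comes from. A short Scala sketch of the accounting, assuming each line printed by `mc ls` corresponds to exactly one version and that the 205 current objects are all among the PUT versions; `VersionAccounting` is an illustrative name.

```scala
// Version accounting for the versioned bucket after the unoptimized run,
// using the `mc ls` counts above.
object VersionAccounting {
  val allVersions    = 1023 // mc ls -r --versions ... | wc -l
  val putVersions    = 614  // ... | grep PUT | wc -l
  val deleteMarkers  = 409  // ... | grep DEL | wc -l
  val currentObjects = 205  // mc ls -r ... | wc -l (no --versions)

  // Every listed version is either a PUT (data) version or a delete marker.
  val accountedFor: Boolean = putVersions + deleteMarkers == allVersions

  // Data versions that are no longer current: hidden, but still stored.
  val noncurrentData: Int = putVersions - currentObjects

  // Everything beyond the current objects is leftover.
  val excess: Int = allVersions - currentObjects

  def main(args: Array[String]): Unit =
    println(s"accountedFor=$accountedFor noncurrentData=$noncurrentData excess=$excess")
}
```

The 818 excess entries are therefore 409 noncurrent data versions plus 409 delete markers.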
```
scala> :load SparkStreamingFromDirectory.scala
Loading SparkStreamingFromDirectory.scala...
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
defined object SparkStreamingFromDirectory

scala> SparkStreamingFromDirectory.main(Array(""))
23/02/25 02:20:25 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
root
 |-- RecordNumber: integer (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- LocationType: string (nullable = true)
 |-- Lat: string (nullable = true)
 |-- Long: string (nullable = true)
 |-- Xaxis: string (nullable = true)
 |-- Yaxis: string (nullable = true)
 |-- Zaxis: string (nullable = true)
 |-- WorldRegion: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- LocationText: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Decommisioned: string (nullable = true)

root
 |-- Zipcode: string (nullable = true)
 |-- count: long (nullable = false)

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+-----+
|Zipcode|count|
+-------+-----+
|76166  |2    |
|32564  |2    |
|85210  |2    |
|36275  |3    |
|709    |3    |
|35146  |3    |
|708    |2    |
|35585  |3    |
|32046  |2    |
|27203  |4    |
|34445  |2    |
|27007  |4    |
|704    |10   |
|27204  |4    |
|34487  |2    |
|85209  |2    |
|76177  |4    |
+-------+-----+
```
```
~ mc support top api myminio/
API                      RX       TX        CALLS  ERRORS
s3.GetObject             159 B    1.3 KiB   1      0
s3.HeadObject            1.5 KiB  0 B       10     0
s3.ListObjectVersions    765 B    2.0 KiB   5      0
s3.PutObject             88 KiB   0 B       208    0

Summary:

Total: 224 CALLS, 90 KiB RX, 3.3 KiB TX - in 17.00s
```
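Comparing the two `mc support top api` tables shows which request types disappear entirely rather than merely shrinking. A minimal Scala sketch that diffs the two runs, with values copied from both tables; `RunComparison`, `eliminated` and `delta` are illustrative names.

```scala
// CALLS per API for both runs, copied from the two `mc support top api` tables.
object RunComparison {
  val unoptimized: Map[String, Int] = Map(
    "s3.CopyObject" -> 208, "s3.DeleteMultipleObjects" -> 417,
    "s3.DeleteObject" -> 211, "s3.GetObject" -> 1, "s3.HeadObject" -> 2950,
    "s3.ListObjectsV2" -> 2732, "s3.PutObject" -> 419)

  val optimized: Map[String, Int] = Map(
    "s3.GetObject" -> 1, "s3.HeadObject" -> 10,
    "s3.ListObjectVersions" -> 5, "s3.PutObject" -> 208)

  // API types the unoptimized run used but the optimized run never issued.
  val eliminated: Set[String] = unoptimized.keySet -- optimized.keySet

  // Change in calls (optimized minus unoptimized) for APIs present in both runs.
  val delta: Map[String, Int] =
    (unoptimized.keySet intersect optimized.keySet).iterator
      .map(api => api -> (optimized(api) - unoptimized(api)))
      .toMap

  def main(args: Array[String]): Unit = {
    println(s"eliminated: ${eliminated.toSeq.sorted.mkString(", ")}")
    delta.toSeq.sortBy(_._1).foreach { case (api, d) => println(s"$api: $d") }
  }
}
```

In this data the eliminated set is exactly the copy, delete and `ListObjectsV2` requests, while `HeadObject` drops by 2940 calls and `PutObject` by 211.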
Total number of object versions, including any noncurrent versions and delete markers:
```
~ mc ls -r --versions myminio/process-runner/ | wc -l
205
```
Objects visible in the namespace when listing without versions:
```
~ mc ls -r myminio/process-runner/ | wc -l
205
```
Metric | Without optimization | With optimization |
---|---|---|
Elapsed time (as reported by `mc support top api`) | 72.36 s | 17.00 s |
Delete markers | 409 | 0 |
Excess object versions (beyond the 205 current objects) | 818 (409 of which are delete markers) | 0 |
Total S3 API calls | 6938 | 224 |
API calls per current object | 33.8x | 1.09x |
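The ratios in the table can be reproduced from the raw counts. This Scala sketch divides by the 205 current objects and, like the table, treats the `mc support top api` observation window as the elapsed time; `NormalizedCost` and its half-up rounding helper are illustrative choices.

```scala
// Normalized cost of each run, from the counts reported above.
object NormalizedCost {
  val currentObjects     = 205
  val callsUnoptimized   = 6938
  val callsOptimized     = 224
  val secondsUnoptimized = 72.36
  val secondsOptimized   = 17.00

  def perObject(calls: Int): Double = calls.toDouble / currentObjects

  // Round half-up to `digits` decimal places for display.
  def round(x: Double, digits: Int): Double =
    BigDecimal(x).setScale(digits, BigDecimal.RoundingMode.HALF_UP).toDouble

  val perObjectUnoptimized: Double = round(perObject(callsUnoptimized), 1)
  val perObjectOptimized: Double   = round(perObject(callsOptimized), 2)
  val callReduction: Double = round(callsUnoptimized.toDouble / callsOptimized, 1)
  val speedup: Double       = round(secondsUnoptimized / secondsOptimized, 2)

  def main(args: Array[String]): Unit =
    println(s"calls/object: $perObjectUnoptimized vs $perObjectOptimized, " +
      s"call reduction: ${callReduction}x, speedup: ${speedup}x")
}
```

This yields 33.8 and 1.09 calls per object, matching the table, plus a roughly 31x reduction in total calls and a roughly 4.26x shorter window.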
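These results show that the default checkpoint implementation shipped with Spark is poorly optimized for object storage. In this test it issued about 31 times as many S3 API calls (6938 vs. 224), took roughly four times as long (72.36 s vs. 17.00 s), and left 818 excess object versions, including 409 delete markers, on a versioned bucket. We therefore recommend using Direct Checkpointing instead.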