Iceberg Lower/Upper Bounds in Data Files (Parquet vs Avro)
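This ScalaTest spec creates a partitioned Iceberg table in a Hadoop catalog, appends three rows (once with Parquet data files, once with Avro), and asserts that every data file recorded in the resulting snapshot carries per-column lower/upper bounds. As a side note, the same per-file statistics are also exposed through Iceberg's "files" metadata table; a minimal sketch, assuming the catalog and table names used in the test below:

    spark.sql(s"SELECT file_path, record_count, lower_bounds, upper_bounds FROM $tableName.files").show(truncate = false)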
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.hadoop.HadoopFileIO
import org.apache.iceberg.spark.Spark3Util
import org.apache.iceberg.{DataFile, Snapshot, TableProperties, Table => IcebergTable}
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.junit.JUnitRunner

import java.nio.file.{Files, Path}
import java.sql.Timestamp
import java.util.UUID
import scala.collection.JavaConverters._
import scala.util.Random

// scalastyle:off
@RunWith(classOf[JUnitRunner])
class IcebergTableStatsTest extends AnyFlatSpec with Matchers with BeforeAndAfterAll {

  private lazy val sparkDir: Path = Files.createTempDirectory(s"${this.getClass.getSimpleName}-${UUID.randomUUID}")

  val catalogName = "foo"
  val warehouseLocation = "/tmp/iceberg-test/warehouse"

  implicit lazy val sparkConf: SparkConf = new SparkConf()
    .setAppName(getClass.getSimpleName)
    .setMaster("local[*]")
    .set("spark.driver.bindAddress", "127.0.0.1")
    .set("spark.ui.enabled", "false")
    .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .set(s"spark.sql.catalog.$catalogName", "org.apache.iceberg.spark.SparkCatalog")
    .set(s"spark.sql.catalog.$catalogName.type", "hadoop")
    .set(s"spark.sql.catalog.$catalogName.warehouse", warehouseLocation)
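  // The "hadoop" catalog configured above keeps table metadata as plain files under
  // warehouseLocation, so this test runs without any external metastore.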
  private implicit lazy val spark: SparkSession = SparkSession.builder()
    .config(sparkConf)
    .config("spark.sql.warehouse.dir", s"$sparkDir/spark-warehouse")
    .getOrCreate()

  "Parquet Table" should "have lower/upper bounds set for its data files" in {
    runBoundsTest("parquet")
  }

  "Avro Table" should "have lower/upper bounds set for its data files" in {
    runBoundsTest("avro")
  }

  private[this] def runBoundsTest(format: String): Unit = {
    import spark.implicits._
    implicit val hadoopConf: Configuration = spark.sparkContext.hadoopConfiguration

    val namespace = "namespace"
    val uniqueId = Integer.toHexString(Random.nextInt())
    val name = s"mt_$uniqueId"
    val tableName = s"$catalogName.$namespace.`$name`"
    // Schema of the test data
    val schema: StructType = StructType(Array(
      StructField("Id", StringType, nullable = false),
      StructField("ReceivedAt", TimestampType, nullable = false)
    ))

    // Three sample rows with increasing timestamps
    val data = Seq(
      Row("1", new Timestamp(System.currentTimeMillis())),
      Row("2", new Timestamp(System.currentTimeMillis() + 1000)), // 1 second later
      Row("3", new Timestamp(System.currentTimeMillis() + 2000))  // 2 seconds later
    )
    spark.sql(s"CREATE DATABASE IF NOT EXISTS $namespace")
    spark.sql(
      s"""CREATE TABLE $tableName (
         |  Id STRING,
         |  ReceivedAt TIMESTAMP
         |)
         |USING iceberg
         |PARTITIONED BY (ReceivedAt)
         |OPTIONS (
         |  "write.format.default" = "$format",
         |  "write.delete.format.default" = "$format",
         |  "${TableProperties.WRITE_DISTRIBUTION_MODE}" = "${TableProperties.WRITE_DISTRIBUTION_MODE_HASH}",
         |  "${TableProperties.MERGE_DISTRIBUTION_MODE}" = "${TableProperties.WRITE_DISTRIBUTION_MODE_HASH}",
         |  "${TableProperties.SPARK_WRITE_ACCEPT_ANY_SCHEMA}" = "true"
         |)""".stripMargin)
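    // "write.format.default" is the Iceberg table property that controls the format of
    // newly written data files, which is how this test switches between Parquet and Avro.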
    // Build a DataFrame from the sample rows
    val df = spark.createDataFrame(
      spark.sparkContext.parallelize(data), // convert the Seq of Rows to an RDD
      schema
    )

    // ReceivedAt is already a TimestampType column, so to_timestamp is effectively a no-op
    // here; it is kept so the same code also works if the column arrives as a string.
    val dfWithTimestamp: DataFrame = df
      .withColumn("ReceivedAt", to_timestamp($"ReceivedAt"))

    dfWithTimestamp.collect().foreach { row =>
      println("----- Row -----")
      println(s"Id: ${row.getAs("Id")}, Received At: ${row.getAs("ReceivedAt")}")
    }
    // Insert the DataFrame into the table
    dfWithTimestamp.write
      .mode("append") // use "overwrite" to replace existing data instead
      .insertInto(tableName)

    // Load the Iceberg table after the write so its metadata includes the new snapshot
    val icebergTable: IcebergTable = Spark3Util.loadIcebergTable(spark, tableName)

    // Read the data files added by the latest snapshot
    val io: HadoopFileIO = new HadoopFileIO(hadoopConf)
    val snapshot: Snapshot = icebergTable.currentSnapshot()
    val files: List[DataFile] = snapshot.addedDataFiles(io).asScala.toList
    println(s"Files: ${files.mkString(", ")}")

    // Print the per-file statistics recorded in the manifests
    files.foreach { file =>
      println(s"File Path: ${file.path()}")
      println(s"Record Count: ${file.recordCount()}")
      println(s"Lower Bounds: ${file.lowerBounds()}")
      println(s"Upper Bounds: ${file.upperBounds()}")
    }
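    // Note (not exercised by this test): lowerBounds()/upperBounds() are maps keyed by
    // field id, with each bound serialized as a ByteBuffer. Inside the loop above they
    // could be decoded with Iceberg's Conversions helper, for example:
    //   val idField = icebergTable.schema().findField("Id")
    //   org.apache.iceberg.types.Conversions.fromByteBuffer(idField.`type`(), file.lowerBounds().get(idField.fieldId()))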
    val bounds: Boolean = files.forall { f =>
      !f.upperBounds().isEmpty && !f.lowerBounds().isEmpty
    }
    println(s"Bounds: $bounds")
    bounds shouldBe true

    val totalCount: Long = files.map(_.recordCount()).sum
    println(s"Total Count: $totalCount")
    totalCount shouldBe data.size
  }

  override protected def afterAll(): Unit = {
    spark.close()
    super.afterAll()
  }
}