Iceberg Residual Test
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.hadoop.HadoopFileIO
import org.apache.iceberg.spark.Spark3Util
import org.apache.iceberg.{DataFile, PartitionSpec, Schema, Snapshot, TableMetadata, TableMetadataParser, Table => IcebergTable}
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.junit.JUnitRunner

import java.nio.file.{Files, Path}
import java.sql.Timestamp
import java.util.UUID
import scala.collection.JavaConverters._

// scalastyle:off
@RunWith(classOf[JUnitRunner])
class IcebergResidual2Test extends AnyFlatSpec with Matchers with BeforeAndAfterAll {

  private lazy val sparkDir: Path = Files.createTempDirectory(s"${this.getClass.getSimpleName}-${UUID.randomUUID}")

  val catalogName = "foo"
  val warehouseLocation = "/tmp/iceberg-test/warehouse"
  println(s"Warehouse Location: $warehouseLocation")

  // Local Spark session with a Hadoop-type Iceberg catalog rooted at warehouseLocation
  implicit lazy val sparkConf: SparkConf = new SparkConf()
    .setAppName(getClass.getSimpleName)
    .setMaster("local[*]")
    .set("spark.driver.bindAddress", "127.0.0.1")
    .set("spark.ui.enabled", "false")
    .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .set(s"spark.sql.catalog.$catalogName", "org.apache.iceberg.spark.SparkCatalog")
    .set(s"spark.sql.catalog.$catalogName.type", "hadoop")
    .set(s"spark.sql.catalog.$catalogName.warehouse", warehouseLocation)

  private implicit lazy val spark: SparkSession = SparkSession.builder()
    .config(sparkConf)
    .config("spark.sql.warehouse.dir", s"$sparkDir/spark-warehouse")
    .getOrCreate()
"Table" should "have lower/upper bounds set for its files" in { | |
import spark.implicits._ | |
implicit val hadoopConf: Configuration = spark.sparkContext.hadoopConfiguration | |
val namespace = "namespace" | |
val uniqueId = "foo" // Integer.toHexString((new Random()).nextInt()) | |
val name = s"bar_$uniqueId" | |
val tableName = s"$catalogName.$namespace.`$name`" | |
// Create Table | |
val schema: StructType = StructType(Array( | |
StructField("Id", StringType, nullable = false), | |
StructField("ReceivedAt", TimestampType, nullable = false) | |
)) | |
// Define the structure of your data | |
val data = Seq( | |
Row("1", new Timestamp(System.currentTimeMillis())), | |
Row("2", new Timestamp(System.currentTimeMillis() + 1000)), // 1 second later | |
Row("3", new Timestamp(System.currentTimeMillis() + 2000)) // 2 seconds later | |
) | |
spark.sql(s"create database if not exists $namespace") | |
// Create a DataFrame with the specified structure | |
val df = spark.createDataFrame( | |
spark.sparkContext.parallelize(data), // Convert the Seq of Rows to an RDD | |
schema | |
) | |
// Convert 'received_at' from String to Timestamp | |
val dfWithTimestamp: DataFrame = df | |
.withColumn("ReceivedAt", to_timestamp($"ReceivedAt")) | |
dfWithTimestamp.collect().foreach { row => | |
println("----- Row -----") | |
print(s"Id: ${row.getAs("Id")}, Received At: ${row.getAs("ReceivedAt")}") | |
println("") | |
} | |
// Load the Iceberg table | |
val icebergTable: IcebergTable = Spark3Util.loadIcebergTable(spark, tableName) | |
// Insert the DataFrame into the table | |
dfWithTimestamp.write | |
.mode("append") // Use "overwrite" if you want to replace existing data | |
.insertInto(tableName) | |
// Accessing the table's metadata files | |
val io: HadoopFileIO = new HadoopFileIO(hadoopConf) | |
val snapshot: Snapshot = icebergTable.currentSnapshot() | |
val files: List[DataFile] = snapshot.addedDataFiles(io).asScala.toList | |
println(s"Files: ${files.mkString(", ")}") | |
// Iterate through the metadata files and print bounds | |
files.foreach { file => | |
println(s"File Path: ${file.path()}") | |
println(s"Record Count: ${file.recordCount()}") | |
println(s"Lower Bounds: ${file.lowerBounds()}") | |
println(s"Upper Bounds: ${file.upperBounds()}") | |
} | |
val bounds: Boolean = files | |
.map { f => !f.upperBounds().isEmpty && !f.lowerBounds().isEmpty} | |
.forall(identity) | |
println(s"Bounds: $bounds") | |
bounds shouldBe true | |
val totalCount: Long = files.map(_.recordCount()).sum | |
println(s"Total Count: $totalCount") | |
totalCount shouldBe data.size | |
} | |
  // Helper (not used by the test above): writes a v1 metadata file for a new table directly via TableMetadataParser
  def writeMetadata(
      schema: Schema,
      spec: PartitionSpec,
      properties: Map[String, String],
      location: String)(implicit conf: Configuration): String = {
    val metadata: TableMetadata = TableMetadata.newTableMetadata(schema, spec, location, properties.asJava)
    val path: String = String.format("%s/metadata/v1.metadata.json", metadata.location)
    writeMetadata(path, metadata)
    // val content = "1"
    // val versionPath = Paths.get(s"$location/")
    // Files.write(versionPath, content.getBytes(StandardCharsets.UTF_8))
    path
  }

  private def writeMetadata(path: String, metadata: TableMetadata)(implicit conf: Configuration): Unit = {
    val io = new HadoopFileIO(conf)
    val outputFile = io.newOutputFile(path)
    TableMetadataParser.write(metadata, outputFile)
  }
  override protected def afterAll(): Unit = {
    spark.close()
    super.afterAll()
  }
}
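The gist does not include a build definition. Below is a minimal sbt sketch of the dependencies this spec appears to rely on, assuming Spark 3.5 on Scala 2.12 with a matching Iceberg runtime; the artifact versions are illustrative assumptions, not taken from the original.

// build.sbt (sketch) -- versions are assumptions; align them with your Spark and Iceberg deployment
libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-sql"                      % "3.5.1",
  "org.apache.iceberg"  % "iceberg-spark-runtime-3.5_2.12" % "1.5.0",
  "org.scalatest"      %% "scalatest"                      % "3.2.17"   % Test,
  "org.scalatestplus"  %% "junit-4-13"                     % "3.2.17.0" % Test,
  "junit"               % "junit"                          % "4.13.2"   % Test
)

With these in place, the spec runs like any other ScalaTest suite, for example via sbt "testOnly *IcebergResidual2Test".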