Skip to content

Instantly share code, notes, and snippets.

@Samrose-Ahmed
Created September 22, 2022 17:55
Show Gist options
  • Save Samrose-Ahmed/a2f02d770da2c351a1a2a4d06bc4ce2e to your computer and use it in GitHub Desktop.
Save Samrose-Ahmed/a2f02d770da2c351a1a2a4d06bc4ce2e to your computer and use it in GitHub Desktop.
/**
 * Appends the S3 objects announced by a single SQS message to the pending `appendFiles` operation.
 *
 * The message body is parsed as an S3 event notification. For each record in the notification:
 *  1. the record is skipped when `checkDuplicate` reports its sequencer as already seen;
 *  2. Parquet metrics for the object are read with `readParquetMetrics`;
 *  3. a data file entry (path, size, format, metrics) is built and handed to `appendFiles`.
 *
 * A notification without any records (for example the `s3:TestEvent` that S3 emits when
 * notifications are configured) is logged and ignored rather than failing with an exception.
 *
 * @param sqsMessage SQS message whose body contains the S3 event notification JSON.
 */
fun processRecord(sqsMessage: SQSMessage) {
    // `records` comes from Java and may be null when the JSON has no "Records" array.
    val records = S3EventNotification.parseJson(sqsMessage.body).records.orEmpty()
    if (records.isEmpty()) {
        println("SQS message contains no S3 event records. Skipping...")
        return
    }

    for (record in records) {
        val s3Bucket = record.s3.bucket.name
        val s3Object = record.s3.`object`
        // S3 URL-encodes object keys inside event notifications (e.g. "=" arrives as "%3D",
        // a space as "+"), so the decoded key is required to address the real object.
        val s3ObjectKey = s3Object.urlDecodedKey
        val s3ObjectSize = s3Object.sizeAsLong
        val s3Path = "s3://$s3Bucket/$s3ObjectKey"

        if (checkDuplicate(s3Object.sequencer)) {
            println("Found duplicate SQS message for key: ${s3ObjectKey}. Skipping...")
            continue
        }

        val metrics = readParquetMetrics(s3Path, icebergTable)
        // NOTE(review): the spec is rebuilt from the schema instead of taken from the table, and
        // no partition value is set on the builder below — confirm this matches the table's spec.
        val partition = PartitionSpec.builderFor(icebergTable.schema()).day(TIMESTAMP_COLUMN_NAME).build()
        val dataFile = DataFiles.builder(partition)
            .withPath(s3Path)
            .withFileSizeInBytes(s3ObjectSize)
            .withFormat("PARQUET")
            .withMetrics(metrics)
            .build()
        appendFiles.appendFile(dataFile)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment