I hereby claim:
- I am samrose-ahmed on github.
- I am samrosea (https://keybase.io/samrosea) on keybase.
- I have a public key ASDcVIqlPmilrdWFz53bERzvk_nxMdVOhGjoDk-WKv0g4go
To claim this, I am signing this object:
| { | |
| "expiration": "2022-02-14T13:08:46.864Z", | |
| "conditions": [ | |
| { "acl": "bucket-owner-full-control" }, | |
| { "bucket": "my-bucket" }, | |
| ["starts-with", "$key", "stuff/clientId"], | |
| ["content-length-range", 1048576, 10485760] | |
| ] | |
| } |
| class IcebergMetadataWriter { | |
| // In a real world usecase, you can create this dynamically from the data. | |
| val icebergTable: Table = icebergCatalog.loadTable(TableIdentifier.of(Namespace.of(NAMESPACE), TABLE_NAME)) | |
| val appendFiles: AppendFiles = icebergTable.newAppend() | |
| // Lambda handler | |
| fun handle(sqsEvent: SQSEvent) { | |
| for (record in sqsEvent.records) { | |
| processRecord(record, tableObjs) | |
| } |
| fun processRecord(sqsMessage: SQSMessage): Unit { | |
| val record = S3EventNotification.parseJson(sqsMessage.body).records[0] | |
| val s3Bucket = record.s3.bucket.name | |
| val s3Object = record.s3.`object` | |
| val s3ObjectKey = s3Object.key | |
| val s3ObjectSize = s3Object.sizeAsLong | |
| val s3Path = "s3://$s3Bucket/$s3ObjectKey" | |
| if (checkDuplicate(s3Object.sequencer)) { | |
| println("Found duplicate SQS message for key: ${s3ObjectKey}. Skipping...") |
/**
 * Extracts Iceberg [Metrics] from the Parquet file at [s3Path], applying the
 * metrics configuration derived from [table].
 *
 * NOTE(review): relies on `fileIO` from the enclosing scope — presumably an
 * Iceberg FileIO able to resolve `s3://` paths; confirm it is initialized
 * before this is called.
 */
fun readParquetMetrics(s3Path: String, table: Table): Metrics =
    ParquetUtil.fileMetrics(fileIO.newInputFile(s3Path), MetricsConfig.forTable(table))
| fun checkDuplicate(sequencer: String): Boolean { | |
| // TTL to expire old DynamoDB items | |
| val expireTime = ((System.currentTimeMillis() / 1000L) + DDB_ITEM_EXPIRE_SECONDS).toString() | |
| val attrs = mapOf( | |
| "sequencer" to AttributeValue(sequencer), | |
| "ttl" to AttributeValue().apply { this.setN(expireTime) } | |
| ) | |
| val req = PutItemRequest(DUPLICATES_DDB_TABLE_NAME, attrs) | |
| .apply { this.conditionExpression = "attribute_not_exists(sequencer)" } |
| import com.amazonaws.services.glue.GlueContext | |
| import com.amazonaws.services.glue.util.GlueArgParser | |
| import com.amazonaws.services.glue.util.Job | |
| import org.apache.spark.SparkConf | |
| import org.apache.spark.SparkContext | |
| import org.apache.spark.sql.Dataset | |
| import org.apache.spark.sql.Row | |
| import org.apache.spark.sql.SaveMode | |
| import org.apache.spark.sql.SparkSession | |
| import org.apache.spark.sql.functions.from_json |
| /*- | |
| * #%L | |
| * athena-example | |
| * %% | |
| * Copyright (C) 2019 Amazon Web Services | |
| * %% | |
| * Licensed under the Apache License, Version 2.0 (the "License"); | |
| * you may not use this file except in compliance with the License. | |
| * You may obtain a copy of the License at | |
| * |
| /*- | |
| * #%L | |
| * athena-example | |
| * %% | |
| * Copyright (C) 2019 Amazon Web Services | |
| * %% | |
| * Licensed under the Apache License, Version 2.0 (the "License"); | |
| * you may not use this file except in compliance with the License. | |
| * You may obtain a copy of the License at | |
| * |
| operator: | |
| global: | |
| rbac: | |
| create: true | |
| serviceAccount: | |
| name: "starrocks" | |
| # Optional annotations to add to the ServiceAccount manifest | |
| annotations: {} |