athenasource1
/*-
 * #%L
 * athena-example
 * %%
 * Copyright (C) 2019 Amazon Web Services
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package com.matano.iceberg.datasize

import com.amazonaws.athena.connector.lambda.QueryStatusChecker
import com.amazonaws.athena.connector.lambda.data.BlockAllocator
import com.amazonaws.athena.connector.lambda.data.BlockWriter
import com.amazonaws.athena.connector.lambda.data.SchemaBuilder
import com.amazonaws.athena.connector.lambda.domain.Split
import com.amazonaws.athena.connector.lambda.domain.TableName
import com.amazonaws.athena.connector.lambda.handlers.MetadataHandler
import com.amazonaws.athena.connector.lambda.metadata.*
import com.amazonaws.athena.connector.lambda.metadata.optimizations.DataSourceOptimizations
import com.amazonaws.athena.connector.lambda.metadata.optimizations.OptimizationSubType
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.FilterPushdownSubType
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.LimitPushdownSubType
import com.amazonaws.athena.connector.lambda.security.EncryptionKeyFactory
import com.amazonaws.services.athena.AmazonAthena
import com.amazonaws.services.secretsmanager.AWSSecretsManager
import com.matano.iceberg.IcebergMetadataWriter.Companion.createIcebergCatalog
import org.apache.iceberg.catalog.Catalog
import org.apache.iceberg.catalog.Namespace
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.expressions.Expressions
import org.slf4j.LoggerFactory
import java.time.ZoneId
import kotlin.math.max
import kotlin.math.min
// DO NOT REMOVE - this will not be _unused_ when customers go through the tutorial and uncomment
// the TODOs
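
// Example (hypothetical) Athena federated query against this connector, assuming the Lambda is
// registered with the catalog name "matano_datasize" (the table name is illustrative):
//   SELECT file_path, size
//   FROM "lambda:matano_datasize".matano_db.example_table
//   WHERE ts_hour >= current_timestamp - interval '1' day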
/**
 * This class is part of a tutorial that will walk you through how to build a connector for your
 * custom data source. The README for this module (athena-example) will guide you through preparing
 * your development environment, modifying this example MetadataHandler, building, deploying, and then
 * using your new source in an Athena query.
 *
 * More specifically, this class is responsible for providing Athena with metadata about the schemas (aka databases),
 * tables, and table partitions that your source contains. Lastly, this class tells Athena how to split up reads against
 * this source. This gives you control over the level of performance and parallelism your source can support.
 *
 * For more examples, please see the other connectors in this repository (e.g. athena-cloudwatch, athena-docdb, etc...)
 */
class DatasizeMetadataHandler : MetadataHandler {
    constructor(configOptions: Map<String?, String?>?) : super(SOURCE_TYPE, configOptions)

    val icebergCatalog: Catalog by lazy {
        createIcebergCatalog()
    }

    protected constructor(
        keyFactory: EncryptionKeyFactory?,
        awsSecretsManager: AWSSecretsManager?,
        athena: AmazonAthena?,
        spillBucket: String?,
        spillPrefix: String?,
        configOptions: Map<String?, String?>?
    ) : super(keyFactory, awsSecretsManager, athena, SOURCE_TYPE, spillBucket, spillPrefix, configOptions)
    /**
     * Used to get the list of schemas (aka databases) that this source contains.
     *
     * @param allocator Tool for creating and managing Apache Arrow Blocks.
     * @param request Provides details on who made the request and which Athena catalog they are querying.
     * @return A ListSchemasResponse which primarily contains a Set<String> of schema names and a catalog name
     * corresponding to the Athena catalog that was queried.
     */
    override fun doListSchemaNames(allocator: BlockAllocator, request: ListSchemasRequest): ListSchemasResponse {
        logger.info("doListSchemaNames: enter - $request")
        val schemas: Set<String> = hashSetOf("matano_db")
        return ListSchemasResponse(request.catalogName, schemas)
    }
    /**
     * Used to get a paginated list of tables that this source contains.
     *
     * @param allocator Tool for creating and managing Apache Arrow Blocks.
     * @param request Provides details on who made the request and which Athena catalog and database they are querying.
     * @return A ListTablesResponse which primarily contains a List<TableName> enumerating the tables in this
     * catalog, database tuple. It also contains the catalog name corresponding to the Athena catalog that was queried.
     * @implNote A complete (un-paginated) list of tables should be returned if the request's pageSize is set to
     * ListTablesRequest.UNLIMITED_PAGE_SIZE_VALUE.
     */
    override fun doListTables(allocator: BlockAllocator, request: ListTablesRequest): ListTablesResponse {
        logger.info("doListTables: enter - $request")
        val namespace = Namespace.of("matano")
        val tables = icebergCatalog.listTables(namespace).map { ti -> TableName("matano_db", ti.name()) }.toMutableList()
        return ListTablesResponse(request.catalogName, tables, null)
    }
    /**
     * Used to get definition (field names, types, descriptions, etc...) of a Table.
     *
     * @param allocator Tool for creating and managing Apache Arrow Blocks.
     * @param request Provides details on who made the request and which Athena catalog, database, and table they are querying.
     * @return A GetTableResponse which primarily contains:
     * 1. An Apache Arrow Schema object describing the table's columns, types, and descriptions.
     * 2. A Set<String> of partition column names (or empty if the table isn't partitioned).
     * 3. A TableName object confirming the schema and table name the response is for.
     * 4. A catalog name corresponding to the Athena catalog that was queried.
     */
    override fun doGetTable(allocator: BlockAllocator, request: GetTableRequest): GetTableResponse {
        logger.info("doGetTable: enter - $request")
        val partitionColNames: Set<String> = hashSetOf("ts_hour")
        val tableSchemaBuilder = SchemaBuilder.newBuilder()
        // tableSchemaBuilder.addStringField("ts_hour")
        tableSchemaBuilder.addDateMilliField("ts_hour")
        tableSchemaBuilder.addStringField("file_path")
        tableSchemaBuilder.addBigIntField("size")
        return GetTableResponse(
            request.catalogName,
            request.tableName,
            tableSchemaBuilder.build(),
            partitionColNames
        )
    }
    /**
     * Used to get the partitions that must be read from the request table in order to satisfy the requested predicate.
     *
     * @param blockWriter Used to write rows (partitions) into the Apache Arrow response.
     * @param request Provides details of the catalog, database, and table being queried as well as any filter predicate.
     * @param queryStatusChecker A QueryStatusChecker that you can use to stop doing work for a query that has already terminated
     * @note Partitions are partially opaque to Amazon Athena in that it only understands your partition columns and
     * how to filter out partitions that do not meet the query's constraints. Any additional columns you add to the
     * partition data are ignored by Athena but passed on to calls on GetSplits.
     */
    @Throws(Exception::class)
    override fun getPartitions(blockWriter: BlockWriter, request: GetTableLayoutRequest, queryStatusChecker: QueryStatusChecker) {
        logger.info("getPartitions: enter - $request")
        if (!queryStatusChecker.isQueryRunning) {
            logger.info("getPartitions: Query was already finished")
            return
        }
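        // Emit one candidate partition row per hour covering roughly the last four years.
        // block.setValue returns false for hours that fail the query's ts_hour constraint,
        // so non-matching partitions are dropped as they are written.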
        val currentTsHour = System.currentTimeMillis() / 1000 / 60 / 60
        val startTsHour = currentTsHour - 24 * 365 * 4
        for (tsHour in startTsHour..currentTsHour) {
            val tsHourMillis = tsHour * 60 * 60 * 1000L
            blockWriter.writeRows { block, rowNum ->
                var matched = true
                matched = matched and block.setValue("ts_hour", rowNum, tsHourMillis)
                return@writeRows if (matched) 1 else 0
            }
        }
    }
    /**
     * Used to split up the reads required to scan the requested batch of partition(s).
     *
     * @param allocator Tool for creating and managing Apache Arrow Blocks.
     * @param request Provides details of the catalog, database, table, and partition(s) being queried as well as
     * any filter predicate.
     * @return A GetSplitsResponse which primarily contains:
     * 1. A Set<Split> which represent read operations Amazon Athena must perform by calling your read function.
     * 2. (Optional) A continuation token which allows you to paginate the generation of splits for large queries.
     * @note A Split is a mostly opaque object to Amazon Athena. Amazon Athena will use the optional SpillLocation and
     * optional EncryptionKey for pipelined reads but all properties you set on the Split are passed to your read
     * function to help you perform the read.
     */
    override fun doGetSplits(allocator: BlockAllocator, request: GetSplitsRequest): GetSplitsResponse {
        logger.info("doGetSplits: enter - $request")
        val catalogName = request.catalogName
        val partitions = request.partitions
        val tsHour = partitions.getFieldReader("ts_hour")
        val splits = java.util.HashSet<Split>()
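        // Track the min/max partition hour across the (already constraint-filtered) partition rows
        // so the Iceberg scan below is bounded to just the matching hours. minTsHour starts at
        // Long.MAX_VALUE so the first row initializes it.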
        var minTsHour: Long = Long.MAX_VALUE
        var maxTsHour: Long = 0
        for (i in 0 until partitions.rowCount) {
            // Set the readers to the partition row we are on
            tsHour.position = i
            val unixHours = tsHour.readLocalDateTime().atZone(ZoneId.of("UTC")).toEpochSecond() / 60 / 60
            minTsHour = min(minTsHour, unixHours)
            maxTsHour = max(maxTsHour, unixHours)
        }
logger.info("doGetSplits: minTsHour=$minTsHour, maxTsHour=$maxTsHour") | |
val tableId = TableIdentifier.of(Namespace.of("matano"), request.tableName.tableName) | |
logger.info("doGetSplits: tableId=$tableId") | |
val table = icebergCatalog.loadTable(tableId) ?: throw RuntimeException("Table not found: $tableId") | |
logger.info("doGetSplits: table=$table") | |
val filePaths = mutableListOf<String>() | |
table.newScan().filter( | |
Expressions.and( | |
Expressions.greaterThanOrEqual(Expressions.hour("ts"), minTsHour), | |
Expressions.lessThanOrEqual(Expressions.hour("ts"), maxTsHour) | |
) | |
).ignoreResiduals().planFiles().use { s -> | |
s.forEach { f -> | |
// todo: inject ts_hour | |
filePaths.add(f.file().path().toString()) } | |
} | |
// chunk into 50 files per split | |
val filesChunks = filePaths.chunked(50) | |
for (filesChunk in filesChunks) { | |
val split = Split.newBuilder(makeSpillLocation(request), makeEncryptionKey()).add("files", filesChunk.joinToString(",")).build() | |
splits.add(split) | |
} | |
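        // Each split carries a comma-separated "files" property; the RecordHandler reads it back
        // via split.getProperty("files") and processes the Parquet footer of every file in the chunk.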
        /**
         * TODO: For each partition in the request, create 1 or more splits. Splits
         * are parallelizable units of work. Each represents a part of your table
         * that needs to be read for the query. Splits are opaque to Athena aside from the
         * spill location and encryption key. All properties added to a split are solely
         * for your use when Athena calls your readWithConstraint(...) function to perform
         * the read. In this example we just need to know the partition details (year, month, day).
         *
         * Split split = Split.newBuilder(makeSpillLocation(request), makeEncryptionKey())
         *     .add("year", String.valueOf(year.readInteger()))
         *     .add("month", String.valueOf(month.readInteger()))
         *     .add("day", String.valueOf(day.readInteger()))
         *     .build();
         *
         * splits.add(split);
         */
        logger.info("doGetSplits: exit - " + splits.size)
        return GetSplitsResponse(catalogName, splits)
    }
    /**
     * Used to describe the types of capabilities supported by a data source. An engine can use this to determine what
     * portions of the query to push down. A connector that returns any optimization will guarantee that the associated
     * predicate will be pushed down.
     * @param allocator Tool for creating and managing Apache Arrow Blocks.
     * @param request Provides details about the catalog being used.
     * @return A GetDataSourceCapabilitiesResponse object which returns a map of supported optimizations that
     * the connector is advertising to the consumer. The connector assumes all responsibility for whatever is passed here.
     */
    override fun doGetDataSourceCapabilities(allocator: BlockAllocator, request: GetDataSourceCapabilitiesRequest): GetDataSourceCapabilitiesResponse {
        var capabilities: Map<String, List<OptimizationSubType>> = mutableMapOf()
        capabilities = capabilities.plus(DataSourceOptimizations.SUPPORTS_FILTER_PUSHDOWN.withSupportedSubTypes(*FilterPushdownSubType.values()).toPair())
        capabilities = capabilities.plus(DataSourceOptimizations.SUPPORTS_LIMIT_PUSHDOWN.withSupportedSubTypes(*LimitPushdownSubType.values()).toPair())
        /*
         * TODO: Add capabilities which your connector will support.
         * The capabilities you return here will cause Athena to fill the Constraints object in the RecordHandler's
         * readWithConstraint method's ReadRecordsRequest parameter with the specific pushdowns your connector supports
         * and expect your record handler to correctly evaluate the pushdowns present.
         *
         * Example: capabilities.putAll(DataSourceOptimizations.SUPPORTS_FILTER_PUSHDOWN.withSupportedSubTypes(FilterPushdownSubType.ALL));
         * This tells Athena your connector can handle simple associative filters, like colA > 10.
         *
         * See athena-federation-sdk/src/main/java/com/amazonaws/athena/connector/lambda/metadata/optimizations/DataSourceOptimizations.java for options.
         * Pushdown subtypes are found in athena-federation-sdk/src/main/java/com/amazonaws/athena/connector/lambda/metadata/optimizations/pushdown/
         */
        return GetDataSourceCapabilitiesResponse(request.catalogName, capabilities)
    }
    val logger = LoggerFactory.getLogger(this::class.java)

    companion object {
        /**
         * Used to aid in debugging. Athena will use this name in conjunction with your catalog id
         * to correlate relevant query errors.
         */
        private const val SOURCE_TYPE = "matano_datasize"
    }
}

package com.matano.iceberg.datasize

import com.amazonaws.athena.connector.lambda.QueryStatusChecker
import com.amazonaws.athena.connector.lambda.data.Block
import com.amazonaws.athena.connector.lambda.data.BlockAllocator
import com.amazonaws.athena.connector.lambda.data.BlockSpiller
import com.amazonaws.athena.connector.lambda.data.S3BlockSpiller
import com.amazonaws.athena.connector.lambda.data.writers.GeneratedRowWriter
import com.amazonaws.athena.connector.lambda.data.writers.extractors.BigIntExtractor
import com.amazonaws.athena.connector.lambda.data.writers.extractors.DateMilliExtractor
import com.amazonaws.athena.connector.lambda.data.writers.extractors.VarCharExtractor
import com.amazonaws.athena.connector.lambda.handlers.RecordHandler
import com.amazonaws.athena.connector.lambda.records.ReadRecordsRequest
import com.amazonaws.services.athena.AmazonAthena
import com.amazonaws.services.athena.AmazonAthenaClientBuilder
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.secretsmanager.AWSSecretsManager
import com.amazonaws.services.secretsmanager.AWSSecretsManagerClientBuilder
import com.matano.iceberg.IcebergMetadataWriter
import com.matano.iceberg.parseS3Uri
import org.apache.iceberg.catalog.Catalog
import org.apache.parquet.bytes.BytesUtils
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.metadata.ParquetMetadata
import org.slf4j.LoggerFactory
import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode
import software.amazon.awssdk.core.async.AsyncResponseTransformer
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient
import software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup
import software.amazon.awssdk.services.s3.S3AsyncClient
import java.io.BufferedReader
import java.io.IOException
import java.io.InputStreamReader
import java.time.Duration
import java.util.concurrent.CompletableFuture
/**
 * This class is part of a tutorial that will walk you through how to build a connector for your
 * custom data source. The README for this module (athena-example) will guide you through preparing
 * your development environment, modifying this example RecordHandler, building, deploying, and then
 * using your new source in an Athena query.
 *
 * More specifically, this class is responsible for providing Athena with actual row-level data from your source. Athena
 * will call readWithConstraint(...) on this class for each 'Split' you generated in DatasizeMetadataHandler.
 *
 * For more examples, please see the other connectors in this repository (e.g. athena-cloudwatch, athena-docdb, etc...)
 */
class DatasizeRecordHandler protected constructor(
    private val amazonS3: AmazonS3,
    secretsManager: AWSSecretsManager?,
    amazonAthena: AmazonAthena?,
    configOptions: Map<String?, String?>?
) : RecordHandler(amazonS3, secretsManager, amazonAthena, SOURCE_TYPE, configOptions) {
    constructor(configOptions: Map<String?, String?>?) : this(AmazonS3ClientBuilder.defaultClient(), AWSSecretsManagerClientBuilder.defaultClient(), AmazonAthenaClientBuilder.defaultClient(), configOptions)

    val icebergCatalog: Catalog by lazy {
        IcebergMetadataWriter.createIcebergCatalog()
    }

    /**
     * Used to read the row data associated with the provided Split.
     *
     * @param spiller A BlockSpiller that should be used to write the row data associated with this Split.
     * The BlockSpiller automatically handles chunking the response, encrypting, and spilling to S3.
     * @param recordsRequest Details of the read request, including:
     * 1. The Split
     * 2. The Catalog, Database, and Table the read request is for.
     * 3. The filtering predicate (if any)
     * 4. The columns required for projection.
     * @param queryStatusChecker A QueryStatusChecker that you can use to stop doing work for a query that has already terminated
     * @throws IOException
     * @note Avoid writing >10 rows per-call to BlockSpiller.writeRow(...) because this will limit the BlockSpiller's
     * ability to control Block size. The resulting increase in Block size may cause failures and reduced performance.
     */
    @Throws(IOException::class)
    override fun readWithConstraint(spiller: BlockSpiller, recordsRequest: ReadRecordsRequest, queryStatusChecker: QueryStatusChecker?) {
        logger.info("readWithConstraint: enter - " + recordsRequest.split)
        if (queryStatusChecker?.isQueryRunning == false) {
            logger.info("readWithConstraint: Query was already finished")
            return
        }
        val split = recordsRequest.split
        val filesStr = split.getProperty("files")
        val filePaths = filesStr.split(",").toList()
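        // Kick off one asynchronous Parquet footer read per file, pairing each path with the
        // size extracted from its metadata; all futures are awaited before any rows are written.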
        val sizeFuts = filePaths.map { path ->
            val (bucket, key) = parseS3Uri(path)
            readParquetMetrics(bucket, key).thenApplyAsync { arrayOf(path, parseSizeFromParquetMetadata(it)) }
        }
        CompletableFuture.allOf(*sizeFuts.toTypedArray()).join()
        val sizes = sizeFuts.map { it.get() }.toTypedArray()
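        // The GeneratedRowWriter extractors below read from the Array context built above:
        // index 0 holds the file path, index 1 the size. ts_hour is left unset (isSet = 0)
        // because the per-file timestamp is not propagated yet (see the TODO in doGetSplits).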
        val builder = GeneratedRowWriter.newBuilder(recordsRequest.constraints)
        builder.withExtractor("file_path", VarCharExtractor { context, value ->
            val p = (context as Array<*>)[0] as String
            value!!.isSet = 1
            value.value = p
        })
        builder.withExtractor("size", BigIntExtractor { context, value ->
            val size = (context as Array<*>)[1] as Long
            value.isSet = 1
            value.value = size
        })
        builder.withExtractor("ts_hour", DateMilliExtractor { context, value ->
            // val ts = (context as Array<*>)[2] as Long
            value.isSet = 0
            // value.value = ts
        })
        // Used some basic code-gen to optimize how we generate response data.
        val rowWriter = builder.build()
        for (sizeRow in sizes) {
            spiller.writeRows { block: Block?, rowNum: Int -> if (rowWriter.writeRow(block, rowNum, sizeRow)) 1 else 0 }
        }
    }
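
    // Sums the total uncompressed size of the "event.original" column across all row groups
    // in the Parquet footer; row groups without that column contribute 0.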
    fun parseSizeFromParquetMetadata(metadata: ParquetMetadata): Long {
        return metadata.blocks.sumOf { b ->
            b.columns.find { c ->
                c.path.toDotString() == "event.original"
            }?.totalUncompressedSize ?: 0
        }
    }
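
    // Fetches just the Parquet footer using two ranged S3 GETs: the last 8 bytes of the file
    // hold the 4-byte little-endian footer length followed by the "PAR1" magic, and a second
    // request for the trailing (footerLength + 8) bytes retrieves the footer itself.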
    fun readParquetMetrics(s3Bucket: String, s3Key: String): CompletableFuture<ParquetMetadata> {
        val start = System.currentTimeMillis()
        val ret = IcebergMetadataWriter.s3Client.getObject({ req ->
            req.bucket(s3Bucket).key(s3Key).range("bytes=-8")
        }, AsyncResponseTransformer.toBytes()).thenComposeAsync { r ->
            val footerLength = BytesUtils.readIntLittleEndian(r.asInputStream())
            IcebergMetadataWriter.s3Client.getObject({ req ->
                req.bucket(s3Bucket).key(s3Key).range("bytes=-${footerLength + 8}")
            }, AsyncResponseTransformer.toBytes())
                .thenApplyAsync { footerResp ->
                    val metadata = IcebergMetadataWriter.parquetMetadataConverter.readParquetMetadata(footerResp.asInputStream(), ParquetMetadataConverter.NO_FILTER, null, false, footerLength)
                    logger.debug("Read parquet footer for s3://$s3Bucket/$s3Key in: ${System.currentTimeMillis() - start} ms")
                    metadata
                }
        }
        return ret
    }
    private val logger = LoggerFactory.getLogger(this::class.java)

    companion object {
        /**
         * Used to aid in debugging. Athena will use this name in conjunction with your catalog id
         * to correlate relevant query errors.
         */
        private const val SOURCE_TYPE = "example"
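
        // Async S3 client tuned for many small concurrent range requests: high connection
        // concurrency, a large pending-acquire queue, and TCP keep-alive to reuse connections.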
        val s3Client = S3AsyncClient
            .builder()
            .httpClientBuilder(
                NettyNioAsyncHttpClient.builder()
                    .maxConcurrency(500)
                    .maxPendingConnectionAcquires(50000)
                    .connectionAcquisitionTimeout(Duration.ofSeconds(60))
                    .eventLoopGroupBuilder(
                        SdkEventLoopGroup.builder().numberOfThreads(10),
                    )
                    .tcpKeepAlive(true),
            )
            .defaultsMode(DefaultsMode.IN_REGION)
            .build()
    }
}