// athenasource1: DatasizeMetadataHandler and DatasizeRecordHandler for an Athena federated connector
/*-
* #%L
* athena-example
* %%
* Copyright (C) 2019 Amazon Web Services
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package com.matano.iceberg.datasize
import com.amazonaws.athena.connector.lambda.QueryStatusChecker
import com.amazonaws.athena.connector.lambda.data.BlockAllocator
import com.amazonaws.athena.connector.lambda.data.BlockWriter
import com.amazonaws.athena.connector.lambda.data.SchemaBuilder
import com.amazonaws.athena.connector.lambda.domain.Split
import com.amazonaws.athena.connector.lambda.domain.TableName
import com.amazonaws.athena.connector.lambda.handlers.MetadataHandler
import com.amazonaws.athena.connector.lambda.metadata.*
import com.amazonaws.athena.connector.lambda.metadata.optimizations.DataSourceOptimizations
import com.amazonaws.athena.connector.lambda.metadata.optimizations.OptimizationSubType
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.FilterPushdownSubType
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.LimitPushdownSubType
import com.amazonaws.athena.connector.lambda.security.EncryptionKeyFactory
import com.amazonaws.services.athena.AmazonAthena
import com.amazonaws.services.secretsmanager.AWSSecretsManager
import com.matano.iceberg.IcebergMetadataWriter.Companion.createIcebergCatalog
import org.apache.iceberg.catalog.Catalog
import org.apache.iceberg.catalog.Namespace
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.expressions.Expressions
import org.slf4j.LoggerFactory
import java.time.ZoneId
import kotlin.math.max
import kotlin.math.min
//DO NOT REMOVE - this will not be _unused_ when customers go through the tutorial and uncomment
//the TODOs
/**
* This class is part of a tutorial that will walk you through how to build a connector for your
* custom data source. The README for this module (athena-example) will guide you through preparing
* your development environment, modifying this example MetadataHandler, building, deploying, and then
* using your new source in an Athena query.
*
*
* More specifically, this class is responsible for providing Athena with metadata about the schemas (aka databases),
* tables, and table partitions that your source contains. Lastly, this class tells Athena how to split up reads against
* this source. This gives you control over the level of performance and parallelism your source can support.
*
*
* For more examples, please see the other connectors in this repository (e.g. athena-cloudwatch, athena-docdb, etc...)
*/
class DatasizeMetadataHandler : MetadataHandler {
constructor(configOptions: Map<String?, String?>?) : super(SOURCE_TYPE, configOptions)
val icebergCatalog: Catalog by lazy {
createIcebergCatalog()
}
protected constructor(
keyFactory: EncryptionKeyFactory?,
awsSecretsManager: AWSSecretsManager?,
athena: AmazonAthena?,
spillBucket: String?,
spillPrefix: String?,
configOptions: Map<String?, String?>?) : super(keyFactory, awsSecretsManager, athena, SOURCE_TYPE, spillBucket, spillPrefix, configOptions)
/**
* Used to get the list of schemas (aka databases) that this source contains.
*
* @param allocator Tool for creating and managing Apache Arrow Blocks.
* @param request Provides details on who made the request and which Athena catalog they are querying.
* @return A ListSchemasResponse which primarily contains a Set<String> of schema names and a catalog name
* corresponding to the Athena catalog that was queried.
*/
override fun doListSchemaNames(allocator: BlockAllocator, request: ListSchemasRequest): ListSchemasResponse {
logger.info("doListSchemaNames: enter - $request")
val schemas: Set<String> = hashSetOf("matano_db")
return ListSchemasResponse(request.catalogName, schemas)
}
/**
* Used to get a paginated list of tables that this source contains.
*
* @param allocator Tool for creating and managing Apache Arrow Blocks.
* @param request Provides details on who made the request and which Athena catalog and database they are querying.
* @return A ListTablesResponse which primarily contains a List<TableName> enumerating the tables in this
* catalog and database tuple. It also contains the catalog name corresponding to the Athena catalog that was queried.
* @implNote A complete (un-paginated) list of tables should be returned if the request's pageSize is set to
* ListTablesRequest.UNLIMITED_PAGE_SIZE_VALUE.
*/
override fun doListTables(allocator: BlockAllocator, request: ListTablesRequest): ListTablesResponse {
logger.info("doListTables: enter - $request")
val namespace = Namespace.of("matano")
val tables = icebergCatalog.listTables(namespace).map { ti -> TableName("matano_db", ti.name()) }.toMutableList()
return ListTablesResponse(request.catalogName, tables, null)
}
/**
* Used to get definition (field names, types, descriptions, etc...) of a Table.
*
* @param allocator Tool for creating and managing Apache Arrow Blocks.
* @param request Provides details on who made the request and which Athena catalog, database, and table they are querying.
* @return A GetTableResponse which primarily contains:
* 1. An Apache Arrow Schema object describing the table's columns, types, and descriptions.
* 2. A Set<String> of partition column names (or empty if the table isn't partitioned).
* 3. A TableName object confirming the schema and table name the response is for.
* 4. A catalog name corresponding to the Athena catalog that was queried.
*/
override fun doGetTable(allocator: BlockAllocator, request: GetTableRequest): GetTableResponse {
logger.info("doGetTable: enter - $request")
val partitionColNames: Set<String> = hashSetOf("ts_hour")
val tableSchemaBuilder = SchemaBuilder.newBuilder()
// tableSchemaBuilder.addStringField("ts_hour")
tableSchemaBuilder.addDateMilliField("ts_hour")
tableSchemaBuilder.addStringField("file_path")
tableSchemaBuilder.addBigIntField("size")
return GetTableResponse(request.catalogName,
request.tableName,
tableSchemaBuilder.build(),
partitionColNames)
}
/**
* Used to get the partitions that must be read from the request table in order to satisfy the requested predicate.
*
* @param blockWriter Used to write rows (partitions) into the Apache Arrow response.
* @param request Provides details of the catalog, database, and table being queried as well as any filter predicate.
* @param queryStatusChecker A QueryStatusChecker that you can use to stop doing work for a query that has already terminated
* @note Partitions are partially opaque to Amazon Athena in that it only understands your partition columns and
* how to filter out partitions that do not meet the query's constraints. Any additional columns you add to the
* partition data are ignored by Athena but passed on to calls on GetSplits.
*/
@Throws(Exception::class)
override fun getPartitions(blockWriter: BlockWriter, request: GetTableLayoutRequest, queryStatusChecker: QueryStatusChecker) {
logger.info("getPartitions: enter - $request")
if (!queryStatusChecker.isQueryRunning) {
logger.info("GetPartitions: Query was already finished")
return
}
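// Enumerate one partition row per hour over the trailing four years (24 * 365 * 4 = 35,040 rows), using epoch hours so the arithmetic stays integer-only.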
val currentTsHour = System.currentTimeMillis() / 1000 / 60 / 60
val startTsHour = currentTsHour - 24 * 365 * 4
for (tsHour in startTsHour..currentTsHour) {
val tsHourMillis = tsHour * 60 * 60 * 1000L
blockWriter.writeRows { block, rowNum ->
var matched = true
matched = matched and block.setValue("ts_hour", rowNum, tsHourMillis)
return@writeRows if (matched) 1 else 0
}
}
}
/**
* Used to split-up the reads required to scan the requested batch of partition(s).
*
* @param allocator Tool for creating and managing Apache Arrow Blocks.
* @param request Provides details of the catalog, database, table, and partition(s) being queried as well as
* any filter predicate.
* @return A GetSplitsResponse which primarily contains:
* 1. A Set<Split> which represents the read operations Amazon Athena must perform by calling your read function.
* 2. (Optional) A continuation token which allows you to paginate the generation of splits for large queries.
* @note A Split is a mostly opaque object to Amazon Athena. Amazon Athena will use the optional SpillLocation and
* optional EncryptionKey for pipelined reads but all properties you set on the Split are passed to your read
* function to help you perform the read.
*/
override fun doGetSplits(allocator: BlockAllocator, request: GetSplitsRequest): GetSplitsResponse {
logger.info("doGetSplits: enter - $request")
val catalogName = request.catalogName
val partitions = request.partitions
val tsHour = partitions.getFieldReader("ts_hour")
val splits = java.util.HashSet<Split>()
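// Scan the partition block to find the smallest and largest requested hour; these bound the Iceberg table scan below.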
var minTsHour: Long = Long.MAX_VALUE
var maxTsHour: Long = Long.MIN_VALUE
for (i in 0 until partitions.rowCount) {
//Set the readers to the partition row we are on
tsHour.position = i
val unixHours = tsHour.readLocalDateTime().atZone(ZoneId.of("UTC")).toEpochSecond() / 60 / 60
minTsHour = min(minTsHour, unixHours)
maxTsHour = max(maxTsHour, unixHours)
}
logger.info("doGetSplits: minTsHour=$minTsHour, maxTsHour=$maxTsHour")
val tableId = TableIdentifier.of(Namespace.of("matano"), request.tableName.tableName)
logger.info("doGetSplits: tableId=$tableId")
val table = icebergCatalog.loadTable(tableId) ?: throw RuntimeException("Table not found: $tableId")
logger.info("doGetSplits: table=$table")
val filePaths = mutableListOf<String>()
table.newScan().filter(
Expressions.and(
Expressions.greaterThanOrEqual(Expressions.hour("ts"), minTsHour),
Expressions.lessThanOrEqual(Expressions.hour("ts"), maxTsHour)
)
).ignoreResiduals().planFiles().use { s ->
s.forEach { f ->
// todo: inject ts_hour
filePaths.add(f.file().path().toString())
}
}
// chunk into 50 files per split
val filesChunks = filePaths.chunked(50)
for (filesChunk in filesChunks) {
val split = Split.newBuilder(makeSpillLocation(request), makeEncryptionKey()).add("files", filesChunk.joinToString(",")).build()
splits.add(split)
}
/**
* Note: the splits built above are the parallelizable units of work for this query. Each split is
* opaque to Athena aside from the spill location and encryption key; the "files" property added to
* each split is consumed only by this connector's readWithConstraint(...) implementation.
*/
logger.info("doGetSplits: exit - " + splits.size)
return GetSplitsResponse(catalogName, splits)
}
/**
* Used to describe the types of capabilities supported by a data source. An engine can use this to determine what
* portions of the query to push down. A connector that returns any optimization will guarantee that the associated
* predicate will be pushed down.
* @param allocator Tool for creating and managing Apache Arrow Blocks.
* @param request Provides details about the catalog being used.
* @return A GetDataSourceCapabilitiesResponse object which returns a map of supported optimizations that
* the connector is advertising to the consumer. The connector assumes all responsibility for whatever is passed here.
*/
override fun doGetDataSourceCapabilities(allocator: BlockAllocator, request: GetDataSourceCapabilitiesRequest): GetDataSourceCapabilitiesResponse {
var capabilities: Map<String, List<OptimizationSubType>> = mutableMapOf()
capabilities = capabilities.plus(DataSourceOptimizations.SUPPORTS_FILTER_PUSHDOWN.withSupportedSubTypes(*FilterPushdownSubType.values()).toPair())
capabilities = capabilities.plus(DataSourceOptimizations.SUPPORTS_LIMIT_PUSHDOWN.withSupportedSubTypes(*LimitPushdownSubType.values()).toPair())
/*
* The capabilities returned here cause Athena to fill the Constraints object in the RecordHandler's
* readWithConstraint method's ReadRecordsRequest parameter with the specific pushdowns this connector
* supports; the record handler is then expected to correctly evaluate the pushdowns present.
*
* See athena-federation-sdk/src/main/java/com/amazonaws/athena/connector/lambda/metadata/optimizations/DataSourceOptimizations.java for options.
* Pushdown subtypes are found in athena-federation-sdk/src/main/java/com/amazonaws/athena/connector/lambda/metadata/optimizations/pushdown/
*/
return GetDataSourceCapabilitiesResponse(request.catalogName, capabilities)
}
val logger = LoggerFactory.getLogger(this::class.java)
companion object {
/**
* used to aid in debugging. Athena will use this name in conjunction with your catalog id
* to correlate relevant query errors.
*/
private const val SOURCE_TYPE = "matano_datasize"
}
}
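// A minimal, hypothetical sketch (not part of the connector) of the epoch-hour round trip that
// getPartitions and doGetSplits above depend on: partitions are written as epoch milliseconds
// truncated to the hour, and doGetSplits converts the DateMilli reader's LocalDateTime back to
// epoch hours before building the Iceberg filter. Safe to delete; shown only for clarity.
private fun epochHourRoundTripSketch() {
    val epochHour = System.currentTimeMillis() / 1000 / 60 / 60   // hours since the Unix epoch
    val tsHourMillis = epochHour * 60 * 60 * 1000L                 // value written into the "ts_hour" block
    val recovered = java.time.Instant.ofEpochMilli(tsHourMillis)
        .atZone(ZoneId.of("UTC"))
        .toEpochSecond() / 60 / 60                                 // value doGetSplits reads back
    check(recovered == epochHour) { "round trip should be lossless at hour granularity" }
}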
// ---- Second file in this gist: DatasizeRecordHandler ----
package com.matano.iceberg.datasize
import com.amazonaws.athena.connector.lambda.QueryStatusChecker
import com.amazonaws.athena.connector.lambda.data.Block
import com.amazonaws.athena.connector.lambda.data.BlockAllocator
import com.amazonaws.athena.connector.lambda.data.BlockSpiller
import com.amazonaws.athena.connector.lambda.data.S3BlockSpiller
import com.amazonaws.athena.connector.lambda.data.writers.GeneratedRowWriter
import com.amazonaws.athena.connector.lambda.data.writers.extractors.BigIntExtractor
import com.amazonaws.athena.connector.lambda.data.writers.extractors.DateMilliExtractor
import com.amazonaws.athena.connector.lambda.data.writers.extractors.VarCharExtractor
import com.amazonaws.athena.connector.lambda.handlers.RecordHandler
import com.amazonaws.athena.connector.lambda.records.ReadRecordsRequest
import com.amazonaws.services.athena.AmazonAthena
import com.amazonaws.services.athena.AmazonAthenaClientBuilder
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.secretsmanager.AWSSecretsManager
import com.amazonaws.services.secretsmanager.AWSSecretsManagerClientBuilder
import com.matano.iceberg.IcebergMetadataWriter
import com.matano.iceberg.parseS3Uri
import org.apache.iceberg.catalog.Catalog
import org.apache.parquet.bytes.BytesUtils
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.metadata.ParquetMetadata
import org.slf4j.LoggerFactory
import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode
import software.amazon.awssdk.core.async.AsyncResponseTransformer
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient
import software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup
import software.amazon.awssdk.services.s3.S3AsyncClient
import java.io.BufferedReader
import java.io.IOException
import java.io.InputStreamReader
import java.time.Duration
import java.util.concurrent.CompletableFuture
/**
* This class is part of a tutorial that will walk you through how to build a connector for your
* custom data source. The README for this module (athena-example) will guide you through preparing
* your development environment, modifying this example RecordHandler, building, deploying, and then
* using your new source in an Athena query.
*
*
* More specifically, this class is responsible for providing Athena with actual row-level data from your source. Athena
* will call readWithConstraint(...) on this class for each 'Split' you generated in DatasizeMetadataHandler.
*
*
* For more examples, please see the other connectors in this repository (e.g. athena-cloudwatch, athena-docdb, etc...)
*/
class DatasizeRecordHandler protected constructor(private val amazonS3: AmazonS3, secretsManager: AWSSecretsManager?, amazonAthena: AmazonAthena?, configOptions: Map<String?, String?>?) : RecordHandler(amazonS3, secretsManager, amazonAthena, SOURCE_TYPE, configOptions) {
constructor(configOptions: Map<String?, String?>?) : this(AmazonS3ClientBuilder.defaultClient(), AWSSecretsManagerClientBuilder.defaultClient(), AmazonAthenaClientBuilder.defaultClient(), configOptions)
val icebergCatalog: Catalog by lazy {
IcebergMetadataWriter.createIcebergCatalog()
}
/**
* Used to read the row data associated with the provided Split.
*
* @param spiller A BlockSpiller that should be used to write the row data associated with this Split.
* The BlockSpiller automatically handles chunking the response, encrypting, and spilling to S3.
* @param recordsRequest Details of the read request, including:
* 1. The Split
* 2. The Catalog, Database, and Table the read request is for.
* 3. The filtering predicate (if any)
* 4. The columns required for projection.
* @param queryStatusChecker A QueryStatusChecker that you can use to stop doing work for a query that has already terminated
* @throws IOException
* @note Avoid writing >10 rows per-call to BlockSpiller.writeRow(...) because this will limit the BlockSpiller's
* ability to control Block size. The resulting increase in Block size may cause failures and reduced performance.
*/
@Throws(IOException::class)
override fun readWithConstraint(spiller: BlockSpiller, recordsRequest: ReadRecordsRequest, queryStatusChecker: QueryStatusChecker?) {
logger.info("readWithConstraint: enter - " + recordsRequest.split)
if (queryStatusChecker?.isQueryRunning == false) {
logger.info("readWithConstraint: Query was already finished")
return
}
val split = recordsRequest.split
val filesStr = split.getProperty("files")
val filePaths = filesStr.split(",").toList()
val sizeFuts = filePaths.map { path ->
val (bucket, key) = parseS3Uri(path)
readParquetMetrics(bucket, key).thenApplyAsync { arrayOf(path, parseSizeFromParquetMetadata(it)) }
}
CompletableFuture.allOf(*sizeFuts.toTypedArray()).join()
val sizes = sizeFuts.map { it.get() }.toTypedArray()
val builder = GeneratedRowWriter.newBuilder(recordsRequest.constraints)
builder.withExtractor("file_path", VarCharExtractor { context, value ->
val p = (context as Array<*>)[0] as String
value!!.isSet = 1
value.value = p
})
builder.withExtractor("size", BigIntExtractor { context, value ->
val size = (context as Array<*>)[1] as Long
value.isSet = 1
value.value = size
})
builder.withExtractor("ts_hour", DateMilliExtractor { context, value ->
// val ts = (context as Array<*>)[2] as Long
value.isSet = 0
// value.value = ts
})
//Used some basic code-gen to optimize how we generate response data.
val rowWriter = builder.build()
for (sizeRow in sizes) {
spiller.writeRows { block: Block?, rowNum: Int -> if (rowWriter.writeRow(block, rowNum, sizeRow)) 1 else 0 }
}
}
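// Sums the uncompressed size of the "event.original" column across all row groups; row groups without that column contribute 0.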
fun parseSizeFromParquetMetadata(metadata: ParquetMetadata): Long {
return metadata.blocks.sumOf { b ->
b.columns.find { c ->
c.path.toDotString() == "event.original"
}?.totalUncompressedSize ?: 0
}
}
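// Fetches and parses only the Parquet footer via two S3 ranged GETs: the last 8 bytes hold the 4-byte
// little-endian footer length (followed by the "PAR1" magic), and a second GET of bytes=-(footerLength + 8)
// retrieves the full footer for ParquetMetadataConverter to decode.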
fun readParquetMetrics(s3Bucket: String, s3Key: String): CompletableFuture<ParquetMetadata> {
val start = System.currentTimeMillis()
val ret = IcebergMetadataWriter.s3Client.getObject({ req ->
req.bucket(s3Bucket).key(s3Key).range("bytes=-8")
}, AsyncResponseTransformer.toBytes()).thenComposeAsync { tailResp ->
val footerLength = BytesUtils.readIntLittleEndian(tailResp.asInputStream())
IcebergMetadataWriter.s3Client.getObject({ req ->
req.bucket(s3Bucket).key(s3Key).range("bytes=-${footerLength + 8}")
}, AsyncResponseTransformer.toBytes())
.thenApplyAsync { footerResp ->
val metadata = IcebergMetadataWriter.parquetMetadataConverter.readParquetMetadata(footerResp.asInputStream(), ParquetMetadataConverter.NO_FILTER, null, false, footerLength)
logger.debug("Read parquet footer for s3://$s3Bucket/$s3Key in: ${System.currentTimeMillis() - start} ms")
metadata
}
}
return ret
}
private val logger = LoggerFactory.getLogger(this::class.java)
companion object {
/**
* used to aid in debugging. Athena will use this name in conjunction with your catalog id
* to correlate relevant query errors.
*/
private const val SOURCE_TYPE = "example"
val s3Client = S3AsyncClient
.builder()
.httpClientBuilder(
NettyNioAsyncHttpClient.builder()
.maxConcurrency(500)
.maxPendingConnectionAcquires(50000)
.connectionAcquisitionTimeout(Duration.ofSeconds(60))
.eventLoopGroupBuilder(
SdkEventLoopGroup.builder().numberOfThreads(10),
)
.tcpKeepAlive(true),
)
.defaultsMode(DefaultsMode.IN_REGION)
.build()
}
}
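// Hypothetical standalone helper (not used by the handler above) illustrating the Parquet tail layout
// that readParquetMetrics relies on: a Parquet file ends with a 4-byte little-endian footer length
// followed by the 4-byte "PAR1" magic. Given the last 8 bytes of a file, this returns the footer length,
// so a second ranged GET of "bytes=-(footerLength + 8)" covers the whole footer. A sketch under those
// assumptions only; error handling is minimal.
fun parquetFooterLengthFromTail(tail: ByteArray): Int {
    require(tail.size == 8) { "expected exactly the last 8 bytes of the file" }
    require(tail.copyOfRange(4, 8).contentEquals("PAR1".toByteArray(Charsets.US_ASCII))) {
        "missing PAR1 magic; not a Parquet file?"
    }
    // little-endian int from the first 4 bytes
    return (tail[0].toInt() and 0xFF) or
        ((tail[1].toInt() and 0xFF) shl 8) or
        ((tail[2].toInt() and 0xFF) shl 16) or
        ((tail[3].toInt() and 0xFF) shl 24)
}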