Last active
February 23, 2018 08:34
-
-
Save breath103/7c70fafe3a106d21d1b25e91ede707ba to your computer and use it in GitHub Desktop.
CloudFront logs Athena table partition indexer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import * as AWS from "aws-sdk"; | |
// Athena Helper | |
/**
 * Starts an Athena query and polls until it reaches a terminal state.
 *
 * @param query - The SQL to run. An array is joined with newlines into a single statement.
 * @param outputLocation - S3 URI passed as `ResultConfiguration.OutputLocation`.
 * @returns The `getQueryExecution` response observed when the query reported `SUCCEEDED`.
 * @throws Error when Athena returns no execution id, or when the query ends up
 *         `FAILED` or `CANCELLED`. The message carries the execution id and, when
 *         Athena supplies one, the `StateChangeReason`.
 */
export async function executeAthenaQuery(query: string | string[], outputLocation: string) {
  const athena = new AWS.Athena();
  const queryString = typeof query === "string" ? query : query.join("\n");

  const started = await athena.startQueryExecution({
    QueryString: queryString,
    ResultConfiguration: { OutputLocation: outputLocation },
  }).promise();

  const queryExecutionId = started.QueryExecutionId;
  if (!queryExecutionId) {
    throw new Error("Athena did not return a QueryExecutionId.");
  }

  while (true) {
    const latestExecution = await athena.getQueryExecution({ QueryExecutionId: queryExecutionId }).promise();
    const status = latestExecution.QueryExecution && latestExecution.QueryExecution.Status;
    const state = status && status.State;
    // Surface Athena's own explanation so failures can be diagnosed from the log alone.
    const reason = status && status.StateChangeReason ? ` (${status.StateChangeReason})` : "";

    switch (state) {
      case "SUCCEEDED":
        return latestExecution;
      case "FAILED":
        throw new Error(`Execution failed. ${queryExecutionId}${reason}`);
      case "CANCELLED":
        throw new Error(`Execution cancelled. ${queryExecutionId}${reason}`);
      default:
        // QUEUED, RUNNING, or a state not known here: keep polling.
        break;
    }

    // Wait only when another poll is actually needed, so finished queries return immediately.
    await new Promise<void>((resolve) => {
      setTimeout(resolve, 1000);
    });
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import * as AWS from "aws-sdk"; | |
import HandlerWrapper from "../handler_wrapper"; | |
import { executeAthenaQuery } from "./athena"; | |
import { S3Event } from "./s3_event"; | |
import * as debug from "debug"; | |
// Logger from the `debug` package under the "HANDLER" namespace, used by the handler below.
const log = debug("HANDLER");
export async function __handler(event: S3Event) { | |
log(`Received Total: ${event.Records.length} records`); | |
try { | |
const logS3Bucket: string = (() => { | |
const raw = process.env.CLOUDFRONT_LOG_S3_BUCKET; | |
if (typeof raw !== "string") { | |
throw new Error(`CLOUDFRONT_LOG_S3_BUCKET must be provided`); | |
} | |
return raw; | |
})(); | |
const athenaOutputLocation = `s3://${logS3Bucket}/athena/`; | |
const s3RootLocation = `s3://${logS3Bucket}/log/`; | |
const tableName = "cloudfront_logs"; | |
const resetTable = false; | |
if (resetTable) { | |
// tslint:disable | |
// Drop Athena Table | |
await executeAthenaQuery([`DROP TABLE IF EXISTS ${tableName}`], athenaOutputLocation); | |
// Create Table | |
await executeAthenaQuery( | |
`CREATE EXTERNAL TABLE ${tableName}( | |
date date COMMENT '', | |
time string COMMENT '', | |
location string COMMENT '', | |
bytes bigint COMMENT '', | |
requestip string COMMENT '', | |
method string COMMENT '', | |
host string COMMENT '', | |
uri string COMMENT '', | |
status int COMMENT '', | |
referrer string COMMENT '', | |
useragent string COMMENT '', | |
querystring string COMMENT '', | |
cookie string COMMENT '', | |
resulttype string COMMENT '', | |
requestid string COMMENT '', | |
hostheader string COMMENT '', | |
requestprotocol int COMMENT '', | |
requestbytes bigint COMMENT '', | |
timetaken double COMMENT '', | |
xforwardedfor string COMMENT '', | |
sslprotocol string COMMENT '', | |
sslcipher string COMMENT '', | |
responseresulttype string COMMENT '', | |
protocol_version string COMMENT '', | |
fle_status string COMMENT '', | |
fle_encrypted_fields string COMMENT '' | |
) | |
PARTITIONED BY ( | |
distributionId string, year string, month string, day string, hour string | |
) | |
ROW FORMAT SERDE | |
'org.apache.hadoop.hive.serde2.RegexSerDe' | |
WITH SERDEPROPERTIES ( | |
'input.regex'='^(?!#)([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)\\s+([^ \\t]+)$') | |
STORED AS INPUTFORMAT | |
'org.apache.hadoop.mapred.TextInputFormat' | |
OUTPUTFORMAT | |
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' | |
LOCATION | |
'${s3RootLocation}' | |
`, athenaOutputLocation); | |
// tslint:enable | |
} | |
const s3 = new AWS.S3(); | |
for (const record of event.Records) { | |
const key = record.s3.object.key; | |
const pathComponents = key.split("/"); | |
const [distributionId, date, hash] = pathComponents[pathComponents.length - 1].split("."); | |
const [year, month, day, hour] = date.split("-"); | |
log(`Processing ${record.s3.bucket.name}/${record.s3.object.key}`); | |
// First, we've got to move file into new folder.. | |
const newKeyFolder = `logs/${distributionId}/${year}/${month}/${day}/${hour}/`; | |
const newKey = `${newKeyFolder}${hash}.gz`; | |
await s3.copyObject({ | |
Bucket: record.s3.bucket.name, | |
Key: newKey, | |
CopySource: `/${record.s3.bucket.name}/${record.s3.object.key}`, | |
}).promise(); | |
// And register it | |
const addPartitionResult = await executeAthenaQuery([ | |
`ALTER TABLE ${tableName}`, | |
// tslint:disable-next-line | |
`ADD IF NOT EXISTS PARTITION (distributionId="${distributionId}", year="${year}", month="${month}", day="${day}", hour="${hour}")`, | |
`LOCATION 's3://${record.s3.bucket.name}/${newKeyFolder}'`, | |
], athenaOutputLocation); | |
} | |
} catch (e) { | |
log("Error : ", e); | |
throw e; | |
} | |
} | |
// Exported handler: `__handler` passed through the project's HandlerWrapper.
// NOTE(review): what `safelyWrap` adds is defined in ../handler_wrapper and not visible here.
export const handler = HandlerWrapper.safelyWrap(__handler);
// tslint:disable
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Payload of an S3 notification: a batch of per-object records. */
export interface S3Event {
  Records: S3EventRecord[];
}
/** A single entry of an S3 notification, describing one object-level event. */
export interface S3EventRecord {
  eventVersion: string;
  eventSource: string;
  awsRegion: string;
  /** Timestamp string, e.g. `2017-04-11T11:29:00.611Z`. */
  eventTime: string;
  eventName: string;
  userIdentity: {
    principalId: string;
  };
  requestParameters: {
    sourceIPAddress: string;
  };
  responseElements: {
    "x-amz-request-id": string;
    "x-amz-id-2": string;
  };
  s3: {
    s3SchemaVersion: string;
    configurationId: string;
    /** Bucket the object lives in. */
    bucket: {
      name: string;
      ownerIdentity: {
        principalId: string;
      };
      arn: string;
    };
    /** The object the event refers to. */
    object: {
      key: string;
      size: number;
      eTag: string;
      sequencer: string;
    };
  };
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment