Created
May 21, 2018 15:41
-
-
Save sysboss/d40ea8a7a12f510e61d7980269323b36 to your computer and use it in GitHub Desktop.
SQL Query Amazon Athena using Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# | |
# Query AWS Athena using SQL | |
# Copyright (c) Alexey Baikov <sysboss[at]mail.ru> | |
# | |
# This snippet is a basic example that queries Athena and loads the results
# into a variable.
# | |
# Requirements: | |
# > pip3 install boto3 botocore retrying | |
import os | |
import sys | |
import csv | |
import boto3 | |
import botocore | |
from retrying import retry | |
# --- configuration -----------------------------------------------------------
# Name of the S3 bucket that receives the Athena result files.
s3_bucket = 'athenaoutput'
# Result location handed to Athena, as an s3:// URI built from the bucket name.
# NOTE: the variable name is misspelled ("ouput"); it is kept as-is because the
# script's entry point refers to it by this name.
s3_ouput = 's3://' + s3_bucket
# Database the query is executed against.
database = 'datalake_database'

# --- AWS clients -------------------------------------------------------------
# Athena client: starts queries and reports their execution status.
athena = boto3.client('athena')
# S3 resource: used to download the result file.
s3 = boto3.resource('s3')
@retry(stop_max_attempt_number=10,
       wait_exponential_multiplier=300,
       wait_exponential_max=1 * 60 * 1000)
def poll_status(_id):
    """Return the execution record of an Athena query once it has finished.

    The function looks up the query a single time per call; the ``@retry``
    decorator re-invokes it (at most 10 attempts, exponential back-off capped
    at 60 seconds) whenever it raises, which is how "not finished yet" is
    signalled.

    Args:
        _id: The ``QueryExecutionId`` returned by ``start_query_execution``.

    Returns:
        The full ``get_query_execution`` response once the query state is
        ``SUCCEEDED``, ``FAILED`` or ``CANCELLED``. Callers must inspect
        ``result['QueryExecution']['Status']['State']`` themselves.

    Raises:
        Exception: If the query is still in a non-terminal state after the
            last retry attempt. Errors raised by ``get_query_execution``
            are retried by the decorator as well and surface the same way.
    """
    result = athena.get_query_execution(QueryExecutionId=_id)
    state = result['QueryExecution']['Status']['State']

    # All three states are final. CANCELLED must be treated as terminal too,
    # otherwise a cancelled query is polled until the retries are exhausted.
    if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        return result

    # Still queued or running: raising hands control back to @retry, which
    # waits and polls again. The message identifies the query if retries
    # eventually run out.
    raise Exception(
        "Athena query {} has not finished yet (state: {})".format(_id, state))
def run_query(query, database, s3_output):
    """Run a SQL query on Athena and return its result rows.

    The query is started, polled until it finishes, and on success the CSV
    result file written by Athena is downloaded into the current working
    directory, parsed, and removed again.

    Args:
        query: SQL text to execute.
        database: Name of the Athena database the query runs against.
        s3_output: ``s3://bucket[/prefix]`` location where Athena stores the
            result file. The bucket and key prefix used for the download are
            derived from this value, so it does not have to match any
            module-level bucket setting.

    Returns:
        A list with one dict per result row (column name -> string value)
        when the query succeeded; an empty list if the query succeeded but
        no ``<QueryExecutionId>.csv`` object exists at the output location;
        ``None`` if the query did not succeed.

    Raises:
        botocore.exceptions.ClientError: For any S3 download error other
            than a 404.
        Exception: Whatever ``poll_status`` raises when the query does not
            finish in time.
    """
    response = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': s3_output},
    )
    query_execution_id = response['QueryExecutionId']

    status = poll_status(query_execution_id)['QueryExecution']['Status']
    if status['State'] != 'SUCCEEDED':
        # Report why there is no result instead of returning None silently.
        print("Query {}: {} ({})".format(
            status['State'],
            query_execution_id,
            status.get('StateChangeReason', 'no reason given')))
        return None

    print("Query SUCCEEDED: {}".format(query_execution_id))

    # Split "s3://bucket/prefix" into bucket and key prefix so the download
    # targets exactly the location Athena was told to write to.
    location = s3_output
    if location.startswith('s3://'):
        location = location[len('s3://'):]
    bucket_name, _, prefix = location.partition('/')
    prefix = prefix.strip('/')

    local_filename = query_execution_id + '.csv'
    s3_key = prefix + '/' + local_filename if prefix else local_filename

    try:
        # download result file
        try:
            s3.Bucket(bucket_name).download_file(s3_key, local_filename)
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == "404":
                # Nothing was downloaded, so there is nothing to parse;
                # continuing would only fail on the missing local file.
                print("The object does not exist.")
                return []
            raise

        # read file to array; newline='' is what the csv module expects so
        # that quoted fields containing line breaks are parsed correctly.
        with open(local_filename, newline='', encoding='utf-8') as csvfile:
            return list(csv.DictReader(csvfile))
    finally:
        # delete result file, even when downloading or parsing failed
        if os.path.isfile(local_filename):
            os.remove(local_filename)
if __name__ == '__main__':
    # Example statement; replace it with the SQL that should run on Athena.
    query = """
        SELECT id, name
        FROM example
        LIMIT 20
    """

    # Echo the statement before submitting it.
    print("Executing query: {}".format(query))

    # Execute against the configured database and result location.
    result = run_query(query, database, s3_ouput)

    # Dump whatever run_query returned.
    print("Results:")
    print(result)
Hello, this code is giving me the following error. Please help me out.
botocore.errorfactory.InvalidRequestException: An error occurred (InvalidRequestException) when calling the StartQueryExecution operation: line 1:55: no viable alternative at input '=''
There is probably an error in the query you are passing. Can you post the query?
Works perfectly! Does it also save the query results to S3? If so, how can that be avoided?
thank u :)
We can directly query the result from Athena instead of downloading and reading the result file from S3.
athena_result = athena.get_query_results(QueryExecutionId=query_execution_id)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hello, this code is giving me the following error. Please help me out.
botocore.errorfactory.InvalidRequestException: An error occurred (InvalidRequestException) when calling the StartQueryExecution operation: line 1:55: no viable alternative at input '=''