-
-
Save schledererj/b2e2a800998d61af2bbdd1cd50e08b76 to your computer and use it in GitHub Desktop.
# Does NOT implement the PEP 249 spec, but the return type is suggested by the .fetchall function as specified here: https://www.python.org/dev/peps/pep-0249/#fetchall | |
import time | |
import boto3 | |
def fetchall_athena(query_string, client, database='DATABASE_NAME',
                    output_location='s3://S3_DROP_LOCATION',
                    poll_interval=10, skip_header=True):
    """Run a query on Athena and return every result row as a tuple.

    Does NOT implement the PEP 249 spec; only the return shape (a list of
    tuples) follows what ``.fetchall`` suggests.

    Args:
        query_string: A SQL-like query that Athena will execute.
        client: An Athena client created with boto3.
        database: Name passed as ``QueryExecutionContext['Database']``.
        output_location: S3 URI passed as
            ``ResultConfiguration['OutputLocation']``.
        poll_interval: Seconds to wait between status checks while the
            query is still queued or running.
        skip_header: When true (the default), the first returned row is
            dropped because it holds the column names. Pass ``False`` for
            queries whose results do not start with a header row.

    Returns:
        A list of tuples, one per row. Each element is the cell's
        ``VarCharValue`` string, or ``None`` when the cell has no such key
        (e.g. a NULL column).

    Raises:
        Exception: If the query ends in the FAILED or CANCELLED state.
    """
    query_id = client.start_query_execution(
        QueryString=query_string,
        QueryExecutionContext={
            'Database': database,
        },
        ResultConfiguration={
            'OutputLocation': output_location,
        },
    )['QueryExecutionId']

    while True:
        query_status = client.get_query_execution(
            QueryExecutionId=query_id
        )['QueryExecution']['Status']['State']
        if query_status in ('FAILED', 'CANCELLED'):
            raise Exception('Athena query with the string "{}" failed or was cancelled'.format(query_string))
        if query_status not in ('QUEUED', 'RUNNING', None):
            # Terminal state reached: go fetch results without sleeping again.
            break
        time.sleep(poll_interval)

    results_iter = client.get_paginator('get_query_results').paginate(
        QueryExecutionId=query_id,
        PaginationConfig={
            'PageSize': 1000,
        },
    )

    # Flatten all pages lazily into one stream of rows so the header (which
    # only appears once, at the very start) can be dropped with a single next().
    rows = (
        row['Data']
        for results_page in results_iter
        for row in results_page['ResultSet']['Rows']
    )
    if skip_header:
        next(rows, None)

    # .get() instead of [] so a cell without 'VarCharValue' becomes None
    # rather than raising KeyError.
    return [tuple(cell.get('VarCharValue') for cell in data) for data in rows]
There is a bug apparently, the code is skipping the first value:
for datum in data_list[1:]:
I believe it should be:
for datum in data_list[0:]:
awesome job!!
Great Work!! Thank you!!
I am pretty new to Athena. I have a use case where I need to query tables from Athena and display the results. I am using a Jupyter notebook to run this code.
How do I call this function? Can someone share a code snippet for this? I just have a simple query like "select count(*) from database1.table1", and I have to display the results as well.
This was super useful, thank you. I'd like to suggest some minor tweaks:
- Return results as a dict, with the column names as keys
- Support null columns (as mentioned above)
results = []
column_names = None
for results_page in results_iter:
for row in results_page['ResultSet']['Rows']:
column_values = [col.get('VarCharValue', None) for col in row['Data']]
if not column_names:
column_names = column_values
else:
results.append(dict(zip(column_names, column_values)))
return results
There is a bug apparently, the code is skipping the first value:
for datum in data_list[1:]:
I believe it should be:
for datum in data_list[0:]:
Actually data_list[1:] is correct. It makes sure to skip the column name of the result
My query was not returning headers, so
for datum in data_list[0:]:
was the correct choice for me.
What's up with the 10 second sleep?
What's up with the 10 second sleep?
@backmuda feel free to reduce it to 0.5 seconds.
Works great!
I'll second what Tiago said. This method was a gem to find -- thank you!