-
-
Save schledererj/b2e2a800998d61af2bbdd1cd50e08b76 to your computer and use it in GitHub Desktop.
| # Does NOT implement the PEP 249 spec, but the return type is suggested by the .fetchall function as specified here: https://www.python.org/dev/peps/pep-0249/#fetchall | |
| import time | |
| import boto3 | |
| # query_string: a SQL-like query that Athena will execute | |
| # client: an Athena client created with boto3 | |
def fetchall_athena(query_string, client, database='DATABASE_NAME',
                    output_location='s3://S3_DROP_LOCATION',
                    poll_interval=10, skip_header=True):
    """Run a query on Athena and return all result rows as a list of tuples.

    The return shape mirrors what PEP 249's ``fetchall`` suggests (a list of
    row tuples), although this helper does not implement PEP 249 itself.

    Args:
        query_string: A SQL-like query that Athena will execute.
        client: An Athena client created with boto3.
        database: Name of the database the query runs against.
        output_location: S3 URI where Athena writes the query results.
        poll_interval: Seconds to wait between status checks while the query
            is still queued or running.
        skip_header: When true (the default), drop the first returned row,
            which for SELECT queries holds the column names. Pass False for
            statements whose results carry no header row.

    Returns:
        A list of tuples, one per row. Every value is the string Athena
        reports as ``VarCharValue``; columns without one (NULL) are ``None``.

    Raises:
        Exception: If the query ends in the FAILED or CANCELLED state.
    """
    query_id = client.start_query_execution(
        QueryString=query_string,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': output_location},
    )['QueryExecutionId']

    # Poll until the query leaves the QUEUED/RUNNING states. The sleep happens
    # only when another poll is actually needed, so a finished query returns
    # without an extra wait.
    while True:
        query_status = client.get_query_execution(
            QueryExecutionId=query_id
        )['QueryExecution']['Status']['State']
        if query_status in ('FAILED', 'CANCELLED'):
            raise Exception(
                'Athena query with the string "{}" failed or was cancelled'.format(query_string)
            )
        if query_status not in ('QUEUED', 'RUNNING'):
            break
        time.sleep(poll_interval)

    results_iter = client.get_paginator('get_query_results').paginate(
        QueryExecutionId=query_id,
        PaginationConfig={'PageSize': 1000},
    )

    # NULL columns come back without a 'VarCharValue' key, hence .get().
    results = [
        tuple(column.get('VarCharValue') for column in row['Data'])
        for results_page in results_iter
        for row in results_page['ResultSet']['Rows']
    ]

    # Only the very first row across all pages can be the header.
    return results[1:] if skip_header else results
awesome job!!
Great Work!! Thank you!!
I am pretty new to Athena. I have a use case where I need to query tables from Athena and display the results. I am using a Jupyter notebook to run this code.
How do I call this function? Can someone share a code snippet for this? I just have a simple query like "select count(*) from database1.table1", and I have to display the results as well.
This was super useful, thank you. I'd like to suggest some minor tweaks:
- Return results as a dict, with the column names as keys
- Support null columns (as mentioned above)
results = []
column_names = None
for results_page in results_iter:
for row in results_page['ResultSet']['Rows']:
column_values = [col.get('VarCharValue', None) for col in row['Data']]
if not column_names:
column_names = column_values
else:
results.append(dict(zip(column_names, column_values)))
return results
There is apparently a bug: the code is skipping the first value:
for datum in data_list[1:]:
I believe it should be:
for datum in data_list[0:]:
Actually, data_list[1:] is correct. It makes sure to skip the row of column names at the start of the result.
My query was not returning headers, so
for datum in data_list[0:]:
was the correct choice for me.
What's up with the 10 second sleep?
What's up with the 10 second sleep?
@backmuda feel free to reduce it to 0.5 seconds.
Works great!
There is apparently a bug: the code is skipping the first value:
for datum in data_list[1:]:
I believe it should be:
for datum in data_list[0:]: