-
-
Save schledererj/b2e2a800998d61af2bbdd1cd50e08b76 to your computer and use it in GitHub Desktop.
| # Does NOT implement the PEP 249 spec, but the return type is suggested by the .fetchall function as specified here: https://www.python.org/dev/peps/pep-0249/#fetchall | |
| import time | |
| import boto3 | |
| # query_string: a SQL-like query that Athena will execute | |
| # client: an Athena client created with boto3 | |
| def fetchall_athena(query_string, client): | |
| query_id = client.start_query_execution( | |
| QueryString=query_string, | |
| QueryExecutionContext={ | |
| 'Database': 'DATABASE_NAME' | |
| }, | |
| ResultConfiguration={ | |
| 'OutputLocation': 's3://S3_DROP_LOCATION' | |
| } | |
| )['QueryExecutionId'] | |
| query_status = None | |
| while query_status == 'QUEUED' or query_status == 'RUNNING' or query_status is None: | |
| query_status = client.get_query_execution(QueryExecutionId=query_id)['QueryExecution']['Status']['State'] | |
| if query_status == 'FAILED' or query_status == 'CANCELLED': | |
| raise Exception('Athena query with the string "{}" failed or was cancelled'.format(query_string)) | |
| time.sleep(10) | |
| results_paginator = client.get_paginator('get_query_results') | |
| results_iter = results_paginator.paginate( | |
| QueryExecutionId=query_id, | |
| PaginationConfig={ | |
| 'PageSize': 1000 | |
| } | |
| ) | |
| results = [] | |
| data_list = [] | |
| for results_page in results_iter: | |
| for row in results_page['ResultSet']['Rows']: | |
| data_list.append(row['Data']) | |
| for datum in data_list[1:]: | |
| results.append([x['VarCharValue'] for x in datum]) | |
| return [tuple(x) for x in results] |
Hi,
thanks for the method, really helpful indeed.
Although, if your row contains NaN values the execution may fail, so here's a little fix:
results.append([x['VarCharValue'] if 'VarCharValue' in x else '' for x in datum])
I'll second what Tiago said. This method was a gem to find -- thank you!
There is a bug apparently, the code is skipping the first value:
for datum in data_list[1:]:
It believe it should be:
for datum in data_list[0:]:
awesome job!!
Great Work!! Thank you!!
I am pretty new to athena , I do have a use case to query the tables from Athena and display.I am using jupyter notebook to run this code.
How do I call this function.Can some one share the code snippet for this .I just have a simple query like "select count(*) from database1.table1".And I have to display the results as well.
This was super useful, thank you. I'd like to suggest some minor tweaks:
- Return results as a dict, with the column names as keys
- Support null columns (as mentioned above)
results = []
column_names = None
for results_page in results_iter:
for row in results_page['ResultSet']['Rows']:
column_values = [col.get('VarCharValue', None) for col in row['Data']]
if not column_names:
column_names = column_values
else:
results.append(dict(zip(column_names, column_values)))
return results
There is a bug apparently, the code is skipping the first value:
for datum in data_list[1:]:It believe it should be:
for datum in data_list[0:]:
Actually data_list[1:] is correct. It makes sure to skip the column name of the result
My query was not returning headers, so
for datum in data_list[0:]:
was the correct choice for me.
What's up with the 10 second sleep?
What's up with the 10 second sleep?
@backmuda feel free to reduce it to 0.5 a second.
Works great!
Hello,
Have you ever used a column having structure as ?
[{hop=1, error=null, result=[{x=null, from=192.168.0.1, rtt=0.378, ttl=64}]}]and its structure :
``result
array<struct<hop:int,error:string,result:array<struct<x:string,from:string,rtt:float,ttl:int>>>>with `=`, I can't parse the object as json object.
Thanks in advance.