Last active
          June 23, 2021 17:31 
        
      - 
      
- 
        Save skolhustick/e1f2762f10cfeda914a2313b2fe7a8f2 to your computer and use it in GitHub Desktop. 
    Read from Athena using Node.js
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | /* Based on https://docs.aws.amazon.com/code-samples/latest/catalog/javascript-athena-index.js.html */ | |
| const _ = require('lodash') | |
| const Queue = require('async/queue') | |
| const AWS = require('aws-sdk') | |
| require('dotenv').config() | |
/* Athena database to query, and the S3 location passed as the query OutputLocation */
const ATHENA_DB = 'voting-app-db'
const ATHENA_OUTPUT_LOCATION = 's3://voting-app-result'
/* Default MaxResults (rows per page) used when fetching query results */
const RESULT_SIZE = 1000
/* Milliseconds to wait between query-status checks */
const POLL_INTERVAL = 1000

/* Region and credentials are read from process.env */
AWS.config.update({
  region: process.env.AWS_REGION,
  accessKeyId: process.env.AWS_ACCESS_KEY_ID,
  secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY
})
const client = new AWS.Athena()
/**
 * Run the vote-count query against Athena and print the resulting rows.
 *
 * @returns {Promise<void>} Rejects if starting, polling, or reading the query fails.
 */
const queryData = async () => {
  /* Our SQL QUERY goes here */
  const SQL_QUERY = `SELECT choice, count(choice) AS count FROM "voting-app-db"."voting-app-table" GROUP BY choice`
  const data = await makeQuery(SQL_QUERY)
  console.log(data)
}
/* Executing the data fetch */
// Handle the rejection explicitly: otherwise a failed query surfaces as an
// unhandled promise rejection rather than a readable error and exit code 1.
queryData().catch(err => {
  console.error('Failed to query Athena: ', err)
  process.exitCode = 1
})
| /////////////////////// | |
| // HELPER FUNCTIONS | |
| // FOR DATA WITH MULTIPLE ROWS - YOU WILL RECEIVE AN ARRAY | |
| /////////////////////// | |
/*
 * Async queue that polls Athena until each pushed query reaches a final state,
 * running at most 5 pollers at once. A task is a QueryExecutionId; its
 * callback receives (err, queryExecutionId).
 */
const q = Queue((id, cb) => {
  // Both handlers go to a single .then() so that an exception thrown by `cb`
  // itself is not routed into the rejection handler, which would log a bogus
  // polling failure and invoke `cb` a second time.
  startPolling(id).then(
    data => cb(null, data),
    err => {
      console.log('Failed to poll query: ', err)
      cb(err)
    }
  )
}, 5)
/**
 * Start an Athena query, wait for it through the polling queue, then collect its rows.
 *
 * @param {string} sql - Query text sent as QueryString.
 * @returns {Promise<Object[]>} Resolves with the rows produced by buildResults;
 *   rejects with the first error from StartQueryExecution, the polling queue,
 *   or buildResults.
 */
function makeQuery (sql) {
  return new Promise((resolve, reject) => {
    const request = {
      QueryString: sql,
      ResultConfiguration: { OutputLocation: ATHENA_OUTPUT_LOCATION },
      QueryExecutionContext: { Database: ATHENA_DB }
    }

    // Invoked by the polling queue once the query has finished (or failed).
    const handlePolled = (pollErr, finishedId) => {
      if (pollErr) {
        reject(pollErr)
        return
      }
      buildResults(finishedId).then(resolve, reject)
    }

    client.startQueryExecution(request, (startErr, started) => {
      if (startErr) {
        reject(startErr)
        return
      }
      q.push(started.QueryExecutionId, handlePolled)
    })
  })
}
/**
 * Fetch every page of results for a finished Athena query and turn each row
 * into a plain object keyed by column name.
 *
 * For SELECT queries Athena returns the column names as the first row of the
 * FIRST page only, so the header-row check is applied to that page alone.
 * Rows on later pages are always kept, even if their values happen to equal
 * the column names.
 *
 * @param {string} query_id - QueryExecutionId of a query that has SUCCEEDED.
 * @param {number} [max] - Page size sent as MaxResults (defaults to RESULT_SIZE).
 * @param {string} [page] - NextToken to resume from; omit to start at the first page.
 * @returns {Promise<Object[]>} Rows from all pages, in order. Each value is the
 *   cell's VarCharValue, or undefined when the cell has none.
 */
async function buildResults (query_id, max, page) {
  const maxResults = max || RESULT_SIZE
  let nextToken = page || undefined
  // Only a request made without a NextToken can return the header row.
  let isFirstPage = !nextToken
  const results = []
  do {
    const data = await fetchResultPage({
      QueryExecutionId: query_id,
      MaxResults: maxResults,
      NextToken: nextToken
    })
    for (const row of parseResultPage(data, isFirstPage)) results.push(row)
    nextToken = data.NextToken
    isFirstPage = false
  } while (nextToken)
  return results
}

/* Promise wrapper around a single GetQueryResults call. */
function fetchResultPage (params) {
  return new Promise((resolve, reject) => {
    client.getQueryResults(params, (err, data) => (err ? reject(err) : resolve(data)))
  })
}

/*
 * Convert one GetQueryResults response into row objects. When
 * `mayHaveHeaderRow` is true and the first row's values include every column
 * name, that row is treated as the header and skipped. Pages with no rows
 * yield an empty array.
 */
function parseResultPage (data, mayHaveHeaderRow) {
  const header = buildHeader(data.ResultSet.ResultSetMetadata.ColumnInfo)
  const rows = data.ResultSet.Rows || []
  const cellValues = row => (row.Data || []).map(cell => cell.VarCharValue)
  const firstValues = rows.length > 0 ? cellValues(rows[0]) : []
  const skipHeader =
    mayHaveHeaderRow &&
    rows.length > 0 &&
    header.every(name => firstValues.includes(name))
  return rows.slice(skipHeader ? 1 : 0).map(row => {
    const values = cellValues(row)
    return header.reduce((record, name, i) => {
      record[name] = values[i]
      return record
    }, {})
  })
}
/**
 * Poll Athena until a query execution reaches SUCCEEDED, FAILED or CANCELLED.
 *
 * Any other state schedules another GetQueryExecution call after POLL_INTERVAL ms.
 *
 * @param {string} id - QueryExecutionId returned by StartQueryExecution.
 * @returns {Promise<string>} Resolves with the same id on SUCCEEDED; rejects with
 *   the API error, or with an Error reading "Query FAILED" / "Query CANCELLED".
 */
function startPolling (id) {
  return new Promise((resolve, reject) => {
    const checkStatus = executionId => {
      client.getQueryExecution({ QueryExecutionId: executionId }, (err, data) => {
        if (err) {
          reject(err)
          return
        }
        const state = data.QueryExecution.Status.State
        switch (state) {
          case 'SUCCEEDED':
            resolve(executionId)
            break
          case 'FAILED':
          case 'CANCELLED':
            reject(new Error(`Query ${state}`))
            break
          default:
            // Still queued or running: ask again later.
            setTimeout(checkStatus, POLL_INTERVAL, executionId)
        }
      })
    }
    checkStatus(id)
  })
}
/**
 * Extract the column names, in order, from Athena column metadata.
 *
 * @param {Array<{Name: string}>} columns - ResultSet.ResultSetMetadata.ColumnInfo entries.
 * @returns {string[]} The Name of each column.
 */
function buildHeader (columns) {
  return _.map(columns, ({ Name }) => Name)
}
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment