Skip to content

Instantly share code, notes, and snippets.

@skolhustick
Last active June 23, 2021 17:31
Show Gist options
  • Save skolhustick/e1f2762f10cfeda914a2313b2fe7a8f2 to your computer and use it in GitHub Desktop.
Save skolhustick/e1f2762f10cfeda914a2313b2fe7a8f2 to your computer and use it in GitHub Desktop.
Read from Athena using Node.js
/* Based on https://docs.aws.amazon.com/code-samples/latest/catalog/javascript-athena-index.js.html */
const _ = require('lodash')
const Queue = require('async/queue')
const AWS = require('aws-sdk')
require('dotenv').config()
AWS.config.update({
region: process.env.AWS_REGION,
accessKeyId: process.env.AWS_ACCESS_KEY_ID,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY
})
const client = new AWS.Athena()
const ATHENA_DB = 'voting-app-db'
const ATHENA_OUTPUT_LOCATION = 's3://voting-app-result'
const RESULT_SIZE = 1000
const POLL_INTERVAL = 1000
const queryData = async () => {
/* Our SQL QUERY goes her */
const SQL_QUERY = `SELECT choice, count(choice) AS count FROM "voting-app-db"."voting-app-table" GROUP BY choice`
const data = await makeQuery(SQL_QUERY)
console.log(data)
}
/* Executing the data fetch */
queryData()
///////////////////////
// HELPER FUNCTIONS
// FOR DATA WITH MULTIPLE ROWS - YOU WILL RECEIVE AN ARRAY
///////////////////////
/* Create an async queue to handle polling for query results */
let q = Queue((id, cb) => {
startPolling(id)
.then(data => {
return cb(null, data)
})
.catch(err => {
console.log('Failed to poll query: ', err)
return cb(err)
})
}, 5)
function makeQuery (sql) {
return new Promise((resolve, reject) => {
let params = {
QueryString: sql,
ResultConfiguration: { OutputLocation: ATHENA_OUTPUT_LOCATION },
QueryExecutionContext: { Database: ATHENA_DB }
}
/* Make API call to start the query execution */
client.startQueryExecution(params, (err, results) => {
if (err) return reject(err)
/* If successful, get the query ID and queue it for polling */
q.push(results.QueryExecutionId, (err, qid) => {
if (err) return reject(err)
/* Once query completed executing, get and process results */
return buildResults(qid)
.then(data => {
return resolve(data)
})
.catch(err => {
return reject(err)
})
})
})
})
}
function buildResults (query_id, max, page) {
let max_num_results = max ? max : RESULT_SIZE
let page_token = page ? page : undefined
return new Promise((resolve, reject) => {
let params = {
QueryExecutionId: query_id,
MaxResults: max_num_results,
NextToken: page_token
}
let dataBlob = []
go(params)
/* Get results and iterate through all pages */
function go (param) {
getResults(param)
.then(res => {
dataBlob = _.concat(dataBlob, res.list)
if (res.next) {
param.NextToken = res.next
return go(param)
} else {
/* Pagination ends here */
return resolve(dataBlob)
}
})
.catch(err => {
return reject(err)
})
}
/* Process results merging column names and values into a JS object */
function getResults () {
return new Promise((resolve, reject) => {
client.getQueryResults(params, (err, data) => {
if (err) return reject(err)
var list = []
let header = buildHeader(data.ResultSet.ResultSetMetadata.ColumnInfo)
let top_row = _.map(_.head(data.ResultSet.Rows).Data, n => {
return n.VarCharValue
})
let resultSet =
_.difference(header, top_row).length > 0
? data.ResultSet.Rows
: _.drop(data.ResultSet.Rows)
resultSet.forEach(item => {
list.push(
_.zipObject(
header,
_.map(item.Data, n => {
return n.VarCharValue
})
)
)
})
return resolve({
next: 'NextToken' in data ? data.NextToken : undefined,
list: list
})
})
})
}
})
}
function startPolling (id) {
return new Promise((resolve, reject) => {
function poll (id) {
client.getQueryExecution({ QueryExecutionId: id }, (err, data) => {
if (err) return reject(err)
if (data.QueryExecution.Status.State === 'SUCCEEDED') return resolve(id)
else if (
['FAILED', 'CANCELLED'].includes(data.QueryExecution.Status.State)
)
return reject(new Error(`Query ${data.QueryExecution.Status.State}`))
else {
setTimeout(poll, POLL_INTERVAL, id)
}
})
}
poll(id)
})
}
function buildHeader (columns) {
return _.map(columns, i => {
return i.Name
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment