Skip to content

Instantly share code, notes, and snippets.

@johndstein
Created October 7, 2019 15:13
Show Gist options
  • Save johndstein/c8f077dea161491800a7ea83a0cb13c0 to your computer and use it in GitHub Desktop.
Save johndstein/c8f077dea161491800a7ea83a0cb13c0 to your computer and use it in GitHub Desktop.
S3 JSON line reader (fast / parallel)
#!/usr/bin/env node
'use strict';
const { Readable } = require('stream');
const AWS = require('aws-sdk');
// When AWS_PROFILE is set, load that profile's credentials from the shared
// ini file and set the SDK logger to the console.
if (process.env.AWS_PROFILE) { // eslint-disable-line
  const profile = process.env.AWS_PROFILE; // eslint-disable-line
  AWS.config.credentials = new AWS.SharedIniFileCredentials({ profile });
  AWS.config.logger = console;
}

// S3 client options; the API version is always pinned.
const s3opts = { apiVersion: '2006-03-01' };

// A custom endpoint (AWS_S3_ENDPOINT) also enables path-style addressing and
// v4 request signing.
if (process.env.AWS_S3_ENDPOINT) { // eslint-disable-line
  Object.assign(s3opts, {
    endpoint: process.env.AWS_S3_ENDPOINT, // eslint-disable-line
    s3ForcePathStyle: true,
    signatureVersion: 'v4'
  });
}

// Shared S3 client used by the reader below.
const s3 = new AWS.S3(s3opts);

// Promise-returning wrapper around zlib.gunzip.
const { gunzip } = require('zlib');
const gunzipA = require('util').promisify(gunzip);
/**
 * Object-mode Readable stream that emits the parsed records of gzipped
 * JSON-lines objects stored in S3.
 *
 * All downloads are started together on the first read; the objects are then
 * decoded and emitted one at a time, in the order of `keys`. An object that
 * cannot be downloaded, gunzipped or parsed is skipped and described in
 * `errors`; it never fails the stream.
 */
class S3JsonlReader extends Readable {
  /**
   * @param {object} options - stream.Readable options, plus the fields below.
   *   `objectMode` is always forced to true.
   * @param {string[]} options.keys - Object keys, or full `s3://bucket/key` URLs.
   * @param {string} [options.bucket] - Bucket for keys that are not `s3://` URLs.
   * @throws {Error} If `keys` is not an array, or contains a non-string.
   */
  constructor(options) {
    // Copy instead of writing `objectMode` onto the caller's object.
    super(Object.assign({}, options, { objectMode: true }));
    const { keys, bucket } = options || {};
    if (!Array.isArray(keys)) {
      throw new Error('keys array required.');
    }
    if (!keys.every((k) => typeof k === 'string')) {
      throw new Error('keys array must contain only strings.');
    }
    this.keys = keys;
    this.bucket = bucket;
    // Pending downloads as { bucketKey, promise }; null until the first read.
    this.promises = null;
    // One entry per object that could not be downloaded or decoded.
    this.errors = [];
    // Parsed records of the current object that have not been pushed yet.
    this.events = [];
  }

  /**
   * Readable implementation: pushes exactly one record per call, or `null`
   * once every object has been consumed.
   */
  _read() {
    if (!this.promises) {
      // First read: kick off every download at once so they run in parallel.
      this.promises = this.keys.map((k) => {
        const bucketKey = this.parseKey(k, this.bucket);
        const promise = s3.getObject(bucketKey).promise()
          .catch((error) => {
            // Record the failure here; the promise then resolves to undefined,
            // which getEventsFromPromises treats as "skip this object".
            error.bucketKey = bucketKey;
            this.errors.push(error);
          });
        return { bucketKey, promise };
      });
    }

    if (this.events.length) {
      this.push(this.events.shift());
      return;
    }

    this.getEventsFromPromises()
      .then(() => {
        this.push(this.events.length ? this.events.shift() : null);
      })
      .catch((error) => this.destroy(error));
  }

  /**
   * Resolves a key to S3 request parameters.
   *
   * @param {string} Key - Object key, or an `s3://bucket/key` URL.
   * @param {string} [Bucket] - Bucket to use when `Key` is not an `s3://` URL.
   * @returns {{Bucket: string, Key: string}} Parameters for `getObject`.
   */
  parseKey(Key, Bucket) {
    if (Key.startsWith('s3://')) {
      // 's3://bucket/a/b' splits into ['s3:', '', 'bucket', 'a', 'b'].
      const parts = Key.split('/');
      return { Bucket: parts[2], Key: parts.slice(3).join('/') };
    }
    return { Bucket, Key };
  }

  /**
   * Refills `events` from the next pending object that yields at least one
   * record. Objects that failed to download, fail to decode, or contain no
   * records are skipped, so `events` is left empty only when no objects remain.
   *
   * @returns {Promise<void>}
   */
  async getEventsFromPromises() {
    while (this.promises.length && !this.events.length) {
      const { promise, bucketKey } = this.promises.shift();
      try {
        const object = await promise;
        if (!object) {
          // Download failed and was already recorded in `errors`.
          continue;
        }
        const text = (await gunzipA(object.Body)).toString();
        this.events = text
          .split('\n')
          // Lines whose trimmed length is 6 or less are dropped.
          // NOTE(review): presumably meant to skip blanks and fragments, but it
          // also drops short valid JSON such as `[1,2]` - confirm intent.
          .filter((l) => l.trim().length > 6)
          .map((l) => JSON.parse(l));
      }
      catch (error) {
        this.errors.push({
          message: error.message,
          bucketKey,
          stack: error.stack
        });
      }
    }
  }
}
// The reader class is the module's only export.
exports = module.exports = S3JsonlReader;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment