Created
May 20, 2017 06:04
-
-
Save Sandyman/6bd342264b79838ae38958a7e08283b7 to your computer and use it in GitHub Desktop.
Index Elasticsearch documents streaming from DynamoDB
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'; | |
const Elasticsearch = require('aws-es'); | |
const fetch = require('node-fetch'); | |
const ES_SERVICE_ENDPOINT = process.env.ES_SERVICE_ENDPOINT; | |
const ES_INDEX_NAME = process.env.ES_DOMAIN_NAME; | |
const accessKey = process.env.ACCESS_KEY; | |
const secretKey = process.env.SECRET_ACCESS_KEY; | |
const es = new Elasticsearch({ | |
accessKeyId: accessKey, | |
secretAccessKey: secretKey, | |
service: 'es', | |
region: 'us-east-1', | |
host: ES_SERVICE_ENDPOINT | |
}); | |
/** | |
* Get cafe related information from the image object (DynamoDB JSON syntax) | |
* @param data | |
* @param image | |
*/ | |
const parseCafe = (data, image) => { | |
const blends = []; | |
const beans = []; | |
if (image.hasOwnProperty('beans')) { | |
image.beans.L.map(function(bean) { | |
const beanObj = bean.M; | |
beans.push(beanObj.name.S); | |
if (beanObj.hasOwnProperty('blends')) { | |
beanObj.blends.L.map(function(blend) { | |
blends.push(blend.M.name.S); | |
}); | |
} | |
}); | |
} | |
// Convert all fields from DynamoDB json syntax | |
data.name = image.name.S; | |
data.beans = beans; | |
data.blends = blends; | |
/* other fields go here, but you get the gist */ | |
}; | |
/** | |
* Get coffee related information from the image object (DynamoDB JSON syntax) | |
* @param data | |
* @param image | |
*/ | |
const parseCoffee = (data, image) => { | |
const blends = []; | |
if (image.hasOwnProperty('blends')) { | |
image.blends.L.map(function(blend) { | |
blends.push(blend.M.name.S); | |
}); | |
} | |
// Convert all fields from DynamoDB json syntax | |
data.name = image.name.S; | |
data.blends = blends; | |
}; | |
const parsers = { | |
'CafesTable-dev': parseCafe, | |
'CoffeesTable-dev': parseCoffee | |
}; | |
/** | |
* Update a single search document | |
* @param record | |
* @returns {Promise} | |
*/ | |
const updateDocument = (record) => new Promise((resolve, reject) => { | |
// Get table name from the event source ARN, which determines the type | |
const tableName = /^[^\/]+\/([^\/]+)\/.*$/.exec(record.eventSourceARN)[1]; | |
const type = tableName === 'CafesTable-dev' ? 'cc' : 'bb'; | |
const id = record.dynamodb.Keys.id.S; | |
const options = { | |
index: ES_INDEX_NAME, | |
type: type, | |
id: id | |
}; | |
if (record.eventName === 'REMOVE') { | |
// Document to be removed from index | |
es.delete(options, (err, data) => { | |
if (err) return reject(err); | |
else return resolve(data); | |
}); | |
} else { | |
// New or updated document to be added/updated | |
const data = {id : id}; | |
// Parse the record image based on the table name and update data | |
parsers[tableName](data, record.dynamodb.NewImage); | |
// data will have been updated by the parser | |
options.body = data; | |
es.index(options, (err, data) => { | |
if (err) return reject(err); | |
else return resolve(data); | |
}) | |
} | |
}); | |
/** | |
* Main service handler - triggered from a DynamoDB stream event | |
* @param event | |
* @param context | |
* @param callback | |
*/ | |
module.exports.update = (event, context, callback) => { | |
console.log(JSON.stringify(event, null, 3)); | |
// updateDocument() returns a promise | |
const promises = event.Records.map(record => updateDocument(record)); | |
Promise.all(promises) | |
.then(r => { | |
// We log the result of every document | |
r.forEach(x => console.log(JSON.stringify(x, null, 3))); | |
return callback(null, 'All good'); | |
}) | |
.catch(err => { | |
// Log error if something goes wrong | |
console.log("Something went wrong: " + err.message); | |
return callback(null, err.message) | |
}); | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
tip: check
AWS.DynamoDB.Converter.unmarshall
from theaws-sdk
if you don't want to deal with the dynamodb json format. It converts a DynamoDB record into a JavaScript object.