Skip to content

Instantly share code, notes, and snippets.

@blindman2k
Last active August 29, 2016 01:36
Show Gist options
  • Save blindman2k/048fe1feaefc41d6388f7e22e950b2c6 to your computer and use it in GitHub Desktop.
Save blindman2k/048fe1feaefc41d6388f7e22e950b2c6 to your computer and use it in GitHub Desktop.
AWSRequestV4 and AWSDynamoDB
/**
 * This class can be used to interact with an Amazon DynamoDB table
 *
 * @author Aron Steg <[email protected]>
 *
 * @version 0.0.2
 */
class AWSDynamoDB {

    static version = [0, 0, 2];

    static SERVICE = "dynamodb";
    static TARGET_PREFIX = "DynamoDB_20120810";

    // AWSRequestV4 instance that signs and sends every request
    _awsRequest = null;

    /**
     * @param {string} region
     * @param {string} accessKeyId
     * @param {string} secretAccessKey
     *
     * @throws if the AWSRequestV4 class has not been loaded into the root table
     */
    constructor(region, accessKeyId, secretAccessKey) {
        if ("AWSRequestV4" in getroottable()) {
            _awsRequest = AWSRequestV4(SERVICE, region, accessKeyId, secretAccessKey);
        } else {
            throw ("This class requires AWSRequestV4 - please make sure it is loaded.");
        }
    }

    /**
     * Inserts or replaces an item in the database
     *
     * @param {string} tableName
     * @param {table} item - plain Squirrel table, converted to DynamoDB typed attributes
     * @param {table} extras - optional top-level request fields copied into the body;
     *                         may be omitted, in which case this argument is the callback
     * @param {function} cb - optional, called with the HTTP response
     * @return {null}
     */
    function PutItem(tableName, item, extras=null, cb=null) {

        // Optional extras
        if (typeof extras == "function") {
            cb = extras;
            extras = null;
        }

        local headers = { "X-Amz-Target": format("%s.PutItem", TARGET_PREFIX) };
        local body = {
            "TableName": tableName,
            "Item": _toDynamoDBTypes(item)
        };
        _mergeExtras(body, extras);

        // server.log( http.jsonencode(body));
        _awsRequest.post("/", headers, http.jsonencode(body), _safeCallback(cb));
    }

    /**
     * Updates some of the fields on an item in the database
     *
     * Builds a "SET name = :valN, ..." update expression from the entries of `item`.
     *
     * @param {string} tableName
     * @param {table} key - primary key of the item to update (plain Squirrel table)
     * @param {table} item - attributes to set (plain Squirrel table)
     * @param {table} extras - optional top-level request fields copied into the body;
     *                         may be omitted, in which case this argument is the callback
     * @param {function} cb - optional, called with the HTTP response
     * @return {null}
     */
    function UpdateItem(tableName, key, item, extras=null, cb=null) {

        // Optional extras
        if (typeof extras == "function") {
            cb = extras;
            extras = null;
        }

        local headers = { "X-Amz-Target": format("%s.UpdateItem", TARGET_PREFIX) };
        local body = {
            "TableName": tableName,
            "Key": _toDynamoDBTypes(key)
        };

        // Load up the attributes into the update expression.
        local assignments = "";
        local values = {};
        local id = 0;
        foreach (attr, val in item) {
            // Protect against reserved keywords.
            // NOTE(review): only "language" is handled, and it is written to an
            // attribute called "lang"; other DynamoDB reserved words used as
            // attribute names will still be rejected by the service.
            if (attr == "language") attr = "lang";

            id++;
            if (assignments.len() > 0) assignments += ",";
            assignments += format("%s = :val%d", attr, id);
            values[":val" + id] <- _toDynamoDBTypes(val, false);
        }

        // Only send the expression when there is something to set
        if (id > 0) {
            body.UpdateExpression <- "SET " + assignments;
            body.ExpressionAttributeValues <- values;
        }

        // Optional extras. Extras override the generated fields, except that
        // ExpressionAttributeValues are merged with the generated ":valN" entries
        // when the caller keeps the generated UpdateExpression (e.g. when only a
        // ConditionExpression with its own values is supplied).
        if (typeof extras == "table") {
            local keepGenerated = !("UpdateExpression" in extras);
            foreach (k, v in extras) {
                if (keepGenerated && k == "ExpressionAttributeValues" && (k in body) && typeof v == "table") {
                    foreach (placeholder, attrValue in v) {
                        body[k][placeholder] <- attrValue;
                    }
                } else {
                    body[k] <- v;
                }
            }
        }

        // server.log(http.jsonencode(body));
        _awsRequest.post("/", headers, http.jsonencode(body), _safeCallback(cb));
    }

    /**
     * Writes a batch of up to 25 entries to the database
     *
     * Items reported back by the service as "UnprocessedItems" are resubmitted with
     * an exponentially growing delay (0.5s, 1s, 2s, ...) up to
     * MAX_RETRIES_PER_BATCHWRITE times; after that the last response is handed to cb.
     *
     * @param {string} tableName
     * @param {array} puts - items to put (plain Squirrel tables), or null
     * @param {array} deletes - keys to delete (plain Squirrel tables), or null
     * @param {table} extras - optional top-level request fields copied into the body;
     *                         may be omitted, in which case this argument is the callback
     * @param {function} cb - optional, called with the final HTTP response
     * @return {null}
     *
     * @throws if puts/deletes/extras have the wrong type or more than 25 items are given
     */
    function BatchWriteItem(tableName, puts, deletes, extras = null, cb = null) {

        const MAX_ITEMS_PER_BATCHWRITE = 25;
        const MAX_RETRIES_PER_BATCHWRITE = 128;

        // Optional extras
        if (typeof extras == "function") {
            cb = extras;
            extras = null;
        }

        if (puts == null) puts = [];
        if (deletes == null) deletes = [];
        if (extras == null) extras = {};

        if (typeof puts != "array") throw "Puts must be an array";
        if (typeof deletes != "array") throw "Deletes must be an array";
        if (typeof extras != "table") throw "Extras must be a table";
        if (puts.len() + deletes.len() > MAX_ITEMS_PER_BATCHWRITE) throw "A maximum of " + MAX_ITEMS_PER_BATCHWRITE + " items can be batched";

        local requests = [];

        // Put requests
        foreach (item in puts) {
            requests.push( { "PutRequest": { "Item": _toDynamoDBTypes(item) } } );
        }

        // Delete requests
        foreach (item in deletes) {
            requests.push( { "DeleteRequest": { "Key": _toDynamoDBTypes(item) } } );
        }

        local body = { "RequestItems": {} };
        body.RequestItems[tableName] <- requests;
        _mergeExtras(body, extras);

        local done = _safeCallback(cb);

        local attempt;
        attempt = function(body, retries = 0) {
            // A fresh header table per attempt: the request layer adds signing headers to it
            local headers = { "X-Amz-Target": format("%s.BatchWriteItem", TARGET_PREFIX) };
            _awsRequest.post("/", headers, http.jsonencode(body), function(res) {
                if (res.statuscode == 200 && retries < MAX_RETRIES_PER_BATCHWRITE) {
                    try {
                        local response = http.jsondecode(res.body);
                        if ("UnprocessedItems" in response && response.UnprocessedItems.len() > 0) {
                            // Retry the unprocessed items, keeping the other request fields (extras)
                            local delay = math.pow(2, retries) * 0.5;
                            return imp.wakeup(delay, function() {
                                body.RequestItems = response.UnprocessedItems;
                                attempt(body, retries + 1);
                            }.bindenv(this));
                        }
                    } catch (e) {
                        // Undecodable body: report it and hand the raw response to the caller
                        server.error(e);
                    }
                }
                return done(res);
            }.bindenv(this));
        }

        // server.log( http.jsonencode(body));
        attempt(body);
    }

    /**
     * Converts a Squirrel value into DynamoDB's typed attribute representation.
     *
     * With init == true the value must be a table; each of its entries is converted
     * and the result is a plain table of typed attributes (the shape of "Item"/"Key").
     * With init == false the value itself is wrapped in a type descriptor:
     * array -> L, table -> M, bool -> BOOL, blob -> B (base64), integer/float -> N
     * (as a string), string -> S, null -> NULL.
     *
     * @param {any} item
     * @param {bool} init - true for the top-level call
     * @return {table}
     *
     * @throws if init is true and item is not a table, or the type is not supported
     * @private
     */
    function _toDynamoDBTypes(item, init = true) {

        if (init) {
            if (typeof item != "table") {
                throw "Must provide a table";
            }
            local attributes = {};
            foreach (k, v in item) {
                attributes[k] <- _toDynamoDBTypes(v, false);
            }
            return attributes;
        }

        switch (typeof item) {
            case "array": {
                local list = [];
                foreach (v in item) {
                    list.push(_toDynamoDBTypes(v, false));
                }
                return { "L": list };
            }
            case "table": {
                local map = {};
                foreach (k, v in item) {
                    map[k] <- _toDynamoDBTypes(v, false);
                }
                return { "M": map };
            }
            case "bool":
                // BOOL is sent as a JSON boolean, like NULL below
                return { "BOOL": item };
            case "blob":
                return { "B": http.base64encode(item) };
            case "float":
            case "integer":
                return { "N": item.tostring() };
            case "string":
                return { "S": item };
            case "null":
                return { "NULL": true };
            default:
                // Previously fell through and returned null, producing an invalid request
                throw "Unsupported type for DynamoDB: " + typeof item;
        }
    }

    /**
     * Copies every entry of extras into body, overwriting existing keys.
     * Does nothing unless extras is a table.
     *
     * @param {table} body
     * @param {table} extras
     * @return {null}
     * @private
     */
    function _mergeExtras(body, extras) {
        if (typeof extras == "table") {
            foreach (k, v in extras) {
                body[k] <- v;
            }
        }
    }

    /**
     * Returns cb, or a do-nothing function when no callback was supplied, so the
     * request layer always has something to call.
     *
     * @param {function} cb
     * @return {function}
     * @private
     */
    function _safeCallback(cb) {
        if (cb == null) {
            return function(res) {};
        }
        return cb;
    }
}
/**
 * This class can be used to generate correctly structured requests intended for AWS endpoints,
 * sign the requests using Amazon's "Signature Version 4", and send them. It's intended to be used
 * internally by wrapper classes for specific AWS services.
 *
 * @author Gino Miglio <[email protected]>
 * @author Mikhail Yurasov <[email protected]>
 *
 * @version 1.0.2
 */
class AWSRequestV4 {

    static version = [1, 0, 2];

    static ALGORITHM = "AWS4-HMAC-SHA256";

    _service = null;
    _region = null;
    _accessKeyId = null;
    _secretAccessKey = null;

    // Timestamps of the request currently being signed (set by _updateTimestamps)
    _date = null;
    _dateTime = null;

    // Semicolon-separated header names, set as a side effect of _getHashedCanonicalRequest
    _signedHeaders = null;

    _serviceUrl = null;
    _serviceHost = null;

    /**
     * @param {string} service
     * @param {string} region
     * @param {string} accessKeyId
     * @param {string} secretAccessKey
     */
    constructor(service, region, accessKeyId, secretAccessKey) {
        _service = service;
        _region = region;
        _accessKeyId = accessKeyId;
        _secretAccessKey = secretAccessKey;

        _serviceUrl = format("https://%s.%s.amazonaws.com", service, region);
        _serviceHost = format("%s.%s.amazonaws.com", service, region);
    }

    /**
     * Make request
     *
     * Signs the request once and sends it. A 400 response whose "__type" names
     * ProvisionedThroughputExceededException is resent with an exponentially growing
     * delay (0.05s, 0.1s, ...); a 429 response is resent after 1 second. Any other
     * response is passed to the callback.
     *
     * @param {string} method
     * @param {string} path
     * @param {string} queryString
     * @param {table} headers - not modified; a copy receives the signing headers
     * @param {string} body
     * @param {function} callback - optional, called with the HTTP response
     *
     * @return {null}
     */
    function request(method, path, queryString, headers, body, callback) {
        // TODO: parse queryString properly from the URL

        _updateTimestamps();

        // Work on a copy so the caller's table does not pick up Host/Authorization/etc.
        if (headers == null) {
            headers = {};
        } else {
            headers = clone(headers);
        }

        // These headers are used in the request signature
        headers["Host"] <- _serviceHost;
        if (!("Content-Type" in headers)) headers["Content-Type"] <- "application/x-amz-json-1.0";

        // Add the signature to the headers
        local signature = _getSignature(method, path, queryString, headers, body);
        headers["Authorization"] <- format("%s Credential=%s/%s, SignedHeaders=%s, Signature=%s",
            ALGORITHM, _accessKeyId, _getCredentialScope(), _signedHeaders, signature);

        // This header is added *after* the request is signed
        headers["X-Amz-Date"] <- _dateTime;

        local url = _serviceUrl + path;

        local retry;
        retry = function(method, url, headers, body, callback, retries = 0) {
            http.request(method, url, headers, body).sendasync(function(res) {
                if (res.statuscode == 400) {
                    try {
                        local response = http.jsondecode(res.body);
                        if ("__type" in response) {
                            if (response.__type.find("#ProvisionedThroughputExceededException") != null) {
                                // Pushback from AWS, retry the whole thing in a bit
                                local delay = math.pow(2, retries) * 0.05;
                                // server.log("Pushback from AWS, retry after " + delay)
                                return imp.wakeup(delay, function() {
                                    retry(method, url, headers, body, callback, retries+1);
                                }.bindenv(this));
                            }
                        }
                    } catch (e) {
                        // Undecodable error body: report it and pass the response on
                        server.error(e);
                    }
                } else if (res.statuscode == 429) {
                    // Pushback from Imp, retry the whole thing in a bit
                    // (does not count towards the exponential backoff)
                    local delay = 1.0;
                    // server.log("Pushback from Imp, retry after " + delay)
                    return imp.wakeup(delay, function() {
                        retry(method, url, headers, body, callback, retries);
                    }.bindenv(this));
                }

                // The callback is optional
                if (callback != null) {
                    return callback(res);
                }
            }.bindenv(this));
        }

        retry(method, url, headers, body, callback);
    }

    /**
     * Shorthand for request('POST',...) with an empty query string
     *
     * @param {string} path
     * @param {table} headers
     * @param {string} body
     * @param {function} callback
     * @return {null}
     */
    function post(path, headers, body, callback) {
        return request("POST", path, "", headers, body, callback);
    }

    /**
     * Join array items into a string, separated by a delimiter.
     * (i.e. the last part will not have a trailing delimiter)
     * An empty array yields an empty string.
     *
     * @param {array} parts
     * @param {string} delimiter
     * @return {string}
     * @private
     */
    function _strJoin(parts, delimiter) {
        local result = "";
        foreach (index, part in parts) {
            if (index > 0) result += delimiter;
            result += part;
        }
        return result;
    }

    /**
     * Converts a blob to a lowercase hex string, two digits per byte
     *
     * @param {blob} data
     * @return {string}
     * @private
     */
    function _blobToHexString(data) {
        local str = "";
        foreach (byte in data) {
            str += format("%02x", byte);
        }
        return str;
    }

    /**
     * Refreshes _dateTime (YYYYMMDDTHHMMSSZ) and _date (YYYYMMDD) from the current time
     *
     * @return {null}
     * @private
     */
    function _updateTimestamps() {
        local now = date();
        // date() months are zero-based, hence the + 1
        _dateTime = format("%04d%02d%02dT%02d%02d%02dZ",
            now.year, now.month + 1, now.day,
            now.hour, now.min, now.sec);
        _date = _dateTime.slice(0, 8);
    }

    /**
     * Builds the canonical request and returns its hex-encoded SHA-256 hash.
     * Also stores the signed header list in _signedHeaders.
     *
     * @param {string} method
     * @param {string} path
     * @param {string} queryString
     * @param {table} headerTable
     * @param {string} payload
     * @return {string}
     * @private
     */
    function _getHashedCanonicalRequest(method, path, queryString, headerTable, payload) {
        // Format headers according to AWS spec (lowercase, whitespace trimmed, alphabetical order, etc)
        // TODO: extra spaces between non-quoted header values should be removed as well
        local valueByName = {};
        local names = [];
        foreach (key, val in headerTable) {
            local name = key.tolower();
            if (!(name in valueByName)) names.push(name);
            // tostring() so that non-string header values can be trimmed too
            valueByName[name] <- strip(val.tostring());
        }

        // Sort by header name only, so the canonical header lines and the signed
        // header list are guaranteed to be in the same order.
        names.sort();

        local headers = "";
        foreach (name in names) {
            headers += name + ":" + valueByName[name] + "\n";
        }
        _signedHeaders = _strJoin(names, ";");

        // Hash the payload and convert to a lowercase hex string
        local payloadHash = _blobToHexString(http.hash.sha256(payload));

        // Create the canonical request and return a hex-encoded hash of it.
        // `headers` already ends in "\n", which yields the blank line the format requires.
        local canonicalRequest = _strJoin([method, path, queryString, headers, _signedHeaders, payloadHash], "\n");
        return _blobToHexString(http.hash.sha256(canonicalRequest));
    }

    /**
     * Returns "<date>/<region>/<service>/aws4_request" for the current timestamps
     *
     * @return {string}
     * @private
     */
    function _getCredentialScope() {
        return _date + format("/%s/%s/aws4_request", _region, _service);
    }

    /**
     * Derives the signing key by chaining HMAC-SHA256 over date, region, service
     * and the literal "aws4_request", starting from "AWS4" + secret key
     *
     * @return {blob}
     * @private
     */
    function _deriveSigningKey() {
        local kDate = http.hash.hmacsha256(_date, "AWS4" + _secretAccessKey);
        local kRegion = http.hash.hmacsha256(_region, kDate);
        local kService = http.hash.hmacsha256(_service, kRegion);
        local kSigning = http.hash.hmacsha256("aws4_request", kService);
        return kSigning;
    }

    /**
     * Canonicalizes the request and creates the signature
     *
     * @param {string} method
     * @param {string} path
     * @param {string} queryString
     * @param {table} headers
     * @param {string} body
     * @return {string}
     * @private
     */
    function _getSignature(method, path, queryString, headers, body) {
        // Get the bits and bobs we need to sign a request
        local hashedCanonicalRequest = _getHashedCanonicalRequest(method, path, queryString, headers, body);
        local stringToSign = _strJoin([ALGORITHM, _dateTime, _getCredentialScope(), hashedCanonicalRequest], "\n");
        local signingKey = _deriveSigningKey();

        // Return the signature
        return _blobToHexString(http.hash.hmacsha256(stringToSign, signingKey));
    }
}
/**
 * Returns the current time as a "<seconds>.<microseconds>" string, with the
 * microsecond part zero-padded to six digits.
 *
 * @return {string}
 */
function mstime() {
    local now = date();
    local seconds = now.time;
    local micros = now.usec;
    return format("%d.%06d", seconds, micros);
}
/**
 * Test loop: writes a batch of 25 generated events to the "testEvents" table,
 * then schedules itself again one second after the write completes.
 * Uses the globals `dynamodb` (client) and `i` (call counter).
 */
function test_BatchWriteItem() {
    // Build 25 events carrying the agent id, device id and a timestamp
    local batch = [];
    for (local n = 0; n < 25; n++) {
        batch.push({
            "agentid": split(http.agenturl(), "/").top(),
            "deviceid": imp.configparams.deviceid,
            "timestamp": mstime()
        });
    }

    i++;
    server.log("BatchWriteItem: #" + i);

    local onWritten = function(res) {
        // Keep the loop going regardless of the outcome
        imp.wakeup(1, test_BatchWriteItem);
        if (res.statuscode != 200) {
            server.log(format("%02d = %d: %s", i, res.statuscode, res.body));
        }
    }.bindenv(this);

    dynamodb.BatchWriteItem("testEvents", batch, null, onWritten);
}
/**
 * Test loop: puts one item into the "testFridges" table, then schedules itself
 * again 0.25 seconds after the request completes.
 * Uses the globals `dynamodb` (client) and `i` (call counter, incremented here).
 */
function test_PutItem() {
    local record = {
        "agentid": split(http.agenturl(), "/").top(),
        "i": { "i": i++ }
    };

    // Logged after the increment, so this shows the next counter value
    server.log("PutItem: #" + i);

    local onPut = function(res) {
        imp.wakeup(0.25, test_PutItem);
        if (res.statuscode != 200) {
            server.log(format("%02d = %d: %s", i, res.statuscode, res.body));
        }
    }.bindenv(this);

    dynamodb.PutItem("testFridges", record, onPut);
}
/**
 * Test loop: puts an item exercising every supported attribute type into
 * "testFridges", then updates its "signedup" field, and schedules itself again
 * one second after the update completes.
 * Uses the globals `dynamodb` (client) and `i` (call counter, incremented here).
 */
function test_UpdateItem() {
    // One attribute per supported type: strings, numbers, lists, bools, maps, nested lists, null
    local record = {
        "agentid": split(http.agenturl(), "/").top(),
        "deviceid": imp.configparams.deviceid,
        "updated": time(),
        "empty": [ ],
        "teams": [ "NHLAWS", "NBAABC", 1 ],
        "temps": [ 1.2, 2.3, -3.4 ],
        "signedup": true,
        "tests": [ true, false, true ],
        "maps": { "a": 1, "b": 2 },
        "listsoflists": [ [ [ [ [ 1, 2 ], 3], 4], 5], 6],
        "nulli": null,
        "i": i++
    };

    server.log("PutItem: #" + i);

    local onUpdated = function(res) {
        imp.wakeup(1, test_UpdateItem);
        if (res.statuscode != 200) {
            server.log(format("%02d = %d: %s", i, res.statuscode, res.body));
        }
    }.bindenv(this);

    // The put result itself is not inspected; the update is always attempted
    local onPut = function(res) {
        server.log("UpdateItem: #" + i);
        dynamodb.UpdateItem("testFridges", { "agentid": record.agentid }, { "signedup" : false }, onUpdated);
    }.bindenv(this);

    dynamodb.PutItem("testFridges", record, onPut);
}
// Global call counter shared by the test_* functions
i <- 1;

// Global DynamoDB client used by the test_* functions.
// NOTE(review): AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are not defined in
// this file - presumably supplied elsewhere; confirm before running.
dynamodb <- AWSDynamoDB("us-west-2", AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY);

// Start the PutItem test loop
test_PutItem();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment