Last active
August 29, 2016 01:36
-
-
Save blindman2k/048fe1feaefc41d6388f7e22e950b2c6 to your computer and use it in GitHub Desktop.
AWSRequestV4 and AWSDynamoDB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * This class can be used to interact with an Amazon DynamoDB table
 *
 * @author Aron Steg <[email protected]>
 *
 * @version 0.0.2
 */
class AWSDynamoDB {

    static version = [0, 0, 2];

    static SERVICE = "dynamodb";
    static TARGET_PREFIX = "DynamoDB_20120810";

    // Signed-request helper (an AWSRequestV4 instance) created by the constructor
    _awsRequest = null;

    /**
     * @param {string} region
     * @param {string} accessKeyId
     * @param {string} secretAccessKey
     *
     * @throws if the AWSRequestV4 class is not present in the root table
     */
    constructor(region, accessKeyId, secretAccessKey) {
        if (!("AWSRequestV4" in getroottable())) {
            throw ("This class requires AWSRequestV4 - please make sure it is loaded.");
        }
        _awsRequest = AWSRequestV4(SERVICE, region, accessKeyId, secretAccessKey);
    }

    /**
     * Inserts or replaces an item in the database
     *
     * @param {string} tableName
     * @param {table} item          - plain Squirrel table; converted to DynamoDB typed attributes
     * @param {table} extras        - optional extra top-level request fields, copied into the body
     *                                (the callback may be passed in this position instead)
     * @param {function} cb         - optional; called with the HTTP response table
     * @return {null}
     */
    function PutItem(tableName, item, extras=null, cb=null) {

        local body = {
            "TableName": tableName,
            "Item": _toDynamoDBTypes(item)
        };

        // Optional extras: PutItem(tableName, item, cb) is also accepted
        if (typeof extras == "function") {
            cb = extras;
            extras = null;
        }
        _applyExtras(body, extras);

        // server.log( http.jsonencode(body));
        _send("PutItem", body, cb);
    }

    /**
     * Updates some of the fields on an item in the database
     *
     * Builds a "SET a = :val1,b = :val2" update expression from the entries of `item`.
     *
     * @param {string} tableName
     * @param {table} key           - primary key of the item to update (plain Squirrel table)
     * @param {table} item          - attribute name/value pairs to set
     * @param {table} extras        - optional extra top-level request fields, copied into the body
     *                                (the callback may be passed in this position instead)
     * @param {function} cb         - optional; called with the HTTP response table
     * @return {null}
     */
    function UpdateItem(tableName, key, item, extras=null, cb=null) {

        local body = {
            "TableName": tableName,
            "Key": _toDynamoDBTypes(key)
        };

        // Load up the attributes into the expression. The loop variable is deliberately
        // not called `key` so that it does not shadow the parameter of the same name.
        local expression = "SET ";
        local values = {};
        local count = 0;
        foreach (attr, val in item) {
            // Protect against reserved keywords.
            // NOTE(review): only `language` is aliased (to `lang`); other DynamoDB reserved
            // words are passed through unchanged and would need ExpressionAttributeNames.
            if (attr == "language") attr = "lang";

            count++;
            if (count > 1) expression += ",";
            expression += format("%s = :val%d", attr, count);
            values[":val" + count] <- _toDynamoDBTypes(val, false);
        }

        // Only send the expression fields when there is something to set
        if (count > 0) {
            body["UpdateExpression"] <- expression;
            body["ExpressionAttributeValues"] <- values;
        }

        // Optional extras: UpdateItem(tableName, key, item, cb) is also accepted
        if (typeof extras == "function") {
            cb = extras;
            extras = null;
        }
        _applyExtras(body, extras);

        // server.log( http.jsonencode(body));
        _send("UpdateItem", body, cb);
    }

    /**
     * Writes a batch of up to 25 entries to the database
     *
     * Items reported back by the service as "UnprocessedItems" are resubmitted with an
     * exponentially growing delay (0.5s, 1s, 2s, ...) for at most
     * MAX_RETRIES_PER_BATCHWRITE attempts; after that the last response is handed to `cb`
     * as-is so the caller can inspect what is still unprocessed.
     *
     * @param {string} tableName
     * @param {array} puts          - items to insert/replace (plain Squirrel tables), or null
     * @param {array} deletes       - keys of items to delete (plain Squirrel tables), or null
     * @param {table} extras        - optional extra top-level request fields for the first request
     *                                (the callback may be passed in this position instead)
     * @param {function} cb         - optional; called with the final HTTP response table
     * @return {null}
     *
     * @throws if an argument has the wrong type or more than 25 items are supplied
     */
    function BatchWriteItem(tableName, puts, deletes, extras = null, cb = null) {

        const MAX_ITEMS_PER_BATCHWRITE = 25;
        const MAX_RETRIES_PER_BATCHWRITE = 128;

        // Optional extras: BatchWriteItem(tableName, puts, deletes, cb) is also accepted
        if (typeof extras == "function") {
            cb = extras;
            extras = null;
        }

        if (puts == null) puts = [];
        if (deletes == null) deletes = [];
        if (extras == null) extras = {};

        if (typeof puts != "array") throw "Puts must be an array";
        if (typeof deletes != "array") throw "Deletes must be an array";
        if (typeof extras != "table") throw "Extras must be a table";
        if (puts.len() + deletes.len() > MAX_ITEMS_PER_BATCHWRITE) throw "A maximum of " + MAX_ITEMS_PER_BATCHWRITE + " items can be batched";

        local requests = [];

        // Put requests
        foreach (item in puts) {
            requests.push( { "PutRequest": { "Item": _toDynamoDBTypes(item) } } );
        }

        // Delete requests
        foreach (item in deletes) {
            requests.push( { "DeleteRequest": { "Key": _toDynamoDBTypes(item) } } );
        }

        local body = { "RequestItems": {} };
        body.RequestItems[tableName] <- requests;

        // Optional extras
        _applyExtras(body, extras);

        local attempt;
        attempt = function(payload, retries = 0) {

            local headers = { "X-Amz-Target": format("%s.BatchWriteItem", TARGET_PREFIX) };
            _awsRequest.post("/", headers, http.jsonencode(payload), function(res) {

                // Work out whether the service handed anything back for another go
                local unprocessed = null;
                if (res.statuscode == 200) {
                    try {
                        local response = http.jsondecode(res.body);
                        if ("UnprocessedItems" in response && response.UnprocessedItems.len() > 0) {
                            unprocessed = response.UnprocessedItems;
                        }
                    } catch (e) {
                        server.error(e);
                    }
                }

                // Retry the unprocessed items, but never more often than the declared limit
                if (unprocessed != null && retries < MAX_RETRIES_PER_BATCHWRITE) {
                    local delay = math.pow(2, retries) * 0.5;
                    imp.wakeup(delay, function() {
                        attempt({ "RequestItems" : unprocessed }, retries + 1);
                    }.bindenv(this));
                    return;
                }

                // The callback is optional, so only invoke it when one was supplied
                if (cb != null) cb(res);

            }.bindenv(this));
        };

        // server.log( http.jsonencode(body));
        attempt(body);
    }

    /**
     * Copies every entry of `extras` into `body`, overwriting existing slots.
     * Does nothing when `extras` is not a table.
     *
     * @param {table} body
     * @param {table} extras
     * @return {null}
     * @private
     */
    function _applyExtras(body, extras) {
        if (typeof extras != "table") return;
        foreach (k, v in extras) {
            body[k] <- v;
        }
    }

    /**
     * JSON-encodes `body` and posts it to the service with the X-Amz-Target header
     * for the given action.
     *
     * @param {string} action       - DynamoDB action name, e.g. "PutItem"
     * @param {table} body
     * @param {function} cb         - may be null
     * @return {null}
     * @private
     */
    function _send(action, body, cb) {
        local headers = { "X-Amz-Target": format("%s.%s", TARGET_PREFIX, action) };

        // The request layer may call its callback unconditionally, so never hand it null
        if (cb == null) cb = function(res) {};

        _awsRequest.post("/", headers, http.jsonencode(body), cb);
    }

    /**
     * Converts a Squirrel value into DynamoDB's typed JSON representation.
     *
     * At the top level (`init == true`) the value must be a table and the result is a
     * table of attribute name -> typed value. Nested values are wrapped in their type
     * descriptor: L (array), M (table), BOOL, B (blob, base64), N (integer/float as a
     * string), S (string) and NULL.
     *
     * @param {any} item
     * @param {bool} init           - true for the outermost call only
     * @return {table}
     *
     * @throws "Must provide a table" when the top-level value is not a table
     * @throws when a value has a type with no DynamoDB representation here
     * @private
     */
    function _toDynamoDBTypes(item, init = true) {

        local kind = typeof item;
        if (init && kind != "table") {
            throw "Must provide a table";
        }

        switch (kind) {
            case "array": {
                // Arrays can only be reached as nested values (the top level must be a table)
                local list = [];
                foreach (v in item) {
                    list.push(_toDynamoDBTypes(v, false));
                }
                return { "L" : list };
            }
            case "table": {
                local map = {};
                foreach (k, v in item) {
                    map[k] <- _toDynamoDBTypes(v, false);
                }
                // The outermost table is the attribute map itself; nested tables are "M" values
                return init ? map : { "M" : map };
            }
            case "bool":
                // BOOL takes a real JSON boolean, not the string "true"/"false"
                return { "BOOL": item };
            case "blob":
                return { "B": http.base64encode(item) };
            case "float":
            case "integer":
                return { "N": item.tostring() };
            case "string":
                return { "S": item };
            case "null":
                return { "NULL": true };
            default:
                // Fail loudly instead of silently emitting a null attribute value
                throw "Unsupported type for DynamoDB: " + kind;
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * This class can be used to generate correctly structured requests intended for AWS endpoints,
 * sign the requests using Amazon's "Signature Version 4", and send them. It's intended to be used
 * internally by wrapper classes for specific AWS services.
 *
 * @author Gino Miglio <[email protected]>
 * @author Mikhail Yurasov <[email protected]>
 *
 * @version 1.0.2
 */
class AWSRequestV4 {

    static version = [1, 0, 2];

    static ALGORITHM = "AWS4-HMAC-SHA256";

    _service = null;
    _region = null;
    _accessKeyId = null;
    _secretAccessKey = null;

    // Timestamps of the request currently being signed (set by _updateTimestamps)
    _date = null;
    _dateTime = null;

    // ";"-separated list of signed header names (set by _getHashedCanonicalRequest)
    _signedHeaders = null;

    _serviceUrl = null;
    _serviceHost = null;

    /**
     * @param {string} service
     * @param {string} region
     * @param {string} accessKeyId
     * @param {string} secretAccessKey
     */
    constructor(service, region, accessKeyId, secretAccessKey) {
        _service = service;
        _region = region;
        _accessKeyId = accessKeyId;
        _secretAccessKey = secretAccessKey;

        _serviceHost = format("%s.%s.amazonaws.com", service, region);
        _serviceUrl = format("https://%s.%s.amazonaws.com", service, region);
    }

    /**
     * Make request
     *
     * Signs the request and sends it asynchronously. A 400 response whose "__type"
     * contains "#ProvisionedThroughputExceededException" is resent with an exponentially
     * growing delay, and a 429 response is resent after one second; neither retry is
     * bounded. Any other response is passed to the callback.
     *
     * The caller's `headers` table is not modified; a copy is signed and sent.
     *
     * @param {string} method
     * @param {string} path
     * @param {string} queryString  - already-encoded query string without the leading "?"
     * @param {table} headers
     * @param {string} body
     * @param {function} callback   - optional; called with the HTTP response table
     *
     * @return {null}
     */
    function request(method, path, queryString, headers, body, callback) {
        // TODO: parse queryString properly from the URL
        _updateTimestamps();

        // Work on a copy: signing a table that still carries the Authorization / X-Amz-Date
        // headers of an earlier request would produce an invalid signature.
        local requestHeaders = (headers == null) ? {} : clone headers;

        // These headers are used in the request signature
        requestHeaders["Host"] <- _serviceHost;
        if (!("Content-Type" in requestHeaders)) requestHeaders["Content-Type"] <- "application/x-amz-json-1.0";

        // Add the signature to the headers
        local signature = _getSignature(method, path, queryString, requestHeaders, body);
        requestHeaders["Authorization"] <- format("%s Credential=%s/%s, SignedHeaders=%s, Signature=%s",
            ALGORITHM, _accessKeyId, _getCredentialScope(), _signedHeaders, signature);

        // This header is added *after* the request is signed
        requestHeaders["X-Amz-Date"] <- _dateTime;

        // The query string is part of the signature, so it has to be part of the URL too
        local url = _serviceUrl + path;
        if (typeof queryString == "string" && queryString.len() > 0) {
            url += "?" + queryString;
        }

        local attempt;
        attempt = function(retries = 0) {
            http.request(method, url, requestHeaders, body).sendasync(function(res) {

                if (res.statuscode == 400) {
                    try {
                        local response = http.jsondecode(res.body);
                        if ("__type" in response && response.__type.find("#ProvisionedThroughputExceededException") != null) {
                            // Pushback from AWS, retry the whole thing in a bit
                            local delay = math.pow(2, retries) * 0.05;
                            // server.log("Pushback from AWS, retry after " + delay)
                            return imp.wakeup(delay, function() {
                                attempt(retries + 1);
                            }.bindenv(this));
                        }
                    } catch (e) {
                        server.error(e);
                    }
                } else if (res.statuscode == 429) {
                    // Pushback from Imp, retry the whole thing in a bit
                    local delay = 1.0;
                    // server.log("Pushback from Imp, retry after " + delay)
                    return imp.wakeup(delay, function() {
                        attempt(retries);
                    }.bindenv(this));
                }

                // The callback is optional, so only invoke it when one was supplied
                if (callback != null) return callback(res);

            }.bindenv(this));
        };

        attempt();
    }

    /**
     * Shorthand for request('POST',...) with an empty query string
     *
     * @param {string} path
     * @param {table} headers
     * @param {string} body
     * @param {function} callback
     * @return {null}
     */
    function post(path, headers, body, callback) {
        return request("POST", path, "", headers, body, callback);
    }

    /**
     * Join array items into a string, separated by a delimiter.
     * (i.e. the last part will not have a trailing delimiter)
     * An empty array yields an empty string.
     *
     * @param {array} parts
     * @param {string} delimiter
     * @return {string}
     * @private
     */
    function _strJoin(parts, delimiter) {
        local result = "";
        foreach (index, part in parts) {
            if (index > 0) result += delimiter;
            result += part;
        }
        return result;
    }

    /**
     * Converts each byte of a blob to two lowercase hex digits.
     *
     * @param {blob} data
     * @return {string}
     * @private
     */
    function _blobToHexString(data) {
        local str = "";
        foreach (byte in data) {
            str += format("%02x", byte);
        }
        return str;
    }

    /**
     * Refreshes _dateTime (YYYYMMDDTHHMMSSZ) and _date (YYYYMMDD) from the current date().
     *
     * @return {null}
     * @private
     */
    function _updateTimestamps() {
        // Named `now` so the local does not shadow the date() function it is read from
        local now = date();
        _dateTime = format("%04d%02d%02dT%02d%02d%02dZ",
            now.year, now.month + 1, now.day,
            now.hour, now.min, now.sec);
        _date = _dateTime.slice(0, 8);
    }

    /**
     * Builds the canonical header block and the signed-header list from a header table.
     *
     * Header names are lowercased and values are whitespace-stripped. Both outputs are
     * ordered by header NAME, so the two lists always agree with each other (sorting the
     * finished "name:value" lines instead can order them differently when one name is a
     * prefix of another, e.g. "x-amz" and "x-amz-date").
     *
     * @param {table} headerTable
     * @return {table} { canonical = "name:value\n...", signed = "name;name;..." }
     * @private
     */
    function _canonicalizeHeaders(headerTable) {
        // TODO: extra spaces between non-quoted header values should be removed as well
        local names = [];
        local values = {};
        foreach (key, val in headerTable) {
            local name = key.tolower();
            if (!(name in values)) names.push(name);
            values[name] <- strip(val);
        }
        names.sort();

        local canonical = "";
        foreach (name in names) {
            canonical += name + ":" + values[name] + "\n";
        }

        return { "canonical": canonical, "signed": _strJoin(names, ";") };
    }

    /**
     * Builds the canonical request for the given parts and returns its SHA-256 hash as a
     * hex string. Also stores the signed-header list in _signedHeaders.
     *
     * @return {string}
     * @private
     */
    function _getHashedCanonicalRequest(method, path, queryString, headerTable, payload) {
        // Format headers according to AWS spec (lowercase, whitespace trimmed, alphabetical order, etc)
        local formatted = _canonicalizeHeaders(headerTable);
        _signedHeaders = formatted.signed;

        // Hash the payload and convert to a lowercase hex string
        local payloadHash = _blobToHexString(http.hash.sha256(payload));

        // Create the canonical request and return a hex-encoded hash of it.
        // The header block already ends in "\n", which yields the blank line that
        // separates it from the signed-header list.
        local canonicalRequest = _strJoin([method, path, queryString, formatted.canonical, _signedHeaders, payloadHash], "\n");
        return _blobToHexString(http.hash.sha256(canonicalRequest));
    }

    /**
     * @return {string} "<date>/<region>/<service>/aws4_request"
     * @private
     */
    function _getCredentialScope() {
        return _date + format("/%s/%s/aws4_request", _region, _service);
    }

    /**
     * Derives the signing key by chaining HMAC-SHA256 over the date, region, service
     * and the literal "aws4_request", starting from "AWS4" + the secret access key.
     *
     * @return {blob}
     * @private
     */
    function _deriveSigningKey() {
        local kDate = http.hash.hmacsha256(_date, "AWS4" + _secretAccessKey);
        local kRegion = http.hash.hmacsha256(_region, kDate);
        local kService = http.hash.hmacsha256(_service, kRegion);
        local kSigning = http.hash.hmacsha256("aws4_request", kService);
        return kSigning;
    }

    /**
     * Canonicalizes the request and creates the signature
     *
     * @param {string} method
     * @param {string} path
     * @param {string} queryString
     * @param {table} headers
     * @param {string} body
     * @return {string}
     * @private
     */
    function _getSignature(method, path, queryString, headers, body) {
        // Get the bits and bobs we need to sign a request
        local hashedCanonicalRequest = _getHashedCanonicalRequest(method, path, queryString, headers, body);
        local stringToSign = _strJoin([ALGORITHM, _dateTime, _getCredentialScope(), hashedCanonicalRequest], "\n");
        local signingKey = _deriveSigningKey();

        // Return the signature
        return _blobToHexString(http.hash.hmacsha256(stringToSign, signingKey));
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Formats the current time as "<time>.<usec>", taking both values from a single
 * date() call; the usec part is zero-padded to six digits.
 *
 * @return {string}
 */
function mstime() {
    local now = date();
    local whole = now.time;
    local fraction = now.usec;
    return format("%d.%06d", whole, fraction);
}
/**
 * Test loop: writes a batch of 25 freshly generated events to the "testEvents" table,
 * then reschedules itself one second after the response arrives. Non-200 responses
 * are logged. Uses the global counter `i` and the global `dynamodb` client.
 */
function test_BatchWriteItem() {

    // Build the batch
    local batch = [];
    local remaining = 25;
    while (remaining-- > 0) {
        batch.push({
            agentid = split(http.agenturl(), "/").top(),
            deviceid = imp.configparams.deviceid,
            timestamp = mstime()
        });
    }

    i++;
    server.log("BatchWriteItem: #" + i)

    // Schedule the next round first, then report any failure
    local onWritten = function(res) {
        imp.wakeup(1, test_BatchWriteItem);
        if (res.statuscode == 200) return;
        server.log(format("%02d = %d: %s", i, res.statuscode, res.body));
    }.bindenv(this);

    dynamodb.BatchWriteItem("testEvents", batch, null, onWritten);
}
/**
 * Test loop: puts one item (the agent id plus a nested table holding the current
 * counter value) into the "testFridges" table, then reschedules itself 0.25 seconds
 * after the response arrives. Non-200 responses are logged. Uses the global counter
 * `i` and the global `dynamodb` client.
 */
function test_PutItem() {

    // The counter is stored before it is incremented; the log line shows the new value
    local record = {
        agentid = split(http.agenturl(), "/").top(),
        i = { "i": i++ }
    };
    server.log("PutItem: #" + i)

    // Schedule the next round first, then report any failure
    local onStored = function(res) {
        imp.wakeup(0.25, test_PutItem);
        if (res.statuscode == 200) return;
        server.log(format("%02d = %d: %s", i, res.statuscode, res.body));
    }.bindenv(this);

    dynamodb.PutItem("testFridges", record, onStored);
}
/**
 * Test loop: puts an item exercising every supported value type into "testFridges",
 * then (regardless of the put's outcome) updates its "signedup" field to false and
 * reschedules itself one second after the update response arrives. Non-200 update
 * responses are logged. Uses the global counter `i` and the global `dynamodb` client.
 */
function test_UpdateItem() {

    // One attribute per value type handled by the type conversion
    local record = {
        agentid = split(http.agenturl(), "/").top(),
        deviceid = imp.configparams.deviceid,
        updated = time(),
        empty = [ ],
        teams = [ "NHLAWS", "NBAABC", 1 ],
        temps = [ 1.2, 2.3, -3.4 ],
        signedup = true,
        tests = [ true, false, true ],
        maps = { "a": 1, "b": 2 },
        listsoflists = [ [ [ [ [ 1, 2 ], 3], 4], 5], 6],
        nulli = null,
        i = i++
    };

    // Second step: schedule the next round, then report any failure of the update
    local onUpdated = function(res) {
        imp.wakeup(1, test_UpdateItem);
        if (res.statuscode == 200) return;
        server.log(format("%02d = %d: %s", i, res.statuscode, res.body));
    }.bindenv(this);

    // First step: once the put has answered, flip "signedup" on the same item
    local onStored = function(res) {
        server.log("UpdateItem: #" + i)
        dynamodb.UpdateItem("testFridges", { "agentid": record.agentid }, { "signedup" : false }, onUpdated);
    }.bindenv(this);

    server.log("PutItem: #" + i)
    dynamodb.PutItem("testFridges", record, onStored);
}
// Global request counter shared by the test loops
i <- 1;

// Global client used by the test loops.
// NOTE(review): AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are not defined in this
// file - they must be provided before this line runs.
dynamodb <- AWSDynamoDB("us-west-2", AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY);

// Kick off the PutItem loop; the other test_* loops are started by calling them instead
test_PutItem();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment