|
|
|
import { DynamoDBClient } from "@aws-sdk/client-dynamodb"; // ES6 import |
|
import { DynamoDBDocumentClient, TransactWriteCommand, QueryCommand, BatchGetCommand, UpdateCommand } from "@aws-sdk/lib-dynamodb"; // ES6 import |
|
|
|
const marshallOptions = { |
|
// Whether to automatically convert empty strings, blobs, and sets to `null`. |
|
convertEmptyValues: true, // false, by default. |
|
// Whether to remove undefined values while marshalling. |
|
removeUndefinedValues: true, // false, by default. |
|
// Whether to convert typeof object to map attribute. |
|
convertClassInstanceToMap: true, // false, by default. |
|
}; |
|
|
|
const unmarshallOptions = { |
|
// Whether to return numbers as a string instead of converting them to native JavaScript numbers. |
|
wrapNumbers: false, // false, by default. |
|
}; |
|
|
|
const translateConfig = { marshallOptions, unmarshallOptions }; |
|
|
|
|
|
// Bare-bones DynamoDB Clients |
|
const ddbClient = new DynamoDBClient({region: "ap-southeast-1"}); |
|
const ddbDocClient = DynamoDBDocumentClient.from(ddbClient, translateConfig); |
|
|
|
//////////// define reusable functions and constants |
|
|
|
function getDateParams() { |
|
const ttlDays = 60; |
|
const today = new Date(); |
|
const currentEpochSec = Math.floor(Date.now() / 1000); |
|
const startOfMonth = new Date(today.getUTCFullYear(), today.getUTCMonth(), 1); |
|
const skPrefix = startOfMonth.toISOString().slice(0, 7); |
|
const expiry = parseInt(startOfMonth.valueOf()/1000) + ttlDays * 24 * 60 * 60; |
|
return [skPrefix, currentEpochSec, expiry]; |
|
} |
|
|
|
const TABLE_NAME = "AccountCustomerQuotaCounters"; |
|
const DEFAULT_ACC_THRESHOLD = 300; |
|
const DEFAULT_USR_THRESHOLD = 2; |
|
|
|
|
|
///// transact write : check, increment if valid for both counters atomically |
|
|
|
async function checkAndIncrementCounts(token, accountId, userId) { |
|
const dateParams = getDateParams(); |
|
// must pass a client req token, |
|
// to allow for idempotent retries of each client action without false counts in 10 mins |
|
const params = getTransactParams(token, accountId, userId, dateParams); |
|
|
|
try { |
|
const response = await ddbDocClient.send(new TransactWriteCommand(params)); |
|
console.log(response); |
|
return "SUCCESS"; |
|
} catch(err) { |
|
console.log(err); |
|
if (isRejected(err)) { |
|
return "REJECT"; |
|
} |
|
return "RETRY"; |
|
} |
|
|
|
} |
|
|
|
function isRejected(err) { |
|
if (err.name === "IdempotentParameterMismatchException") { |
|
return true; |
|
} |
|
|
|
if (err.name === "TransactionCanceledException") { |
|
return err.CancellationReasons.some(it => it.Code === "ConditionalCheckFailed"); |
|
} |
|
return false; |
|
} |
|
|
|
function getTransactParams(token, accountId, userId, dateParams) { |
|
const accountRow = getTransactRowParams( |
|
accountId, userId, |
|
accountId, "ACCOUNT", DEFAULT_ACC_THRESHOLD, |
|
dateParams); |
|
|
|
const userRow = getTransactRowParams( |
|
accountId, userId, |
|
userId, "USER", DEFAULT_USR_THRESHOLD, |
|
dateParams); |
|
|
|
return { |
|
ClientRequestToken : token, |
|
TransactItems : [accountRow, userRow] |
|
}; |
|
} |
|
|
|
function getTransactRowParams(accountId, userId, sortKey, itemType, defaultThreshold, dateParams) { |
|
const [skPrefix, currentEpochSec, expiryEpochSec] = dateParams; |
|
return { |
|
Update: { |
|
TableName : TABLE_NAME, |
|
Key: { |
|
accountId : accountId, |
|
ym_id : `${skPrefix}_${sortKey}` |
|
}, |
|
ConditionExpression : "attribute_not_exists(accountId) or val < threshold", |
|
|
|
UpdateExpression : `SET val = if_not_exists(val, :zero) + :one, |
|
threshold = if_not_exists(threshold, :defaultThreshold), |
|
createdAtEpochSec = if_not_exists(createdAtEpochSec, :currentEpochSec), |
|
itemType = :itemType, |
|
itemId = :sortKey, |
|
lastModifiedFor = :userId, |
|
lastModifiedEpochSec = :currentEpochSec, |
|
expiryEpochSec = :expiryEpochSec`, |
|
|
|
ExpressionAttributeValues : { |
|
":defaultThreshold" : defaultThreshold, |
|
":itemType" : itemType, |
|
":sortKey" : sortKey, |
|
":userId" : userId, |
|
":currentEpochSec" : currentEpochSec, |
|
":expiryEpochSec" : expiryEpochSec, |
|
":zero" : 0, |
|
":one" : 1 |
|
}, |
|
ReturnValuesOnConditionCheckFailure : "ALL_OLD" |
|
} |
|
}; |
|
} |
|
|
|
|
|
///////// query |
|
|
|
|
|
async function getAllCounts(accountId, ymPrefix) { |
|
const items = []; |
|
let lastEvaluatedKey = null; |
|
const params = getQueryParams(accountId, ymPrefix); |
|
// TODO : consider adding limit based on page size |
|
|
|
do { |
|
// If there's a last evaluated key, add it to the params for the next query call |
|
if (lastEvaluatedKey) { |
|
params.ExclusiveStartKey = lastEvaluatedKey; |
|
} |
|
try { |
|
const response = await ddbDocClient.send(new QueryCommand(params)); |
|
console.log(response); |
|
|
|
items.push(...response.Items); |
|
// Update the last evaluated key for the next iteration |
|
lastEvaluatedKey = response.LastEvaluatedKey; |
|
} catch(err) { |
|
console.log(err); |
|
throw err; |
|
} |
|
|
|
} while(lastEvaluatedKey); |
|
|
|
return items; |
|
} |
|
|
|
|
|
function getQueryParams(accountId, skPrefix) { |
|
return { |
|
TableName : TABLE_NAME, |
|
KeyConditionExpression : "accountId = :pk AND begins_with(ym_id, :skPrefix)", |
|
ExpressionAttributeValues : { |
|
":pk" : accountId, |
|
":skPrefix" : skPrefix |
|
}, |
|
ProjectionExpression : "itemId, itemType, val, threshold, lastModifiedEpochSec" |
|
}; |
|
} |
|
|
|
/////// get |
|
|
|
async function getUserCount(accountId, userId, ymPrefix) { |
|
const params = getItemParams(accountId, userId, ymPrefix); |
|
try { |
|
const response = await ddbDocClient.send(new BatchGetCommand(params)); |
|
console.log(response); |
|
return (response.Responses[TABLE_NAME]); |
|
} catch(err) { |
|
console.log(err); |
|
throw err; |
|
} |
|
} |
|
|
|
function getItemParams(accountId, userId, skPrefix) { |
|
return { |
|
RequestItems : { |
|
[TABLE_NAME] : { |
|
ProjectionExpression : "itemId, itemType, val, threshold, lastModifiedEpochSec", |
|
Keys: [ |
|
{ |
|
accountId : accountId, |
|
ym_id : `${skPrefix}_${accountId}` |
|
}, |
|
{ |
|
accountId : accountId, |
|
ym_id : `${skPrefix}_${userId}` |
|
} |
|
] |
|
} |
|
} |
|
}; |
|
} |
|
|
|
////////// row updates |
|
|
|
|
|
async function upsertThreshold(accountId, userId, newThreshold, ymPrefix) { |
|
// if userId is null update the acc level threshold |
|
const sortKey = userId ? userId : accountId; |
|
const params = getUpdateParams(accountId, sortKey, ymPrefix, newThreshold); |
|
|
|
try { |
|
const response = await ddbDocClient.send(new UpdateCommand(params)); |
|
console.log(response); |
|
return (response.Attributes); |
|
} catch(err) { |
|
console.log(err); |
|
throw err; |
|
} |
|
} |
|
|
|
|
|
function getUpdateParams(accountId, sortKey, skPrefix, newThreshold) { |
|
// skip updating last-mod time as it is used to track counter val changes only |
|
return { |
|
TableName: TABLE_NAME, |
|
Key: { |
|
accountId : accountId, |
|
ym_id : `${skPrefix}_${sortKey}` |
|
}, |
|
UpdateExpression: 'SET threshold = :newThreshold, val = if_not_exists(val, :zero)', |
|
ExpressionAttributeValues: { |
|
':newThreshold': newThreshold, |
|
":zero" : 0 |
|
}, |
|
ReturnValues : "ALL_NEW" |
|
}; |
|
} |
|
|
|
//////// driver func for trials |
|
|
|
export const handler = async () => { |
|
|
|
|
|
console.log(await checkAndIncrementCounts("sfsdfd4","a1", "u2")); |
|
|
|
const users = [...Array(250).keys()].map(i => `u${i}`); |
|
const acc = "a15"; |
|
for(const usr of users) { |
|
const resp = await checkAndIncrementCounts(`${acc}_${usr}`, acc, usr); |
|
console.log(resp); |
|
} |
|
|
|
const results = await getAllCounts("a1", "2023-05"); |
|
results.forEach(it => console.log(it)); |
|
console.log(await getUserCount("a1", "u1", "2023-05")); |
|
|
|
console.log(await upsertThreshold("a1", null, 10, "2023-05")); |
|
console.log(await upsertThreshold("a1", "u501", 6, "2023-05")); |
|
console.log(await checkAndIncrementCounts("t2","a1", "u501")); |
|
|
|
console.log("done"); |
|
return ""; |
|
}; |
|
|
|
|