Skip to content

Instantly share code, notes, and snippets.

@saswata-dutta
Last active May 8, 2023 10:05
Show Gist options
  • Save saswata-dutta/a472f8c64c2b1004eb7970984c554fa5 to your computer and use it in GitHub Desktop.
Save saswata-dutta/a472f8c64c2b1004eb7970984c554fa5 to your computer and use it in GitHub Desktop.
Implementing atomic hierarchical counters at account and user level using AWS DynamoDb

Table:

AccountCustomerQuotaCounters

Keys:

accountId : pk : S

ym_id : sk : S

// (overloaded) yyyy-mm_userId for user row, else yyyy-mm_accountId for acc row; having the date prefix allows to efficiently query current month usage accross all the account->users

Values:

// logic

val : N // current count

threshold : N

// meta

itemType : S // USER | ACCOUNT

itemId : S // userId | accountId

lastModifiedFor : S // userId causing the increment

createdAtEpochSec : N

lastModifiedEpochSec : N

expiryEpochSec : N //ttl, 60 days from current month day 1

Generating an idempotency token and gracefully handling user retries:

  1. create a unique hash from (query text, acc-id, usr-id)
  2. store output against "acc-id/usr-id/hash"
  3. on subsequent retry 1st lookup against the known path, only then try to make new api call
  4. Always pass this hash/token to DDb Transactions to let it dedup and not double count (only safe within 10 mins)
import { DynamoDBClient } from "@aws-sdk/client-dynamodb"; // ES6 import
import { DynamoDBDocumentClient, TransactWriteCommand, QueryCommand, BatchGetCommand, UpdateCommand } from "@aws-sdk/lib-dynamodb"; // ES6 import
const marshallOptions = {
// Whether to automatically convert empty strings, blobs, and sets to `null`.
convertEmptyValues: true, // false, by default.
// Whether to remove undefined values while marshalling.
removeUndefinedValues: true, // false, by default.
// Whether to convert typeof object to map attribute.
convertClassInstanceToMap: true, // false, by default.
};
const unmarshallOptions = {
// Whether to return numbers as a string instead of converting them to native JavaScript numbers.
wrapNumbers: false, // false, by default.
};
const translateConfig = { marshallOptions, unmarshallOptions };
// Bare-bones DynamoDB Clients
const ddbClient = new DynamoDBClient({region: "ap-southeast-1"});
const ddbDocClient = DynamoDBDocumentClient.from(ddbClient, translateConfig);
//////////// define reusable functions and constants
function getDateParams() {
const ttlDays = 60;
const today = new Date();
const currentEpochSec = Math.floor(Date.now() / 1000);
const startOfMonth = new Date(today.getUTCFullYear(), today.getUTCMonth(), 1);
const skPrefix = startOfMonth.toISOString().slice(0, 7);
const expiry = parseInt(startOfMonth.valueOf()/1000) + ttlDays * 24 * 60 * 60;
return [skPrefix, currentEpochSec, expiry];
}
const TABLE_NAME = "AccountCustomerQuotaCounters";
const DEFAULT_ACC_THRESHOLD = 300;
const DEFAULT_USR_THRESHOLD = 2;
///// transact write : check, increment if valid for both counters atomically
async function checkAndIncrementCounts(token, accountId, userId) {
const dateParams = getDateParams();
// must pass a client req token,
// to allow for idempotent retries of each client action without false counts in 10 mins
const params = getTransactParams(token, accountId, userId, dateParams);
try {
const response = await ddbDocClient.send(new TransactWriteCommand(params));
console.log(response);
return "SUCCESS";
} catch(err) {
console.log(err);
if (isRejected(err)) {
return "REJECT";
}
return "RETRY";
}
}
function isRejected(err) {
if (err.name === "IdempotentParameterMismatchException") {
return true;
}
if (err.name === "TransactionCanceledException") {
return err.CancellationReasons.some(it => it.Code === "ConditionalCheckFailed");
}
return false;
}
function getTransactParams(token, accountId, userId, dateParams) {
const accountRow = getTransactRowParams(
accountId, userId,
accountId, "ACCOUNT", DEFAULT_ACC_THRESHOLD,
dateParams);
const userRow = getTransactRowParams(
accountId, userId,
userId, "USER", DEFAULT_USR_THRESHOLD,
dateParams);
return {
ClientRequestToken : token,
TransactItems : [accountRow, userRow]
};
}
function getTransactRowParams(accountId, userId, sortKey, itemType, defaultThreshold, dateParams) {
const [skPrefix, currentEpochSec, expiryEpochSec] = dateParams;
return {
Update: {
TableName : TABLE_NAME,
Key: {
accountId : accountId,
ym_id : `${skPrefix}_${sortKey}`
},
ConditionExpression : "attribute_not_exists(accountId) or val < threshold",
UpdateExpression : `SET val = if_not_exists(val, :zero) + :one,
threshold = if_not_exists(threshold, :defaultThreshold),
createdAtEpochSec = if_not_exists(createdAtEpochSec, :currentEpochSec),
itemType = :itemType,
itemId = :sortKey,
lastModifiedFor = :userId,
lastModifiedEpochSec = :currentEpochSec,
expiryEpochSec = :expiryEpochSec`,
ExpressionAttributeValues : {
":defaultThreshold" : defaultThreshold,
":itemType" : itemType,
":sortKey" : sortKey,
":userId" : userId,
":currentEpochSec" : currentEpochSec,
":expiryEpochSec" : expiryEpochSec,
":zero" : 0,
":one" : 1
},
ReturnValuesOnConditionCheckFailure : "ALL_OLD"
}
};
}
///////// query
async function getAllCounts(accountId, ymPrefix) {
const items = [];
let lastEvaluatedKey = null;
const params = getQueryParams(accountId, ymPrefix);
// TODO : consider adding limit based on page size
do {
// If there's a last evaluated key, add it to the params for the next query call
if (lastEvaluatedKey) {
params.ExclusiveStartKey = lastEvaluatedKey;
}
try {
const response = await ddbDocClient.send(new QueryCommand(params));
console.log(response);
items.push(...response.Items);
// Update the last evaluated key for the next iteration
lastEvaluatedKey = response.LastEvaluatedKey;
} catch(err) {
console.log(err);
throw err;
}
} while(lastEvaluatedKey);
return items;
}
function getQueryParams(accountId, skPrefix) {
return {
TableName : TABLE_NAME,
KeyConditionExpression : "accountId = :pk AND begins_with(ym_id, :skPrefix)",
ExpressionAttributeValues : {
":pk" : accountId,
":skPrefix" : skPrefix
},
ProjectionExpression : "itemId, itemType, val, threshold, lastModifiedEpochSec"
};
}
/////// get
async function getUserCount(accountId, userId, ymPrefix) {
const params = getItemParams(accountId, userId, ymPrefix);
try {
const response = await ddbDocClient.send(new BatchGetCommand(params));
console.log(response);
return (response.Responses[TABLE_NAME]);
} catch(err) {
console.log(err);
throw err;
}
}
function getItemParams(accountId, userId, skPrefix) {
return {
RequestItems : {
[TABLE_NAME] : {
ProjectionExpression : "itemId, itemType, val, threshold, lastModifiedEpochSec",
Keys: [
{
accountId : accountId,
ym_id : `${skPrefix}_${accountId}`
},
{
accountId : accountId,
ym_id : `${skPrefix}_${userId}`
}
]
}
}
};
}
////////// row updates
async function upsertThreshold(accountId, userId, newThreshold, ymPrefix) {
// if userId is null update the acc level threshold
const sortKey = userId ? userId : accountId;
const params = getUpdateParams(accountId, sortKey, ymPrefix, newThreshold);
try {
const response = await ddbDocClient.send(new UpdateCommand(params));
console.log(response);
return (response.Attributes);
} catch(err) {
console.log(err);
throw err;
}
}
function getUpdateParams(accountId, sortKey, skPrefix, newThreshold) {
// skip updating last-mod time as it is used to track counter val changes only
return {
TableName: TABLE_NAME,
Key: {
accountId : accountId,
ym_id : `${skPrefix}_${sortKey}`
},
UpdateExpression: 'SET threshold = :newThreshold, val = if_not_exists(val, :zero)',
ExpressionAttributeValues: {
':newThreshold': newThreshold,
":zero" : 0
},
ReturnValues : "ALL_NEW"
};
}
//////// driver func for trials
export const handler = async () => {
console.log(await checkAndIncrementCounts("sfsdfd4","a1", "u2"));
const users = [...Array(250).keys()].map(i => `u${i}`);
const acc = "a15";
for(const usr of users) {
const resp = await checkAndIncrementCounts(`${acc}_${usr}`, acc, usr);
console.log(resp);
}
const results = await getAllCounts("a1", "2023-05");
results.forEach(it => console.log(it));
console.log(await getUserCount("a1", "u1", "2023-05"));
console.log(await upsertThreshold("a1", null, 10, "2023-05"));
console.log(await upsertThreshold("a1", "u501", 6, "2023-05"));
console.log(await checkAndIncrementCounts("t2","a1", "u501"));
console.log("done");
return "";
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment