Skip to content

Instantly share code, notes, and snippets.

@saswata-dutta
Last active July 8, 2023 12:35
Show Gist options
  • Save saswata-dutta/fbbe2fefd6c07d587e9c8c31cfa3f392 to your computer and use it in GitHub Desktop.
Save saswata-dutta/fbbe2fefd6c07d587e9c8c31cfa3f392 to your computer and use it in GitHub Desktop.
EventBridge Scheduler POC to perform tasks in future

https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-create-rule-schedule.html

  1. on each job creation event create a “one time” event bridge schedule event

    1. name it using the job id so that its easy to delete later
    2. set the time to be what we expect the overall sla breach to be in future: say now() + 3 days
    3. need to set the flex window aptly (1~15 mins)
  2. let the event bridge target be sqs (so that we can buffer and consume in a controlled rate)

    1. the sqs payload can contain the job-id,
    2. let this sqs be consumed by a lambda that does 2 things
      1. ensure deletion of the event-bridge schedule (if exists) : using the job-id in payload to derive the schedule name
        1. to clear up quota per aws acc
        2. catch and ignore ResourceNotFoundException while deleting to ignore sqs retries
      2. used the job-id to lookup details from tracker-db and forward it the action Q/notification downstream
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "scheduler.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
aws iam create-role --role-name SchedulerExecutionRole \
--assume-role-policy-document file://role.json
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"sqs:SendMessage"
],
"Effect": "Allow",
"Resource": "*"
}
]
}
aws iam create-policy --policy-name SQSSendMessagePermissions \
--policy-document file://policy.json
aws iam attach-role-policy \
--policy-arn "arn:aws:iam::XXXX:policy/SQSSendMessagePermissions" \
--role-name SchedulerExecutionRole
from datetime import datetime, timezone, timedelta
import boto3
from multiprocessing import Pool
scheduler = boto3.client("scheduler", region_name="us-east-1")
roleArn = "arn:aws:iam::123456???:role/SchedulerExecutionRole"
sqsArn = "arn:aws:sqs:us-east-1:123456???:test-q"
flex_window = {"MaximumWindowInMinutes": 15, "Mode": "FLEXIBLE"}
# set this only if we want to spread out the requests to prevent downstream throttles
sqs_templated = {"RoleArn": roleArn, "Arn": sqsArn}
now = datetime.now(timezone.utc)
eta = now + timedelta(hours=1)
at = f"at({eta.isoformat(timespec='seconds').split('+')[0]})"
# might want to round this to nearest min/5min for better lambda batching (epoch//300) * 300
def task(chunk):
start, stop = chunk
for i in range(start, stop):
# must be '[0-9a-zA-Z-_.]+'
jobId = f"jobId___{i}"
sqs_templated["Input"] = jobId # any string, preferably a json here which would appear in sqs messg body
# https://docs.aws.amazon.com/scheduler/latest/APIReference/API_Target.html#scheduler-Type-Target-Input
try:
scheduler.create_schedule(
Name=jobId,
ScheduleExpression=at,
Target=sqs_templated,
FlexibleTimeWindow=flex_window,
)
except Exception as err:
print(err)
print(f"{jobId}")
if __name__ == "__main__":
num_workers = 10
count = 100_000
step = count // num_workers
chunks = [(i, i + step) for i in range(0, count, step)]
with Pool(num_workers) as p:
p.map(task, chunks)
jobId = f"jobId___{i}"
scheduler.delete_schedule(Name=jobId)
aws scheduler create-schedule \
--group-name 'default' \
--name 'sas_sched_1' \
--schedule-expression 'at(2023-07-08T12:50:00)' \
--schedule-expression-timezone 'Asia/Kolkata' \
--flexible-time-window '{"Mode": "OFF"}' \
--description '{"some" : "json", "values" : [1, 2, "abc", {"x" : "y"}, ["a", "b"]]}' \
--target '{"RoleArn": "arn:aws:iam::123123123123:role/SchedulerExecutionRole",
"Arn": "arn:aws:sqs:ap-south-1:123123123123:MySchedulerQueue",
"Input": "{\"GroupName\": \"default\", \"Name\": \"sas_sched_1\", \"data\": {\"accountId\": 1234, \"notificationType\": \"whatsapp\", \"payload\": {\"messaging_product\": \"whatsapp\", \"template\": {\"components\": [{\"parameters\": [{\"text\": \"sas testing task\", \"type\": \"text\"}, {\"text\": \"sas test due dt\", \"type\": \"text\"}], \"type\": \"body\"}], \"language\": {\"code\": \"en_US\"}, \"name\": \"my_template\"}, \"to\": \"911234567890\", \"type\": \"template\"}}}"
}'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment