Skip to content

Instantly share code, notes, and snippets.

@jg75
Last active January 19, 2020 09:58
Show Gist options
  • Save jg75/f5f7ac0986137243a0fa82d8ef2f1091 to your computer and use it in GitHub Desktop.
Save jg75/f5f7ac0986137243a0fa82d8ef2f1091 to your computer and use it in GitHub Desktop.
SWF Sample Workflow
"""
This is a sample workflow that includes a manual task.
Activities:
1. get_contact_activity
Get a subscription address.
2. subscribe_topic_activity
Create an SNS topic and subscribe the provided endpoints to the topic.
3. wait_for_confirmation_activity
Wait for the user to confirm the subscription.
4. send_result_activity
If the user confirms, publish a congratulatory message to the topic.
"""
import logging
import uuid
import boto3
from swf import register_swf_domain
from swf import register_swf_workflow
logging.basicConfig(level=logging.INFO)
class SampleWorkflow:
"""Sample Workflow."""
logger = logging.getLogger(__name__)
client = boto3.client("swf")
default_activities = [{
"name": "get_contact_activity",
"version": "v2"
}, {
"name": "subscribe_topic_activity",
"version": "v2"
}, {
"name": "wait_for_confirmation_activity",
"version": "v2"
}, {
"name": "send_result_activity",
"version": "v2"
}]
@classmethod
def schedule_next(
cls,
client,
activities,
task_token,
run_id,
**parameters
):
"""Request to schedule the next activity."""
activity = activities.pop()
activity_id = f"{run_id}-{activity['name']}"
response = {
"taskToken": task_token,
"decisions": [{
"decisionType": "ScheduleActivityTask",
"scheduleActivityTaskDecisionAttributes": {
"activityType": activity,
"activityId": activity_id
},
**parameters
}]
}
client.respond_decision_task_completed(**response)
cls.logger.info("Scheduled:%s", activity_id)
@classmethod
def fail_workflow(cls, client, task_token, attributes):
"""Request to fail the workflow."""
response = {
"taskToken": task_token,
"decisions": [{
"decisionType": "FailWorkflowExecution",
"failWorkflowExecutionDecisionAttributes": {
"reason": attributes["reason"],
"details": attributes["details"]
}
}]
}
client.respond_decision_task_completed(**response)
cls.logger.error("Failed:%s", attributes)
@classmethod
def complete_workflow(cls, client, task_token, **parameters):
"""Request to complete the workflow."""
response = {
"taskToken": task_token,
"completeWorkflowExecutionDecisionAttributes": {
**parameters
}
}
client.respond_decision_task_completed(**response)
cls.logger.info("Done")
def __init__(
self,
domain,
workflow,
version="v2",
identity=str(uuid.uuid1()),
activities=None,
task_list="sample",
client=None
):
"""Init."""
if client:
self.client = client
self.identity = identity
self.task_list = task_list
self.activities = activities or self.default_activities
self.domain = register_swf_domain(
client=self.client,
name=domain,
description="Sample domain",
workflowExecutionRetentionPeriodInDays="10"
)
self.logger.info("Domain:%s", self.domain)
self.workflow = register_swf_workflow(
client=self.client,
domain=domain,
name=workflow,
version=version,
description="Sample workflow",
defaultChildPolicy="TERMINATE",
defaultTaskStartToCloseTimeout="3600",
defaultExecutionStartToCloseTimeout="86400",
defaultTaskList={"name": self.task_list}
)
self.logger.info("Workflow:%s", self.workflow)
def start(self):
"""Start workflow execution."""
parameters = {
"domain": self.domain,
"workflowId": self.identity,
"workflowType": self.workflow
}
response = self.client.start_workflow_execution(**parameters)
run_id = response["runId"]
self.logger.info("Start:%s", run_id)
return run_id
def poll(self, run_id):
"""Poll for decisions."""
paginator = self.client.get_paginator("poll_for_decision_task")
parameters = {
"domain": self.domain,
"taskList": {"name": self.task_list},
"identity": self.identity
}
activities = [activity for activity in reversed(self.activities)]
for response in paginator.paginate(**parameters):
task_token = response["taskToken"]
for event in response["events"]:
event_type = event["eventType"]
self.logger.info("Event:%s", event_type)
if event_type == "WorkflowExecutionStarted":
self.schedule_next(
self.client,
activities,
task_token,
run_id
)
elif event_type == "ActivityTaskCompleted":
parameters["input"] = event.get("result")
self.schedule_next(
self.client,
activities,
task_token,
run_id,
**parameters
)
elif event_type == "ActivityTaskTimedOut":
attributes = event["activityTaskTimedOutEventAttributes"]
self.fail_workflow(self.client, task_token, attributes)
elif event_type == "ActivityTaskFailed":
attributes = event["activityTaskFailedEventAttributes"]
self.fail_workflow(self.client, task_token, attributes)
elif event_type == "WorkflowExecutionCompleted":
parameters["result"] = event.get("result")
self.complete_workflow(
self.client,
task_token,
**parameters
)
if __name__ == "__main__":
workflow = SampleWorkflow("jimtest", "sample")
run_id = workflow.start()
workflow.poll(run_id)
"""AWS SWF domains stuff."""
import argparse
import boto3
import botocore
def get_swf_domains(client=boto3.client("swf"), **parameters):
"""Get AWS SWF domain names."""
paginator = client.get_paginator("list_domains")
for response in paginator.paginate(**parameters):
for domain in response["domainInfos"]:
yield domain["name"]
def get_swf_workflows(client=boto3.client("swf"), **parameters):
"""Get AWS SWF workflows."""
paginator = client.get_paginator("list_workflow_types")
for response in paginator.paginate(**parameters):
for workflow in response["typeInfos"]:
yield workflow["workflowType"]
def get_swf_activities(client=boto3.client("swf"), **parameters):
"""Get AWS SWF workflows."""
paginator = client.get_paginator("list_activity_types")
for response in paginator.paginate(**parameters):
for activity in response["typeInfos"]:
yield activity["activityType"]
def register_swf_domain(client=boto3.client("swf"), **parameters):
"""Register AWS SWF domain. Return the domain name when it exists."""
try:
client.register_domain(**parameters)
except botocore.exceptions.ClientError as e:
if e.response["Error"]["Code"] == "DomainAlreadyExistsFault":
pass
else:
raise
return parameters["name"]
def register_swf_workflow(client=boto3.client("swf"), **parameters):
"""Register AWS SWF workflow. Return the workflow id when it exists."""
try:
client.register_workflow_type(**parameters)
except botocore.exceptions.ClientError as e:
if e.response["Error"]["Code"] == "TypeAlreadyExistsFault":
pass
else:
raise
return {"name": parameters["name"], "version": parameters["version"]}
def register_swf_activity(client=boto3.client("swf"), **parameters):
"""Register AWS SWF activity. Return the activity id when it exists."""
try:
client.register_activity_type(**parameters)
except botocore.exceptions.ClientError as e:
if e.response["Error"]["Code"] == "TypeAlreadyExistsFault":
pass
else:
raise
return {"name": parameters["name"], "version": parameters["version"]}
def list_domains(client=boto3.client("swf"), **arguments):
"""Print AWS SWF domain names."""
parameters = {
"registrationStatus": "REGISTERED",
"PaginationConfig": {
"MaxItems": arguments["max_items"],
"PageSize": arguments["page_size"]
}
}
for domain_name in get_swf_domains(client, **parameters):
print(domain_name)
def register_domain(client=boto3.client("swf"), **arguments):
"""Register an AWS SWF domain name."""
parameters = {
"name": arguments["name"],
"description": arguments["description"],
"workflowExecutionRetentionPeriodInDays": arguments["retention_period"]
}
register_swf_domain(client, **parameters)
def list_workflows(client=boto3.client("swf"), **arguments):
"""Print AWS SWF workflows."""
parameters = {
"domain": arguments["domain"],
"registrationStatus": arguments['registration_status'],
"PaginationConfig": {
"MaxItems": arguments["max_items"],
"PageSize": arguments["page_size"]
}
}
for workflow in get_swf_workflows(client, **parameters):
print(f"{workflow['name']}:{workflow['version']}")
def register_workflow(client=boto3.client("swf"), **arguments):
"""Register an AWS SWF workflow."""
parameters = {
"domain": arguments["domain"],
"name": arguments["name"],
"version": arguments["version"],
"description": arguments["description"]
}
register_swf_workflow(client, **parameters)
def list_activities(client=boto3.client("swf"), **arguments):
"""Print AWS SWF activities."""
parameters = {
"domain": arguments["domain"],
"registrationStatus": arguments['registration_status'],
"PaginationConfig": {
"MaxItems": arguments["max_items"],
"PageSize": arguments["page_size"]
}
}
for activity in get_swf_activities(client, **parameters):
print(activity)
def register_activity(client=boto3.client("swf"), **arguments):
"""Register an AWS SWF activity."""
parameters = {
"domain": arguments["domain"],
"name": arguments["name"],
"version": arguments["version"],
"description": arguments["description"]
}
register_swf_activity(client, **parameters)
def parse_paginator_arguments(parser):
"""Add the paginator argument(s) to the argument parser."""
parser.add_argument(
"--max-items",
help="Paginator max items",
type=int,
default=None
)
parser.add_argument(
"--page-size",
help="Paginator page size",
type=int,
default=None
)
def parse_default_arguments(parser):
"""Add the default argument(s) to the argument parser."""
parser.add_argument(
"--region-name",
help='AWS region name',
default=None
)
def parse_list_domains(subparsers):
"""Add the list-domain command."""
parser = subparsers.add_parser('list-domains')
parse_default_arguments(parser)
parse_paginator_arguments(parser)
parser.set_defaults(func=list_domains)
def parse_register_domain(subparsers):
"""Add the register-domain command."""
parser = subparsers.add_parser('register-domain')
parser.set_defaults(func=register_domain)
parse_default_arguments(parser)
parser.add_argument(
"name",
help='AWS SWF domain name'
)
parser.add_argument(
"--description",
help='AWS SWF domain description',
default=""
)
parser.add_argument(
"--retention-period",
help='AWS SWF workflow execution retention period in days',
default="NONE"
)
def parse_list_workflows(subparsers):
"""Add the list-workflows command."""
parser = subparsers.add_parser('list-workflows')
parse_default_arguments(parser)
parse_paginator_arguments(parser)
parser.add_argument(
"domain",
help='AWS SWF domain name'
)
parser.add_argument(
"--registration-status",
help='AWS SWF domain name',
choices=["REGISTERED", "DEPRECATED"],
default="REGISTERED"
)
parser.set_defaults(func=list_workflows)
def parse_register_workflow(subparsers):
"""Add the register-workflow command."""
parser = subparsers.add_parser('register-workflow')
parser.set_defaults(func=register_workflow)
parse_default_arguments(parser)
parser.add_argument(
"domain",
help='AWS SWF domain name'
)
parser.add_argument(
"name",
help='AWS SWF workflow name'
)
parser.add_argument(
"version",
help='AWS SWF workflow version'
)
parser.add_argument(
"--description",
help='AWS SWF domain description',
default=""
)
def parse_list_activities(subparsers):
"""Add the list-workflows command."""
parser = subparsers.add_parser('list-activities')
parse_default_arguments(parser)
parse_paginator_arguments(parser)
parser.add_argument(
"domain",
help='AWS SWF domain name'
)
parser.add_argument(
"--registration-status",
help='AWS SWF domain name',
choices=["REGISTERED", "DEPRECATED"],
default="REGISTERED"
)
parser.set_defaults(func=list_activities)
def parse_register_activity(subparsers):
"""Add the register-activity command."""
parser = subparsers.add_parser('register-activity')
parser.set_defaults(func=register_activity)
parse_default_arguments(parser)
parser.add_argument(
"domain",
help='AWS SWF domain name'
)
parser.add_argument(
"name",
help='AWS SWF activity name'
)
parser.add_argument(
"version",
help='AWS SWF activity version'
)
parser.add_argument(
"--description",
help='AWS SWF domain description',
default=""
)
def parse_args():
"""Parse input arguments."""
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers()
subparsers.required = True
parse_list_domains(subparsers)
parse_register_domain(subparsers)
parse_list_workflows(subparsers)
parse_register_workflow(subparsers)
parse_list_activities(subparsers)
parse_register_activity(subparsers)
return vars(parser.parse_args())
if __name__ == "__main__":
arguments = parse_args()
function = arguments.pop("func")
client = boto3.client("swf", region_name=arguments.pop("region_name"))
function(client, **arguments)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment