Last active
January 19, 2020 09:58
-
-
Save jg75/f5f7ac0986137243a0fa82d8ef2f1091 to your computer and use it in GitHub Desktop.
SWF Sample Workflow
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This is a sample workflow that includes a manual task. | |
Activities: | |
1. get_contact_activity | |
Get a subscription address. | |
2. subscribe_topic_activity | |
Create an SNS topic and subscribe the provided endpoints to the topic. | |
3. wait_for_confirmation_activity | |
Wait for the user to confirm the subscription. | |
4. send_result_activity | |
If the user confirms, publish a congratulatory message to the topic. | |
""" | |
import logging | |
import uuid | |
import boto3 | |
from swf import register_swf_domain | |
from swf import register_swf_workflow | |
logging.basicConfig(level=logging.INFO) | |
class SampleWorkflow: | |
"""Sample Workflow.""" | |
logger = logging.getLogger(__name__) | |
client = boto3.client("swf") | |
default_activities = [{ | |
"name": "get_contact_activity", | |
"version": "v2" | |
}, { | |
"name": "subscribe_topic_activity", | |
"version": "v2" | |
}, { | |
"name": "wait_for_confirmation_activity", | |
"version": "v2" | |
}, { | |
"name": "send_result_activity", | |
"version": "v2" | |
}] | |
@classmethod | |
def schedule_next( | |
cls, | |
client, | |
activities, | |
task_token, | |
run_id, | |
**parameters | |
): | |
"""Request to schedule the next activity.""" | |
activity = activities.pop() | |
activity_id = f"{run_id}-{activity['name']}" | |
response = { | |
"taskToken": task_token, | |
"decisions": [{ | |
"decisionType": "ScheduleActivityTask", | |
"scheduleActivityTaskDecisionAttributes": { | |
"activityType": activity, | |
"activityId": activity_id | |
}, | |
**parameters | |
}] | |
} | |
client.respond_decision_task_completed(**response) | |
cls.logger.info("Scheduled:%s", activity_id) | |
@classmethod | |
def fail_workflow(cls, client, task_token, attributes): | |
"""Request to fail the workflow.""" | |
response = { | |
"taskToken": task_token, | |
"decisions": [{ | |
"decisionType": "FailWorkflowExecution", | |
"failWorkflowExecutionDecisionAttributes": { | |
"reason": attributes["reason"], | |
"details": attributes["details"] | |
} | |
}] | |
} | |
client.respond_decision_task_completed(**response) | |
cls.logger.error("Failed:%s", attributes) | |
@classmethod | |
def complete_workflow(cls, client, task_token, **parameters): | |
"""Request to complete the workflow.""" | |
response = { | |
"taskToken": task_token, | |
"completeWorkflowExecutionDecisionAttributes": { | |
**parameters | |
} | |
} | |
client.respond_decision_task_completed(**response) | |
cls.logger.info("Done") | |
def __init__( | |
self, | |
domain, | |
workflow, | |
version="v2", | |
identity=str(uuid.uuid1()), | |
activities=None, | |
task_list="sample", | |
client=None | |
): | |
"""Init.""" | |
if client: | |
self.client = client | |
self.identity = identity | |
self.task_list = task_list | |
self.activities = activities or self.default_activities | |
self.domain = register_swf_domain( | |
client=self.client, | |
name=domain, | |
description="Sample domain", | |
workflowExecutionRetentionPeriodInDays="10" | |
) | |
self.logger.info("Domain:%s", self.domain) | |
self.workflow = register_swf_workflow( | |
client=self.client, | |
domain=domain, | |
name=workflow, | |
version=version, | |
description="Sample workflow", | |
defaultChildPolicy="TERMINATE", | |
defaultTaskStartToCloseTimeout="3600", | |
defaultExecutionStartToCloseTimeout="86400", | |
defaultTaskList={"name": self.task_list} | |
) | |
self.logger.info("Workflow:%s", self.workflow) | |
def start(self): | |
"""Start workflow execution.""" | |
parameters = { | |
"domain": self.domain, | |
"workflowId": self.identity, | |
"workflowType": self.workflow | |
} | |
response = self.client.start_workflow_execution(**parameters) | |
run_id = response["runId"] | |
self.logger.info("Start:%s", run_id) | |
return run_id | |
def poll(self, run_id): | |
"""Poll for decisions.""" | |
paginator = self.client.get_paginator("poll_for_decision_task") | |
parameters = { | |
"domain": self.domain, | |
"taskList": {"name": self.task_list}, | |
"identity": self.identity | |
} | |
activities = [activity for activity in reversed(self.activities)] | |
for response in paginator.paginate(**parameters): | |
task_token = response["taskToken"] | |
for event in response["events"]: | |
event_type = event["eventType"] | |
self.logger.info("Event:%s", event_type) | |
if event_type == "WorkflowExecutionStarted": | |
self.schedule_next( | |
self.client, | |
activities, | |
task_token, | |
run_id | |
) | |
elif event_type == "ActivityTaskCompleted": | |
parameters["input"] = event.get("result") | |
self.schedule_next( | |
self.client, | |
activities, | |
task_token, | |
run_id, | |
**parameters | |
) | |
elif event_type == "ActivityTaskTimedOut": | |
attributes = event["activityTaskTimedOutEventAttributes"] | |
self.fail_workflow(self.client, task_token, attributes) | |
elif event_type == "ActivityTaskFailed": | |
attributes = event["activityTaskFailedEventAttributes"] | |
self.fail_workflow(self.client, task_token, attributes) | |
elif event_type == "WorkflowExecutionCompleted": | |
parameters["result"] = event.get("result") | |
self.complete_workflow( | |
self.client, | |
task_token, | |
**parameters | |
) | |
if __name__ == "__main__": | |
workflow = SampleWorkflow("jimtest", "sample") | |
run_id = workflow.start() | |
workflow.poll(run_id) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""AWS SWF domains stuff.""" | |
import argparse | |
import boto3 | |
import botocore | |
def get_swf_domains(client=boto3.client("swf"), **parameters): | |
"""Get AWS SWF domain names.""" | |
paginator = client.get_paginator("list_domains") | |
for response in paginator.paginate(**parameters): | |
for domain in response["domainInfos"]: | |
yield domain["name"] | |
def get_swf_workflows(client=boto3.client("swf"), **parameters): | |
"""Get AWS SWF workflows.""" | |
paginator = client.get_paginator("list_workflow_types") | |
for response in paginator.paginate(**parameters): | |
for workflow in response["typeInfos"]: | |
yield workflow["workflowType"] | |
def get_swf_activities(client=boto3.client("swf"), **parameters): | |
"""Get AWS SWF workflows.""" | |
paginator = client.get_paginator("list_activity_types") | |
for response in paginator.paginate(**parameters): | |
for activity in response["typeInfos"]: | |
yield activity["activityType"] | |
def register_swf_domain(client=boto3.client("swf"), **parameters): | |
"""Register AWS SWF domain. Return the domain name when it exists.""" | |
try: | |
client.register_domain(**parameters) | |
except botocore.exceptions.ClientError as e: | |
if e.response["Error"]["Code"] == "DomainAlreadyExistsFault": | |
pass | |
else: | |
raise | |
return parameters["name"] | |
def register_swf_workflow(client=boto3.client("swf"), **parameters): | |
"""Register AWS SWF workflow. Return the workflow id when it exists.""" | |
try: | |
client.register_workflow_type(**parameters) | |
except botocore.exceptions.ClientError as e: | |
if e.response["Error"]["Code"] == "TypeAlreadyExistsFault": | |
pass | |
else: | |
raise | |
return {"name": parameters["name"], "version": parameters["version"]} | |
def register_swf_activity(client=boto3.client("swf"), **parameters): | |
"""Register AWS SWF activity. Return the activity id when it exists.""" | |
try: | |
client.register_activity_type(**parameters) | |
except botocore.exceptions.ClientError as e: | |
if e.response["Error"]["Code"] == "TypeAlreadyExistsFault": | |
pass | |
else: | |
raise | |
return {"name": parameters["name"], "version": parameters["version"]} | |
def list_domains(client=boto3.client("swf"), **arguments): | |
"""Print AWS SWF domain names.""" | |
parameters = { | |
"registrationStatus": "REGISTERED", | |
"PaginationConfig": { | |
"MaxItems": arguments["max_items"], | |
"PageSize": arguments["page_size"] | |
} | |
} | |
for domain_name in get_swf_domains(client, **parameters): | |
print(domain_name) | |
def register_domain(client=boto3.client("swf"), **arguments): | |
"""Register an AWS SWF domain name.""" | |
parameters = { | |
"name": arguments["name"], | |
"description": arguments["description"], | |
"workflowExecutionRetentionPeriodInDays": arguments["retention_period"] | |
} | |
register_swf_domain(client, **parameters) | |
def list_workflows(client=boto3.client("swf"), **arguments): | |
"""Print AWS SWF workflows.""" | |
parameters = { | |
"domain": arguments["domain"], | |
"registrationStatus": arguments['registration_status'], | |
"PaginationConfig": { | |
"MaxItems": arguments["max_items"], | |
"PageSize": arguments["page_size"] | |
} | |
} | |
for workflow in get_swf_workflows(client, **parameters): | |
print(f"{workflow['name']}:{workflow['version']}") | |
def register_workflow(client=boto3.client("swf"), **arguments): | |
"""Register an AWS SWF workflow.""" | |
parameters = { | |
"domain": arguments["domain"], | |
"name": arguments["name"], | |
"version": arguments["version"], | |
"description": arguments["description"] | |
} | |
register_swf_workflow(client, **parameters) | |
def list_activities(client=boto3.client("swf"), **arguments): | |
"""Print AWS SWF activities.""" | |
parameters = { | |
"domain": arguments["domain"], | |
"registrationStatus": arguments['registration_status'], | |
"PaginationConfig": { | |
"MaxItems": arguments["max_items"], | |
"PageSize": arguments["page_size"] | |
} | |
} | |
for activity in get_swf_activities(client, **parameters): | |
print(activity) | |
def register_activity(client=boto3.client("swf"), **arguments): | |
"""Register an AWS SWF activity.""" | |
parameters = { | |
"domain": arguments["domain"], | |
"name": arguments["name"], | |
"version": arguments["version"], | |
"description": arguments["description"] | |
} | |
register_swf_activity(client, **parameters) | |
def parse_paginator_arguments(parser): | |
"""Add the paginator argument(s) to the argument parser.""" | |
parser.add_argument( | |
"--max-items", | |
help="Paginator max items", | |
type=int, | |
default=None | |
) | |
parser.add_argument( | |
"--page-size", | |
help="Paginator page size", | |
type=int, | |
default=None | |
) | |
def parse_default_arguments(parser): | |
"""Add the default argument(s) to the argument parser.""" | |
parser.add_argument( | |
"--region-name", | |
help='AWS region name', | |
default=None | |
) | |
def parse_list_domains(subparsers): | |
"""Add the list-domain command.""" | |
parser = subparsers.add_parser('list-domains') | |
parse_default_arguments(parser) | |
parse_paginator_arguments(parser) | |
parser.set_defaults(func=list_domains) | |
def parse_register_domain(subparsers): | |
"""Add the register-domain command.""" | |
parser = subparsers.add_parser('register-domain') | |
parser.set_defaults(func=register_domain) | |
parse_default_arguments(parser) | |
parser.add_argument( | |
"name", | |
help='AWS SWF domain name' | |
) | |
parser.add_argument( | |
"--description", | |
help='AWS SWF domain description', | |
default="" | |
) | |
parser.add_argument( | |
"--retention-period", | |
help='AWS SWF workflow execution retention period in days', | |
default="NONE" | |
) | |
def parse_list_workflows(subparsers): | |
"""Add the list-workflows command.""" | |
parser = subparsers.add_parser('list-workflows') | |
parse_default_arguments(parser) | |
parse_paginator_arguments(parser) | |
parser.add_argument( | |
"domain", | |
help='AWS SWF domain name' | |
) | |
parser.add_argument( | |
"--registration-status", | |
help='AWS SWF domain name', | |
choices=["REGISTERED", "DEPRECATED"], | |
default="REGISTERED" | |
) | |
parser.set_defaults(func=list_workflows) | |
def parse_register_workflow(subparsers): | |
"""Add the register-workflow command.""" | |
parser = subparsers.add_parser('register-workflow') | |
parser.set_defaults(func=register_workflow) | |
parse_default_arguments(parser) | |
parser.add_argument( | |
"domain", | |
help='AWS SWF domain name' | |
) | |
parser.add_argument( | |
"name", | |
help='AWS SWF workflow name' | |
) | |
parser.add_argument( | |
"version", | |
help='AWS SWF workflow version' | |
) | |
parser.add_argument( | |
"--description", | |
help='AWS SWF domain description', | |
default="" | |
) | |
def parse_list_activities(subparsers): | |
"""Add the list-workflows command.""" | |
parser = subparsers.add_parser('list-activities') | |
parse_default_arguments(parser) | |
parse_paginator_arguments(parser) | |
parser.add_argument( | |
"domain", | |
help='AWS SWF domain name' | |
) | |
parser.add_argument( | |
"--registration-status", | |
help='AWS SWF domain name', | |
choices=["REGISTERED", "DEPRECATED"], | |
default="REGISTERED" | |
) | |
parser.set_defaults(func=list_activities) | |
def parse_register_activity(subparsers): | |
"""Add the register-activity command.""" | |
parser = subparsers.add_parser('register-activity') | |
parser.set_defaults(func=register_activity) | |
parse_default_arguments(parser) | |
parser.add_argument( | |
"domain", | |
help='AWS SWF domain name' | |
) | |
parser.add_argument( | |
"name", | |
help='AWS SWF activity name' | |
) | |
parser.add_argument( | |
"version", | |
help='AWS SWF activity version' | |
) | |
parser.add_argument( | |
"--description", | |
help='AWS SWF domain description', | |
default="" | |
) | |
def parse_args(): | |
"""Parse input arguments.""" | |
parser = argparse.ArgumentParser() | |
subparsers = parser.add_subparsers() | |
subparsers.required = True | |
parse_list_domains(subparsers) | |
parse_register_domain(subparsers) | |
parse_list_workflows(subparsers) | |
parse_register_workflow(subparsers) | |
parse_list_activities(subparsers) | |
parse_register_activity(subparsers) | |
return vars(parser.parse_args()) | |
if __name__ == "__main__": | |
arguments = parse_args() | |
function = arguments.pop("func") | |
client = boto3.client("swf", region_name=arguments.pop("region_name")) | |
function(client, **arguments) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment