# argo-workflow-design-patterns
# From https://github.com/argoproj/argo-workflows/blob/master/examples/expression-destructure-json.yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: expression-destructure-json-
  annotations:
    workflows.argoproj.io/version: ">= 3.1.0"
spec:
  arguments:
    parameters:
      - name: config
        value: '{"a": "1", "b": "2", "c": "3"}'
  entrypoint: main
  templates:
    - name: main
      inputs:
        parameters:
          - name: a
            value: "{{=jsonpath(workflow.parameters.config, '$.a')}}"
          - name: b
            value: "{{=jsonpath(workflow.parameters.config, '$.b')}}"
          - name: c
            value: "{{=jsonpath(workflow.parameters.config, '$.c')}}"
      script:
        env:
          - name: A
            value: "{{inputs.parameters.a}}"
          - name: B
            value: "{{inputs.parameters.b}}"
          - name: C
            value: "{{inputs.parameters.c}}"
        image: debian:9.4
        command: [bash]
        source: |
          echo "$A$B$C"
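---
# A small companion sketch (not part of the original example): the same {{= }} expression
# syntax also reaches nested fields, e.g. pulling "$.db.host" out of a nested JSON
# parameter. The config shape and env var names here are illustrative only.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: expression-destructure-nested-
  annotations:
    workflows.argoproj.io/version: ">= 3.1.0"
spec:
  arguments:
    parameters:
      - name: config
        value: '{"db": {"host": "a.dev.foo.bar", "name": "main"}}'
  entrypoint: main
  templates:
    - name: main
      inputs:
        parameters:
          - name: db-host
            value: "{{=jsonpath(workflow.parameters.config, '$.db.host')}}"
          - name: db-name
            value: "{{=jsonpath(workflow.parameters.config, '$.db.name')}}"
      script:
        env:
          - name: DB_HOST
            value: "{{inputs.parameters.db-host}}"
          - name: DB_NAME
            value: "{{inputs.parameters.db-name}}"
        image: debian:9.4
        command: [bash]
        source: |
          echo "$DB_HOST/$DB_NAME"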
---
# This WorkflowTemplate enumerates all of our databases based on the environment
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: get-dbs
spec:
  templates:
    - name: get-dbs
      inputs:
        parameters:
          - name: environment
            default: "dev"
            enum: ["dev", "prod"]
      outputs:
        parameters:
          - name: result
            valueFrom:
              path: /tmp/result.json
      script:
        image: "python:3.12-alpine"
        command:
          - python
        source: |
          # Depending on the environment, return an array of databases
          import json
          import sys
          if "{{inputs.parameters.environment}}" == "dev":
              dbs = ["a.dev.foo.bar", "b.dev.foo.bar", "c.dev.foo.bar"]
          elif "{{inputs.parameters.environment}}" == "prod":
              dbs = ["a.prod.foo.bar", "b.prod.foo.bar", "c.prod.foo.bar"]
          else:
              print("Invalid environment")
              sys.exit(1)
          with open("/tmp/result.json", "w") as f:
              json.dump(dbs, f)
---
# This WorkflowTemplate uses the get-dbs template, and then spins up a child workflow for each using
# the Workflow of Workflows with Semaphore pattern
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: db-auditing
spec:
  templates:
    - name: get-row-count-for-all-dbs
      inputs:
        parameters:
          - name: environment
            enum: ["dev", "prod"]
      dag:
        tasks:
          # Get a list of all the dbs
          - name: get-dbs
            templateRef:
              name: get-dbs
              template: get-dbs
            arguments:
              parameters:
                - name: environment
                  value: "{{inputs.parameters.environment}}"
          # Get the row count for each table in each db
          - name: get-row-count
            depends: get-dbs
            template: get-table-row-count-as-child-wfs
            arguments:
              parameters:
                - name: db-host
                  value: "{{item}}"
                - name: db-name
                  value: "main"
                - name: table-name
                  value: "fruits"
            withParam: "{{tasks.get-dbs.outputs.parameters.result}}"
    # Template that uses the Workflow of Workflows pattern to run a child workflow for each item in the list
    - name: get-table-row-count-as-child-wfs
      inputs:
        parameters:
          - name: db-host
          - name: db-name
          - name: table-name
      synchronization:
        semaphore:
          configMapKeyRef:
            name: wow-semaphore
            key: wow
      resource:
        action: create
        manifest: |
          apiVersion: argoproj.io/v1alpha1
          kind: Workflow
          metadata:
            generateName: get-row-count-{{inputs.parameters.db-host}}-{{inputs.parameters.db-name}}-{{inputs.parameters.table-name}}-
          spec:
            entrypoint: get-row-count-and-log
            arguments:
              parameters:
                - name: db-host
                  value: "{{inputs.parameters.db-host}}"
                - name: db-name
                  value: "{{inputs.parameters.db-name}}"
                - name: table-name
                  value: "{{inputs.parameters.table-name}}"
            workflowTemplateRef:
              name: db-auditing
        successCondition: status.phase == Succeeded
        failureCondition: status.phase in (Failed, Error)
    # Template to execute in the child workflow
    - name: get-row-count-and-log
      inputs:
        parameters:
          - name: db-host
          - name: db-name
          - name: table-name
      dag:
        tasks:
          - name: get-row-count
            template: get-table-row-count
            arguments:
              parameters:
                - name: db-host
                  value: "{{inputs.parameters.db-host}}"
                - name: db-name
                  value: "{{inputs.parameters.db-name}}"
                - name: table-name
                  value: "{{inputs.parameters.table-name}}"
          - name: send-log-event
            depends: get-row-count
            template: send-log-event
            arguments:
              parameters:
                - name: message
                  value: "Row count for {{inputs.parameters.db-host}} - {{inputs.parameters.db-name}} - {{inputs.parameters.table-name}} is {{tasks.get-row-count.outputs.parameters.rows}}"
    - name: send-log-event
      inputs:
        parameters:
          - name: message
      script:
        image: "python:3.12-alpine"
        command:
          - python
        source: |
          print("Sending log event: " + "{{inputs.parameters.message}}")
    - name: get-table-row-count
      inputs:
        parameters:
          - name: db-host
          - name: db-name
          - name: table-name
      outputs:
        parameters:
          - name: rows
            valueFrom:
              path: /tmp/result.txt
      script:
        image: "python:3.12-alpine"
        command:
          - python
        source: |
          # Generate a random row count
          import random
          randomInt = random.randint(100, 10000)
          file = open("/tmp/result.txt", "w")
          file.write(str(randomInt))
          file.close()
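---
# A minimal sketch of kicking the pattern off: a throwaway Workflow that runs the
# get-row-count-for-all-dbs entrypoint of the db-auditing WorkflowTemplate above. It
# assumes the get-dbs and db-auditing WorkflowTemplates are installed in the namespace,
# along with the wow-semaphore ConfigMap defined further down in this file.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: db-auditing-run-
spec:
  entrypoint: get-row-count-for-all-dbs
  arguments:
    parameters:
      - name: environment
        value: "dev"
  workflowTemplateRef:
    name: db-auditing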
---
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: get-dbs
spec:
  templates:
    - name: get-dbs
      inputs:
        parameters:
          - name: environment
            default: "dev"
            enum: ["dev", "prod"]
      outputs:
        parameters:
          - name: result
            valueFrom:
              path: /tmp/result.json
      script:
        image: "python:3.12-alpine"
        command:
          - python
        source: |
          # Depending on the environment, return an array of databases
          import json
          import sys
          if "{{inputs.parameters.environment}}" == "dev":
              dbs = ["a.dev.foo.bar", "b.dev.foo.bar", "c.dev.foo.bar"]
          elif "{{inputs.parameters.environment}}" == "prod":
              dbs = ["a.prod.foo.bar", "b.prod.foo.bar", "c.prod.foo.bar"]
          else:
              print("Invalid environment")
              sys.exit(1)
          with open("/tmp/result.json", "w") as f:
              json.dump(dbs, f)
    - name: for-each-db
      inputs:
        parameters:
          - name: environment
            default: "dev"
            enum: ["dev", "prod"]
          - name: workflow_template_ref
          - name: entrypoint
          - name: semaphore_configmap_name
          - name: semaphore_configmap_key
      dag:
        tasks:
          - name: get-dbs
            template: get-dbs
            arguments:
              parameters:
                - name: environment
                  value: "{{inputs.parameters.environment}}"
          - name: for-each-db
            depends: get-dbs
            template: zz-internal-child-workflow
            arguments:
              parameters:
                - name: db-host
                  value: "{{item}}"
                - name: environment
                  value: "{{inputs.parameters.environment}}"
                - name: workflow_template_ref
                  value: "{{inputs.parameters.workflow_template_ref}}"
                - name: entrypoint
                  value: "{{inputs.parameters.entrypoint}}"
                - name: semaphore_configmap_name
                  value: "{{inputs.parameters.semaphore_configmap_name}}"
                - name: semaphore_configmap_key
                  value: "{{inputs.parameters.semaphore_configmap_key}}"
            withParam: "{{tasks.get-dbs.outputs.parameters.result}}"
    - name: zz-internal-child-workflow
      inputs:
        parameters:
          - name: db-host
          - name: environment
          - name: workflow_template_ref
          - name: entrypoint
          - name: semaphore_configmap_name
          - name: semaphore_configmap_key
      resource:
        action: create
        manifest: |
          apiVersion: argoproj.io/v1alpha1
          kind: Workflow
          metadata:
            # Tidy up the names to make them safe for generating child workflow names
            generateName: child-{{inputs.parameters.environment}}--{{inputs.parameters.db-host}}-
          spec:
            entrypoint: "{{inputs.parameters.entrypoint}}"
            arguments:
              parameters:
                - name: db-host
                  value: "{{inputs.parameters.db-host}}"
                - name: environment
                  value: "{{inputs.parameters.environment}}"
            workflowTemplateRef:
              name: "{{inputs.parameters.workflow_template_ref}}"
        successCondition: status.phase == Succeeded
        failureCondition: status.phase in (Failed, Error)
      synchronization:
        semaphore:
          configMapKeyRef:
            name: "{{inputs.parameters.semaphore_configmap_name}}"
            key: "{{inputs.parameters.semaphore_configmap_key}}"
---
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: db-auditing
spec:
  templates:
    - name: main
      inputs:
        parameters:
          - name: environment
            default: "dev"
            enum: ["dev", "prod"]
      steps:
        - - name: for-each-db
            templateRef:
              name: get-dbs
              template: for-each-db
            arguments:
              parameters:
                - name: environment
                  value: "{{inputs.parameters.environment}}"
                - name: workflow_template_ref
                  value: "db-auditing" # This workflow template
                - name: entrypoint
                  value: "get-row-count-and-log"
                - name: semaphore_configmap_name
                  value: wow-semaphore
                - name: semaphore_configmap_key
                  value: wow
    - name: get-row-count-and-log
      inputs:
        parameters:
          - name: db-host
          - name: environment
          - name: db-name
            value: "main"
          - name: table-name
            value: "fruits"
      steps:
        - - name: get-row-count
            template: get-table-row-count
            arguments:
              parameters:
                - name: db-host
                  value: "{{inputs.parameters.db-host}}"
                - name: db-name
                  value: "{{inputs.parameters.db-name}}"
                - name: table-name
                  value: "{{inputs.parameters.table-name}}"
        - - name: send-log
            template: send-log-event
            arguments:
              parameters:
                - name: message
                  value: "Row count for {{inputs.parameters.db-host}} - {{inputs.parameters.db-name}} - {{inputs.parameters.table-name}} is {{steps.get-row-count.outputs.parameters.rows}}"
    - name: send-log-event
      inputs:
        parameters:
          - name: message
      script:
        image: "python:3.12-alpine"
        command:
          - python
        source: |
          print("Sending log event: " + "{{inputs.parameters.message}}")
    - name: get-table-row-count
      inputs:
        parameters:
          - name: db-host
          - name: db-name
          - name: table-name
      outputs:
        parameters:
          - name: rows
            valueFrom:
              path: /tmp/result.txt
      script:
        image: "python:3.12-alpine"
        command:
          - python
        source: |
          # Generate a random row count
          import random
          randomInt = random.randint(100, 10000)
          file = open("/tmp/result.txt", "w")
          file.write(str(randomInt))
          file.close()
---
# This ConfigMap is used by the workflow to limit concurrent execution
apiVersion: v1
kind: ConfigMap
metadata:
  name: wow-semaphore
data:
  wow: "1"
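---
# The value of the semaphore key is the number of workflows allowed to hold it at the same
# time, so "1" above serialises the child workflows completely. A sketch of the same
# semaphore relaxed to allow three children to run concurrently; apply this instead of
# (not in addition to) the ConfigMap above.
apiVersion: v1
kind: ConfigMap
metadata:
  name: wow-semaphore
data:
  wow: "3"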
---
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: workflow-of-workflows-
spec:
  entrypoint: main
  arguments:
    parameters:
      - name: json-list
        value: '["a", "b", "c"]'
  templates:
    - name: main
      steps:
        - - name: process-item
            template: process-json-item
            arguments:
              parameters:
                - name: item
                  value: "{{item}}"
            withParam: "{{workflow.parameters.json-list}}"
    - name: process-json-item
      inputs:
        parameters:
          - name: item
      # Usage of the ConfigMap specified above
      synchronization:
        semaphore:
          configMapKeyRef:
            name: wow-semaphore
            key: wow
      resource:
        action: create
        # This submits a child workflow, which will run as its own independent workflow in Argo.
        manifest: |
          apiVersion: argoproj.io/v1alpha1
          kind: Workflow
          metadata:
            generateName: process-item-
          spec:
            entrypoint: whalesay
            templates:
              - name: whalesay
                container:
                  image: docker/whalesay
                  command: [cowsay]
                  args: ["{{inputs.parameters.item}}"]
        successCondition: status.phase == Succeeded
        failureCondition: status.phase in (Failed, Error)
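---
# A sketch of one refinement to the pattern above (not part of the original gist): child
# workflows created via a resource template are independent objects, so deleting the parent
# does not clean them up by default. The resource template's setOwnerReference field makes
# the parent workflow own each child, so they are garbage collected together. This variant
# is identical to the workflow above apart from that one field and its (hypothetical) name.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: workflow-of-workflows-owned-
spec:
  entrypoint: main
  arguments:
    parameters:
      - name: json-list
        value: '["a", "b", "c"]'
  templates:
    - name: main
      steps:
        - - name: process-item
            template: process-json-item
            arguments:
              parameters:
                - name: item
                  value: "{{item}}"
            withParam: "{{workflow.parameters.json-list}}"
    - name: process-json-item
      inputs:
        parameters:
          - name: item
      synchronization:
        semaphore:
          configMapKeyRef:
            name: wow-semaphore
            key: wow
      resource:
        action: create
        # Make the parent workflow the owner of the child it creates
        setOwnerReference: true
        manifest: |
          apiVersion: argoproj.io/v1alpha1
          kind: Workflow
          metadata:
            generateName: process-item-
          spec:
            entrypoint: whalesay
            templates:
              - name: whalesay
                container:
                  image: docker/whalesay
                  command: [cowsay]
                  args: ["{{inputs.parameters.item}}"]
        successCondition: status.phase == Succeeded
        failureCondition: status.phase in (Failed, Error)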
---
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: job-dep-example
spec:
  entrypoint: main
  templates:
    - name: main
      steps:
        - - name: run-job
            template: run-job
    # This template runs a theoretical job which produces a JSON output
    - name: run-job
      outputs:
        parameters:
          - name: result
            valueFrom:
              path: /tmp/result.json
      script:
        image: "python:3.12-alpine"
        command:
          - python
        source: |
          # Assume we can't control this JSON blob
          jsonResult = """{
            "name": "Job 345",
            "status": "Failed",
            "failure_reason": "Failed to connect to database",
            "duration_seconds": 346,
            "uri": "https://example.com/job/356",
            "started_at": "2021-05-11T01:00:00Z",
            "finished_at": "2021-05-11T05:00:00Z",
            "triggered_by": "user@example.com"
          }"""
          file = open("/tmp/result.json", "w")
          file.write(jsonResult)
          file.close()
---
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: job-dep-example
spec:
  entrypoint: main
  templates:
    - name: main
      steps:
        - - name: run-job
            template: run-job
    # This is our `run-job` from before; we have just changed its name to make it clear
    # that it is internal to the workflow and should not be used directly.
    - name: zz-internal-run-job
      outputs:
        parameters:
          - name: result
            valueFrom:
              path: /tmp/result.json
      script:
        image: "python:3.12-alpine"
        command:
          - python
        source: |
          # Assume we can't control this JSON blob
          jsonResult = """{
            "name": "Job 345",
            "status": "Failed",
            "failure_reason": "Failed to connect to database",
            "duration_seconds": 346,
            "uri": "https://example.com/job/356",
            "started_at": "2021-05-11T01:00:00Z",
            "finished_at": "2021-05-11T05:00:00Z",
            "triggered_by": "user@example.com"
          }"""
          file = open("/tmp/result.json", "w")
          file.write(jsonResult)
          file.close()
    # This is our new `run-job` template, which uses the internal `zz-internal-run-job`.
    # It is here that we create the facade that we want to expose as the interface to our users.
    #
    # Teams can now use this template, and get the result of the job as outputs.
    - name: run-job
      steps:
        - - name: run-job
            template: zz-internal-run-job
      outputs:
        parameters:
          - name: name
            valueFrom:
              expression: "jsonpath(steps['run-job'].outputs.parameters.result, '$.name')"
          - name: status
            valueFrom:
              expression: "jsonpath(steps['run-job'].outputs.parameters.result, '$.status')"
          - name: failure_reason
            valueFrom:
              expression: "jsonpath(steps['run-job'].outputs.parameters.result, '$.failure_reason')"
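---
# A minimal sketch of how a team could consume the facade. It assumes the run-job and
# zz-internal-run-job templates above are published in a WorkflowTemplate (hypothetical
# name: job-runner) so they can be referenced from other workflows via templateRef; the
# report-status template and the message format are illustrative only.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: job-dep-consumer-
spec:
  entrypoint: main
  templates:
    - name: main
      steps:
        - - name: run-job
            templateRef:
              name: job-runner # hypothetical WorkflowTemplate holding the facade
              template: run-job
        - - name: report
            template: report-status
            arguments:
              parameters:
                - name: message
                  value: "Job {{steps.run-job.outputs.parameters.name}} finished with status {{steps.run-job.outputs.parameters.status}} ({{steps.run-job.outputs.parameters.failure_reason}})"
    - name: report-status
      inputs:
        parameters:
          - name: message
      script:
        image: "python:3.12-alpine"
        command:
          - python
        source: |
          print("{{inputs.parameters.message}}")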