This guide centralizes concepts needed to run a "robust" production data platform using Dagster Cloud, where robust means assets and infrastructure are:
- Fault Tolerant via replication, resource constraints, retries, parallelization, and run queues and priorities
- Observable via customizable logs and useful alerts
This guide does not cover every aspect of a production data platform. Other useful resources include:
- Testing and CI/CD to ensure new code does what is expected without breaking existing assets
- Project Structure to build a code base that can scale across teams and dependencies [Todo: Link to guide]
- Data Expectations to ensure the data flowing through your pipelines is valid and meets your expectations [Todo: refresh guide for assets, add section on conditional behavior]
- Least Privilege to ensure access control is appropriately scoped across teams [Todo: Link to guide]
Where settings are applied
- Global defaults
- Deployment settings
- Agent settings
- Project (code location) settings
- Tags
- Job tags
- Asset reconciliation with tags
- Tag overrides
- Asset or op settings
- Setting precedence
- Global defaults
Fault tolerant retries
- Global default: run retries
- Job tags: run retries
- Asset or op settings: retry policies
- Reconciliation behavior
Fault tolerant resource limits
- ECS
- Kubernetes
Fault tolerant run queues and priorities TODO
Fault tolerant parallelism TODO
- Runs with concurrency limits
- Runs using backfills
- Ops with run co-ordinators
- Ops using dynamic outputs
- Within op custom code
Observable logs TODO
- Log forwarding
- Log customization
Observable alerts
- Global defaults
- Global defaults and tags
- Alerting on SLA violations
- Custom alerts
- Dagster infrastructure health
Custom code
Where settings are applied
Dagster Cloud makes it easy to build a robust system by default while giving individual components the flexibility to customize their behavior.
Global defaults
Many settings, such as run retries or alerts, can be set globally for an entire Dagster Cloud deployment. These global defaults are applied in a variety of locations.
Deployment settings
Deployment settings are set through the Dagster Cloud web interface:
- Select the deployment dropdown and click on the settings icon
- Modify the settings, which are expressed as YAML (a sketch follows this list)
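For reference, a minimal sketch of what that YAML can contain, using illustrative values rather than recommended defaults:
run_queue:
  max_concurrent_runs: 10      # how many runs may execute at once across the deployment
run_retries:
  max_retries: 3               # global default number of run retries
run_monitoring:
  start_timeout_seconds: 1200  # fail runs whose infrastructure never starts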
Agent settings
Settings that are specific to the agent are set in the tool used to manage the agent:
- Kubernetes agent settings are specified in the Helm chart's values.yaml
- ECS agent settings are specified in the CloudFormation template
Relevant agent settings:
- Kubernetes settings: envSecrets, envVars, and envConfigMaps; volumes and volumeMounts; labels; resources; pythonLogs; computeLogs; agentReplicas (a rough values.yaml sketch follows this list)
- log_group: CloudWatch log forwarding
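As a rough sketch, a few of these Kubernetes settings might appear in the agent's values.yaml as follows. The exact nesting of these keys depends on the chart version, so treat this as illustrative and check the chart's default values.yaml before applying:
# Illustrative only -- confirm key names and nesting against the agent Helm chart
dagsterCloudAgent:
  resources:
    requests:
      cpu: 250m
      memory: 512Mi
  labels:
    team: data-platform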
Code location settings
Settings that are specific to a code location (project) are set through the dagster_cloud.yaml file included with your code.
For example, Kubernetes code location settings:
container_context:
  k8s:
    env_config_maps:
      - my_config_map
    env_secrets:
      - my_secret
    env_vars:
      - FOO_ENV_VAR=foo_value
      - BAR_ENV_VAR
    image_pull_policy: Always
    image_pull_secrets:
      - name: my_image_pull_secret
    labels:
      my_label_key: my_label_value
    namespace: my_k8s_namespace
    service_account_name: my_service_account_name
    volume_mounts:
      - mount_path: /opt/dagster/test_mount_path/volume_mounted_file.yaml
        name: test-volume
        sub_path: volume_mounted_file.yaml
    volumes:
      - name: test-volume
        config_map:
          name: test-volume-configmap
    resources:
      limits:
        cpu: 100m
        memory: 128Mi
      requests:
        cpu: 100m
        memory: 128Mi
For ECS code locations, similar settings are available under container_context.ecs, including env_vars, secrets, and secrets_tags.
Tags
Many Dagster settings are controlled by tags. Tags are set in code, but tag values can be overridden in the Dagster Launchpad.
Tags and jobs
Tags can be applied to jobs created with the @job decorator, with a graph's to_job method, or with the define_asset_job function:
from dagster import AssetSelection, define_asset_job, graph, job, op

# Tags on jobs built from graphs
@op
def my_op():
    pass

@graph
def my_graph():
    my_op()

job_from_graph = my_graph.to_job(
    tags={"alert_team": "all"}
)

# Tags on @job
@job(
    tags={"alert_team": "data_engineers"}
)
def job_from_decorator():
    my_op()

# Tags on asset jobs
asset_job = define_asset_job(
    name="asset_job",
    selection=AssetSelection.all(),
    tags={"dagster/max_retries": 1},
)
Tags and asset reconciliation
Assets managed by an asset reconciliation sensor are updated by runs of an implicit asset job. Run tags can be applied to that implicit job via the run_tags argument to build_asset_reconciliation_sensor:
from dagster import AssetSelection, build_asset_reconciliation_sensor

reconciler = build_asset_reconciliation_sensor(
    name="reconciler",
    asset_selection=AssetSelection.all(),
    run_tags={"dagster/max_retries": 2, "alert_team": "data_engineers"},
)
Tag overrides
Tag values can be overridden at run time through the Dagster CLI or the Dagit UI. For example, if you have a job:
from dagster import AssetSelection, define_asset_job

analytics_job = define_asset_job(
    name="analytics_job",
    selection=AssetSelection.keys(["ANALYTICS", "daily_order_summary"]).upstream(),
    tags={"dagster/max_retries": 1},
)
In the Launchpad, override the tag by selecting "Edit Tags".
Asset or op settings
Some settings are applied on specific assets or ops. For example, a retry policy set directly on an asset:
import pandas as pd
from dagster import Backoff, Jitter, RetryPolicy, asset

@asset(
    required_resource_keys={"data_api"},
    retry_policy=RetryPolicy(
        max_retries=3,
        delay=1,  # seconds between attempts
        backoff=Backoff.LINEAR,
        jitter=Jitter.FULL,
    ),
)
def orders(context) -> pd.DataFrame:
    """A table containing all orders that have been placed."""
    return context.resources.data_api.get_orders()
Setting precedence
- Job tags take precedence over global defaults
- Op or asset settings are generally additive, e.g. an asset retry policy within a run is additive with a global run retry policy (illustrated in the sketch below)
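For example, a hypothetical combination of these settings (the names precedence_example_job and fragile_op are made up for illustration): with a global default of three run retries, the job tag below wins and failed runs of this job are retried at most once, while the op-level RetryPolicy additionally allows the op's step to be retried up to twice within each run.
from dagster import RetryPolicy, job, op

@op(retry_policy=RetryPolicy(max_retries=2))  # additive: step-level retries within a run
def fragile_op():
    ...

@job(tags={"dagster/max_retries": 1})  # overrides the deployment-wide run retry default
def precedence_example_job():
    fragile_op()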
Fault tolerant retries
Dagster makes it easy to retry code, which keeps pipelines robust to flakiness and transient failures, e.g. API calls that occasionally fail. Either an entire run can be retried, or specific op or asset steps within a run can be re-attempted.
Deployment settings can define a default run retry policy that applies to all runs:
run_retries:
  max_retries: 3
  retry_policy: FROM_FAILURE
Individual jobs can specify their own run retry policy using tags. For example, for an asset job:
from dagster import AssetSelection, define_asset_job

analytics_job = define_asset_job(
    name="refresh_analytics_model_job",
    selection=AssetSelection.keys(["ANALYTICS", "daily_order_summary"]).upstream(),
    tags={"dagster/max_retries": 2},
)
A run that fails and then schedules a retry:
The retried run:
Assets or ops can specify retry policies that will cause the asset or op to be re-attempted within the original run.
import pandas as pd
from dagster import Backoff, Jitter, RetryPolicy, asset

@asset(
    required_resource_keys={"data_api"},
    retry_policy=RetryPolicy(
        max_retries=3,
        delay=1,  # seconds between attempts
        backoff=Backoff.LINEAR,
        jitter=Jitter.FULL,
    ),
)
def orders(context) -> pd.DataFrame:
    """A table containing all orders that have been placed."""
    return context.resources.data_api.get_orders()
An asset that is re-attempted within the same run:
By default, an asset reconciliation sensor launches a single run to reconcile stale assets or to meet asset freshness policies. If that run fails, the assets will either remain stale or begin violating their freshness policies.
The reconciliation sensor will respect any default retry policies supplied by deployment settings. You can also add run retry policies to the sensor using run_tags:
from dagster import AssetSelection, build_asset_reconciliation_sensor

reconciler = build_asset_reconciliation_sensor(
    name="reconciler",
    asset_selection=AssetSelection.all(),
    run_tags={"dagster/max_retries": 2},
)
Runs launched by the reconciliation sensor will also respect any retry policies specified by the assets.
The options described above are recommended because they do not require any custom coding and all re-attempts are managed by Dagster. However, it is entirely possible to write your own custom code to handle retries.
Using Dagster, your code can handle exceptions and ask Dagster to re-attempt:
from dagster import RetryRequested, op

@op
def flakes():
    try:
        flakey_operation()  # hypothetical flaky call
    except Exception as e:
        raise RetryRequested(max_retries=3) from e
Without using any Dagster concepts, your code can handle exceptions and manage its own re-attempts:
from dagster import asset

@asset
def flakey():
    max_attempts = 3
    attempts = 0
    while attempts < max_attempts:
        try:
            return flakey_operation()  # hypothetical flaky call
        except Exception:
            attempts = attempts + 1
    raise Exception("flakey_operation did not succeed after retries")
Fault tolerant resource limits
For runs launched by the ECS agent, CPU and memory can be requested with the ecs/cpu and ecs/memory tags on a job:
from dagster import job, op

@op
def my_op(context):
    context.log.info("running")

@job(
    tags={
        "ecs/cpu": "256",
        "ecs/memory": "512",
    }
)
def my_job():
    my_op()
For runs launched by the Kubernetes agent, container resource requests and limits can be set with the dagster-k8s/config tag:
@job(
    tags={
        "dagster-k8s/config": {
            "container_config": {
                "resources": {
                    "requests": {"cpu": "250m", "memory": "64Mi"},
                    "limits": {"cpu": "500m", "memory": "2560Mi"},
                },
            },
        },
    }
)
def my_job():  # reuses my_op from the ECS example above
    my_op()
How Run Queues Impact Parallelism (TODO)
Observable logs
Log forwarding refers to sending Dagster Cloud structured logs or compute logs to other monitoring or storage services, such as AWS CloudWatch or Datadog (structured logs) or S3 or Azure Blob Storage (compute logs). It is configured through the agent settings and code location settings described above (a compute log example follows).
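For example, compute logs can be shipped to S3 using the S3 compute log manager from the dagster-aws package. A sketch, assuming a bucket you own; in the Kubernetes agent this block is supplied through the chart's computeLogs setting, and the exact wrapping depends on the chart version:
compute_logs:
  module: dagster_aws.s3.compute_log_manager
  class: S3ComputeLogManager
  config:
    bucket: my-compute-log-bucket   # assumed bucket name
    prefix: dagster-compute-logs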
Log customization refers to controlling which logs Dagster captures.
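For example, the Python log level and additional Python loggers captured by Dagster can be configured with a python_logs block (surfaced to the Kubernetes agent via the pythonLogs setting listed above). A sketch, where my_library_logger is a placeholder logger name:
python_logs:
  python_log_level: INFO
  managed_python_loggers:
    - my_library_logger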
Observable alerts
Dagster makes it easy to send useful alerts to the right teams at the right times.
In Dagster Cloud, alert policies can be configured to send email or Slack messages on job success or job failure: https://docs.dagster.io/dagster-cloud/account/setting-up-alerts#managing-alert-policies-in-dagster-cloud
Dagster Cloud alert policies can also be specified using tags. For example, in your code you might apply a tag to a job:
from dagster import AssetSelection, define_asset_job

predict_job = define_asset_job(
    "predict_job",
    selection=AssetSelection.keys(["FORECASTING", "predicted_orders"]),
    tags={"alert_team": "ml"},
)
Then when you set up an alert, you can make it specific to the job with that tag. This allows you to send job failures for certain tagged runs to specific teams.
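If you manage alert policies as code and sync them with the dagster-cloud CLI, a tag-scoped policy might look roughly like the sketch below. The field names follow the alert policy schema in the Dagster Cloud docs and the email address is a placeholder; verify against the current schema before syncing:
alert_policies:
  - name: ml-job-failure-alerts
    description: Send ML job run failures to the ML team
    tags:
      - key: alert_team
        value: ml
    event_types:
      - JOB_FAILURE
    notification_service:
      email:
        email_addresses:
          - ml-team@example.com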
A common goal for data teams is to avoid spurious alerts. Often pipeline errors are recoverable and teams only want to get alerts if an SLA is violated. In Dagster these SLAs are represented using freshness policies. Dagster makes it easy to send an alert only when these SLAs are violated. For example, to send an email alert when an asset is violating its SLA by more than 5 minutes:
from dagster import (
    AssetSelection,
    FreshnessPolicySensorContext,
    build_resources,
    freshness_policy_sensor,
)

@freshness_policy_sensor(asset_selection=AssetSelection.all())
def asset_delay_alert_sensor(context: FreshnessPolicySensorContext):
    if context.minutes_late is None or context.previous_minutes_late is None:
        return
    if context.minutes_late >= 5 and context.previous_minutes_late < 5:
        # my_smtp_resource is the hypothetical email resource described in the note below
        with build_resources({"email": my_smtp_resource}) as resources:
            resources.email.send_email_alert(context)
    return
Note: this example assumes a hypothetical email resource with a send_email_alert function. Here is an example using SES.
Documentation: https://docs.dagster.io/concepts/partitions-schedules-sensors/asset-sensors#freshness-policy-sensors-
If your team is not using assets and freshness policies, it is also possible to use sensors to create arbitrary alerts that track events such as a job starting or failing. For examples, see: https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#run-status-sensors
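For example, a run status sensor that fires on any run failure; notify_slack here is a hypothetical helper standing in for whatever alerting channel your team uses:
from dagster import DagsterRunStatus, RunStatusSensorContext, run_status_sensor

@run_status_sensor(run_status=DagsterRunStatus.FAILURE)
def run_failure_alert_sensor(context: RunStatusSensorContext):
    # notify_slack is a hypothetical helper -- replace with your own alerting code
    notify_slack(f"Run {context.dagster_run.run_id} failed")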
Dagster Cloud will automatically send alerts if there are problems with the infrastructure, such as an unhealthy Dagster agent. Dagster Cloud is also able to recover automatically from infrastructure hiccups such as a temporarily down agent.
Custom code
In addition to settings that enable built-in Dagster behavior, you can execute arbitrary code to make your pipelines more robust. For example, you might write an asset that implements retries, alerts, and logging using your own code:
from dagster import asset

@asset
def reinventing_wheels():
    data = None
    attempt = 0
    while data is None and attempt < 10:
        try:
            data = flakey_thing()  # hypothetical flaky operation
        except Exception as e:
            notify_slack(e, attempt)  # hypothetical custom alerting
            attempt = attempt + 1
    if data is None:
        write_fail_log_to_somewhere()  # hypothetical custom logging
        raise Exception("asset failed")
    return data