Skip to content

Instantly share code, notes, and snippets.

@slopp
Created September 23, 2024 19:31
Show Gist options
  • Save slopp/69d3880d822b0fe312a59c5d650e824b to your computer and use it in GitHub Desktop.
Save slopp/69d3880d822b0fe312a59c5d650e824b to your computer and use it in GitHub Desktop.
Canary Ping Dagster
import os
from dagster import define_asset_job, load_assets_from_package_module, repository, with_resources, op, job, ScheduleDefinition
from my_dagster_project import assets
from datadog_api_client import ApiClient, Configuration
from datadog_api_client.v2.api.metrics_api import MetricsApi
from datadog_api_client.v2.model.metric_intake_type import MetricIntakeType
from datadog_api_client.v2.model.metric_payload import MetricPayload
from datadog_api_client.v2.model.metric_point import MetricPoint
from datetime import datetime
from datadog_api_client.v2.model.metric_series import MetricSeries
@op
def submit_datadog_metric(context):
body = MetricPayload(
series=[
MetricSeries(
metric="elementl-serverless-test:op-execution",
type=MetricIntakeType.COUNT,
points=[
MetricPoint(
timestamp=int(datetime.now().timestamp()),
value=1,
),
],
),
],
)
configuration = Configuration()
with ApiClient(configuration) as api_client:
api_instance = MetricsApi(api_client)
response = api_instance.submit_metrics(body=body)
context.log.info(response)
assert not response.errors
@job
def submit_datadog_metric_job():
submit_datadog_metric()
five_min_schedule = ScheduleDefinition(job=submit_datadog_metric_job, cron_schedule="*/5 * * * *")
@repository
def my_dagster_project():
return [
submit_datadog_metric_job,
five_min_schedule,
load_assets_from_package_module(assets),
define_asset_job(name="all_assets_job"),
]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment