Last active
March 22, 2023 17:40
-
-
Save emmettbutler/49c1e8d26853b00c6aeb5c87e3aadc2b to your computer and use it in GitHub Desktop.
A minimal example of how to use dd-trace-py v1.9.3 to trace celery.beat and redbeat scheduling functionality
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import time | |
import celery | |
from ddtrace import Pin | |
from ddtrace import config | |
# this should be omitted if running celery under ddtrace-run | |
import ddtrace.bootstrap.sitecustomize # noqa | |
from ddtrace.constants import ANALYTICS_SAMPLE_RATE_KEY | |
from ddtrace.constants import SPAN_KIND | |
from ddtrace.constants import SPAN_MEASURED_KEY | |
from ddtrace.contrib import trace_utils | |
from ddtrace.ext import SpanKind | |
from ddtrace.ext import SpanTypes | |
from ddtrace.vendor import wrapt | |
try: | |
import redbeat | |
except ImportError: | |
redbeat = None | |
log = logging.getLogger() | |
REDIS_URL = "redis://127.0.0.1:{port}".format(port=6379) | |
BROKER_URL = "{redis}/{db}".format(redis=REDIS_URL, db=0) | |
BACKEND_URL = "{redis}/{db}".format(redis=REDIS_URL, db=1) | |
app = celery.Celery(broker=BROKER_URL, backend=BACKEND_URL) | |
app.conf.beat_schedule = {"do-it": {"task": "my_celery_test.mytask", "schedule": 5, "args": (time.time(),)}} | |
# custom instrumentation for celery.beat and redbeat | |
app.conf.broker_connection_max_retries = 3 | |
def _traced_beat_function(integration_config, fn_name, scheduler_module_name="celery.beat", resource_fn=None): | |
def _traced_beat_inner(func, instance, args, kwargs): | |
pin = Pin.get_from(instance) | |
if not pin or not pin.enabled(): | |
return func(*args, **kwargs) | |
with pin.tracer.trace( | |
"{}.{}".format(scheduler_module_name, fn_name), | |
span_type=SpanTypes.WORKER, | |
service=trace_utils.ext_service(pin, integration_config), | |
) as span: | |
if resource_fn: | |
span.resource = resource_fn(args) | |
span.set_tag_str(SPAN_KIND, SpanKind.PRODUCER) | |
rate = config.celery.get_analytics_sample_rate() | |
if rate is not None: | |
span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, rate) | |
span.set_tag(SPAN_MEASURED_KEY) | |
return func(*args, **kwargs) | |
return _traced_beat_inner | |
wrapt.wrap_function_wrapper( | |
"celery.beat", | |
"Scheduler.apply_entry", | |
_traced_beat_function(config.celery, "apply_entry", resource_fn=lambda args: args[0].name), | |
) | |
wrapt.wrap_function_wrapper( | |
"celery.beat", | |
"Scheduler.tick", | |
_traced_beat_function( | |
config.celery, | |
"tick", | |
), | |
) | |
Pin().onto(celery.beat.Scheduler) | |
if redbeat: | |
scheduler_module_name = "redbeat.schedulers" | |
wrapt.wrap_function_wrapper( | |
"redbeat.schedulers", | |
"RedBeatScheduler.maybe_due", | |
_traced_beat_function( | |
config.celery, | |
"maybe_due", | |
scheduler_module_name=scheduler_module_name, | |
resource_fn=lambda args: args[0].name, | |
), | |
) | |
wrapt.wrap_function_wrapper( | |
"redbeat.schedulers", | |
"RedBeatScheduler.tick", | |
_traced_beat_function( | |
config.celery, | |
"tick", | |
scheduler_module_name=scheduler_module_name, | |
), | |
) | |
Pin().onto(redbeat.schedulers.RedBeatScheduler) | |
@app.task | |
def mytask(n): | |
return f"foobar {n}" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment