Skip to content

Instantly share code, notes, and snippets.

@surenkov
Last active July 29, 2022 08:16
Show Gist options
  • Save surenkov/82994906aaf5204c554bdaca6182657b to your computer and use it in GitHub Desktop.
Save surenkov/82994906aaf5204c554bdaca6182657b to your computer and use it in GitHub Desktop.
Celery tasks debouncing
import functools
import logging
import pickle
import typing as t
from contextlib import contextmanager
from random import random
from django.conf import settings
from celery import Task
from celery.canvas import Signature, signature
from celery.result import AsyncResult
from .redis_broker_db import broker_db
__all__ = "debounce_task", "debounced", "reconstruct_signature"

# Global kill switch read from Django settings: when throttling is disabled,
# debounce_task returns the given signature (marked immutable) right away.
THROTTLING_DISABLED = not settings.ENABLE_CELERY_TASK_THROTTLING

# Positional / keyword arguments of a task call, as kept between debounced calls.
Args = t.Tuple[t.Any, ...]
Kwargs = t.Dict[str, t.Any]

# ``distinct_by`` selectors: an int picks a positional argument, a str picks a
# keyword argument, a sequence combines several selectors, and a callable
# computes the key fragment from the whole signature.
DistinctPositionalArgs = int
DistinctKwargs = str
DistinctArgs = t.Union[DistinctPositionalArgs, DistinctKwargs]
CustomDistinctArgs = t.Callable[[Signature], str]
DistinctArgsBy = t.Union[DistinctArgs, t.Sequence[DistinctArgs], CustomDistinctArgs]

# ``behavior`` options for merging pending arguments into a new signature:
# one of the predefined names, or a callable (signature, args, kwargs) -> Signature.
PredefArgsBehavior = t.Literal["prepend", "replace", "unique"]
CustomArgsBehavior = t.Callable[[Signature, Args, Kwargs], Signature]
ArgsBehavior = t.Union[PredefArgsBehavior, CustomArgsBehavior]

log = logging.getLogger(__name__)
def debounce_task(
    signature: Signature,
    timeout: float = 0.0,
    jitter: float = 0.0,
    maxargs: t.Optional[int] = None,
    behavior: ArgsBehavior = "prepend",
    distinct_by: t.Optional[DistinctArgsBy] = None,
) -> Signature:
    """Debounce a Celery task call, using Redis broker storage to group calls.

    Pending arguments are kept in a broker-side list keyed by the task name
    (plus the ``distinct_by`` fragments). Each call pops the pending arguments,
    merges them into ``signature`` according to ``behavior`` and pushes the
    merged arguments back. The returned signature runs ``call_debounced_task``
    after ``timeout`` seconds, and that task pops and dispatches whatever is
    pending at that moment.

    :param signature: Task signature instance. Direct task invocation is not supported.
    :param timeout: Time frame (in seconds) during which Celery awaits other
        debounced calls for the task. ``0`` returns the merged signature for
        immediate call.
    :param jitter: Randomization coefficient for the time frame, preferably in
        the [0..1] range: the timeout is multiplied by ``1 + jitter * random()``.
    :param maxargs: If the number of merged arguments, positional and keyword
        together, reaches ``maxargs``, the task is returned for immediate call.
        ``None`` (or ``0``) disables the limit.
    :param behavior: Arguments merging behavior:
        ``"prepend"`` - pending args are prepended via ``Signature.clone``;
        ``"replace"`` - pending args are discarded, the newest call wins;
        ``"unique"`` - args are merged without duplicates;
        or a callable ``(signature, args, kwargs) -> Signature``.
        If merging raises, the error is logged, the pending arguments are
        dropped and ``signature`` is used as-is.
    :param distinct_by: Specify how throttling behavior should store
        different signatures of the same task. Could be either
            int (positional arg)
            str (keyword arg)
            sequence of [int, str]
            function of (Signature) -> str
        For example ``debounce_task(add.s(1, foo="bar"), distinct_by=["foo"])`` and
        ``debounce_task(add.s(1, foo="baz"), distinct_by=["foo"])``
        will result in two different debounce calls.
    :return: New, immutable signature of the subsequent (possibly delayed)
        task call. When ``settings.ENABLE_CELERY_TASK_THROTTLING`` is off,
        ``signature`` itself is returned (marked immutable) without debouncing.
    :rtype: Signature
    :raises ValueError: in case of unknown behavior.
    :raises AssertionError: in case of a negative (or NaN) ``timeout``/``jitter``.

    Example:
        >>> debounce_task(add.s(1), timeout=1).delay()
        >>> ...
        >>> debounce_task(add.s(2), timeout=1).delay()

    Similar to a call of:
        >>> add.s(1, 2).apply_async(countdown=1)
    """
    # Deferred import: myapp.tasks imports reconstruct_signature from this
    # module, so a module-level import would presumably be circular.
    from myapp.tasks import call_debounced_task

    if THROTTLING_DISABLED:
        return signature.set(immutable=True)

    # Explicit raises instead of ``assert`` so validation survives ``python -O``;
    # ``not (x >= 0)`` also rejects NaN exactly like the former asserts did.
    # AssertionError is kept because it is the documented exception type.
    if not timeout >= 0.0:
        raise AssertionError("Timeout seconds should be non-negative")
    if not jitter >= 0.0:
        raise AssertionError("Jitter coefficient should be non-negative")

    if jitter > 0.0:
        timeout *= 1 + (jitter * random())

    if behavior == "prepend":
        # Signature.clone(sig, args, kwargs) prepends ``args`` (Celery partials).
        reduce_args = type(signature).clone
    elif behavior == "replace":
        reduce_args = _reduce_args_replace
    elif behavior == "unique":
        reduce_args = _reduce_args_unique_merge
    elif callable(behavior):
        reduce_args = behavior
    else:
        raise ValueError(f"Unknown behavior: {behavior}")

    signature_key = get_signature_key(signature, distinct_by)
    with task_store(signature_key) as store:
        _, args, kwargs = pop_task_args(store)
        try:
            signature = reduce_args(signature, args, kwargs)
        except Exception:
            # Best effort: keep the new call as-is; the popped pending
            # arguments are dropped (the traceback is logged).
            log.exception("Error merging arguments, falling back")

        args, kwargs = signature.args, signature.kwargs
        if timeout == 0.0 or (maxargs and len(args) + len(kwargs) >= maxargs):
            # Dispatch now: the pending entry was already popped above, so a
            # previously scheduled call_debounced_task will find nothing.
            return signature.set(immutable=True)

        push_task_args(store, args, kwargs)

    debounce_sig = call_debounced_task.s(signature.task, signature_key)
    return debounce_sig.set(countdown=timeout, immutable=True)
class _DebouncedCallable(t.Protocol):
    """Structural type of a task wrapped by the ``debounced`` decorator.

    Calling it returns a debounced ``Signature`` instead of running the task;
    ``delay``, ``apply`` and ``apply_async`` build a signature from the given
    arguments, pass it through ``debounce_task`` and dispatch the result.
    """

    # The undecorated Celery task.
    original_task: Task

    def __call__(self, *args, **kwargs) -> Signature: ...

    def delay(self, *args, **kwargs) -> AsyncResult: ...

    # NOTE(review): the ``apply`` path dispatches via ``Signature.apply()``,
    # which presumably returns an EagerResult (an AsyncResult subclass).
    def apply(self, args, kwargs, **options) -> AsyncResult: ...

    def apply_async(self, args, kwargs, **options) -> AsyncResult: ...
def debounced(**opts):
    """Decorator form of the ``debounce_task`` helper function.

    Accepts ``debounce_task`` arguments in keyword form, except for the
    ``signature`` param (it is silently dropped if given).

    IMPORTANT NOTE: ``debounced`` transforms the task into ``_DebouncedCallable``.
    Calling it returns the debounced task signature instead of running the
    task, and ``delay``/``apply``/``apply_async`` mimic the task's calling
    interface: they build a signature, debounce it and dispatch the result.
    The undecorated task stays available as ``original_task``.
    """
    opts.pop("signature", None)

    def debounce_wrapper(task: t.Callable) -> _DebouncedCallable:
        def inner_call(*args, **kwargs) -> Signature:
            return debounce_task(task.s(*args, **kwargs), **opts)

        def inner_delay(*args, **kwargs):
            return debounce_task(task.s(*args, **kwargs), **opts).delay()

        # ``args``/``kwargs`` default to None like Task.apply/apply_async;
        # Celery's signature() treats None as "no arguments".
        def inner_apply(args=None, kwargs=None, **attrs):
            sig = task.signature(args, kwargs, **attrs)
            return debounce_task(sig, **opts).apply()

        def inner_apply_async(args=None, kwargs=None, **attrs):
            sig = task.signature(args, kwargs, **attrs)
            return debounce_task(sig, **opts).apply_async()

        # Copy the task's metadata (including its __dict__) FIRST, so it
        # cannot overwrite the calling-interface attributes attached below.
        wrapper = functools.update_wrapper(inner_call, task)
        wrapper.original_task = task
        wrapper.delay = inner_delay
        wrapper.apply = inner_apply
        wrapper.apply_async = inner_apply_async
        return wrapper

    return debounce_wrapper
def reconstruct_signature(task_name: str, signature_key: str) -> t.Optional[Signature]:
    """Pop the pending arguments stored under ``signature_key`` and rebuild an
    immutable signature of ``task_name`` from them.

    Returns ``None`` when nothing is pending, e.g. when another call already
    popped the entry from the store.
    """
    with task_store(signature_key) as pending_store:
        found, pending_args, pending_kwargs = pop_task_args(pending_store)
        if not found:
            return None
        return signature(task_name, pending_args, pending_kwargs, immutable=True)
def get_signature_key(signature: Signature, distinct_by: DistinctArgsBy = None) -> str:
    """Build the key that groups debounced calls of ``signature``.

    The key is the task name, followed by ``~`` and the ``:``-joined
    ``distinct_by`` fragments; empty components are left out entirely.
    """
    distinct_suffix = ":".join(_extract_signature_key_parts(signature, distinct_by))
    return "~".join(part for part in (signature.task, distinct_suffix) if part)
@contextmanager
def task_store(signature_key: str):
    """Yield the broker-side list that holds pending arguments for ``signature_key``.

    The list lives under the ``myapp:debounce-task:`` key prefix. It is
    yielded from inside ``broker_db.atomic()``, so the caller's ``with`` body
    runs within that atomic block.
    """
    redis_key = f"myapp:debounce-task:{signature_key}"
    with broker_db.atomic():
        pending_list = broker_db.List(redis_key)
        yield pending_list
def pop_task_args(task_store) -> t.Tuple[bool, Args, Kwargs]:
    """Pop the oldest pickled ``(args, kwargs)`` entry from ``task_store``.

    Returns ``(True, args, kwargs)`` when an entry was unpickled, otherwise
    ``(False, (), {})``. A ``TypeError`` is the "nothing pending" signal:
    presumably ``popleft`` returns ``None`` on an empty list, which
    ``pickle.loads`` rejects with ``TypeError``.
    """
    try:
        found_args, found_kwargs = pickle.loads(task_store.popleft())
    except TypeError:
        return False, (), {}
    # NOTE(review): entries are unpickled, so only trusted code (push_task_args)
    # must be able to write to this broker key.
    return True, found_args, found_kwargs
def push_task_args(task_store, args, kwargs):
    """Append ``(args, kwargs)`` to ``task_store`` as a single pickled entry."""
    task_store.append(pickle.dumps((args, kwargs)))
def _reduce_args_unique_merge(sig: Signature, args: Args, kwargs: Kwargs):
    """"unique" behavior: merge pending arguments into ``sig`` without duplicates.

    Positional arguments are de-duplicated while keeping first-seen order:
    the pending ``args`` come first, then ``sig.args``, matching the "prepend"
    behavior. A plain ``set`` would yield hash order, which for strings varies
    from process to process. Positional arguments must be hashable.
    Keyword arguments are merged, with pending ``kwargs`` overriding
    ``sig.kwargs``, as ``Signature.clone`` does for "prepend".

    :return: A new signature built with ``Signature.replace``.
    """
    # dict.fromkeys is an insertion-ordered set with the same equality rules.
    unique_args = tuple(dict.fromkeys((*args, *(sig.args or ()))))
    merged_kwargs = dict(sig.kwargs or {}, **kwargs)
    return sig.replace(unique_args, merged_kwargs)
def _reduce_args_replace(signature: Signature, _args: Args, _kwargs: Kwargs):
    """"replace" behavior: the newest signature wins unchanged.

    Pending ``_args``/``_kwargs`` from earlier debounced calls are ignored.
    """
    return signature
def _extract_signature_key_parts(signature, distinct_by) -> t.Iterable[str]:
    """Yield the key fragments of ``signature`` selected by ``distinct_by``.

    * ``str``    - the keyword argument of that name (``""`` when absent);
    * ``int``    - the positional argument at that index (``""`` when out of range);
    * sequence   - the fragments of every item, in order (recursively);
    * callable   - ``str()`` of its result for ``signature``;
    * anything else (e.g. ``None``) yields nothing.
    """
    if isinstance(distinct_by, str):
        # Must be checked before the sequence case: a str is itself a Sequence.
        yield str(signature.kwargs.get(distinct_by, ""))
        return

    if isinstance(distinct_by, int):
        try:
            value = signature.args[distinct_by]
        except IndexError:
            value = ""
        yield str(value)
        return

    if isinstance(distinct_by, t.Sequence):
        for selector in distinct_by:
            yield from _extract_signature_key_parts(signature, selector)
        return

    if callable(distinct_by):
        yield str(distinct_by(signature))
from django.conf import settings
from walrus import Database
# Walrus client for the Celery broker's Redis instance (built from
# settings.CELERY_BROKER_URL); the debounce helpers keep pending task
# arguments in lists on it. Assumes the configured broker is Redis.
broker_db: Database = Database.from_url(settings.CELERY_BROKER_URL)
from myapp.celery import app
from misc.utils.celery import reconstruct_signature
@app.task(serializer="pickle")
def call_debounced_task(task_name: str, signature_key: str):
    """Dispatch the arguments pending under ``signature_key`` for ``task_name``.

    Scheduled with a countdown by ``debounce_task``. Returns the result of
    ``.delay()`` on the rebuilt signature, or ``None`` when nothing was pending.
    """
    pending_sig = reconstruct_signature(task_name, signature_key)
    if pending_sig is None:
        return None
    return pending_sig.delay()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment