Last active
July 29, 2022 08:16
-
-
Save surenkov/82994906aaf5204c554bdaca6182657b to your computer and use it in GitHub Desktop.
Celery tasks debouncing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functools | |
import logging | |
import pickle | |
import typing as t | |
from contextlib import contextmanager | |
from random import random | |
from django.conf import settings | |
from celery import Task | |
from celery.canvas import Signature, signature | |
from celery.result import AsyncResult | |
from .redis_broker_db import broker_db | |
__all__ = ("debounce_task", "debounced", "reconstruct_signature")

log = logging.getLogger(__name__)

# Feature flag: when throttling is disabled, debounce_task() returns the given
# signature (made immutable) without any debouncing.
THROTTLING_DISABLED = not settings.ENABLE_CELERY_TASK_THROTTLING

# Positional / keyword argument containers carried between debounced calls.
Args = t.Tuple[t.Any, ...]
Kwargs = t.Dict[str, t.Any]

# ``distinct_by`` selectors: a positional index, a keyword name, a sequence of
# those, or a callable deriving a key fragment from the signature.
DistinctPositionalArgs = int
DistinctKwargs = str
DistinctArgs = t.Union[DistinctPositionalArgs, DistinctKwargs]
CustomDistinctArgs = t.Callable[[Signature], str]
DistinctArgsBy = t.Union[DistinctArgs, t.Sequence[DistinctArgs], CustomDistinctArgs]

# ``behavior`` options: one of the built-in merge strategies, or a callable
# that merges stored (args, kwargs) into a signature.
PredefArgsBehavior = t.Literal["prepend", "replace", "unique"]
CustomArgsBehavior = t.Callable[[Signature, Args, Kwargs], Signature]
ArgsBehavior = t.Union[PredefArgsBehavior, CustomArgsBehavior]
def debounce_task(
    signature: Signature,
    timeout: float = 0.0,
    jitter: float = 0.0,
    maxargs: t.Optional[int] = None,
    behavior: ArgsBehavior = "prepend",
    distinct_by: t.Optional[DistinctArgsBy] = None,
) -> Signature:
    """Debounce a Celery task call, grouping calls in Redis broker storage.

    Calls made for the same task (and the same ``distinct_by`` key) have their
    arguments merged and stored; a delayed helper task later fires the merged call.

    :param signature: Task signature instance. Direct task invocation is not supported.
    :param timeout: Time frame (seconds) during which Celery awaits other
        debounced calls for the task.
    :param jitter: Randomization coefficient for the time frame, preferably in
        the [0..1] range; the effective timeout is
        ``timeout * (1 + jitter * random())``.
    :param maxargs: If the number of merged arguments, positional and keyword
        combined, reaches or exceeds ``maxargs``, the task is scheduled for an
        immediate call. ``None`` (or ``0``) disables the limit.
    :param behavior: Arguments merging behavior: ``"prepend"``, ``"replace"``,
        ``"unique"``, or a callable ``(Signature, args, kwargs) -> Signature``.
    :param distinct_by: Specify how throttling storage should separate
        different signatures of the same task. Could be either
            int (positional arg index)
            str (keyword arg name)
            sequence of [int, str]
            function of (Signature) -> str
        For example ``debounce_task(add.s(1, foo="bar"), distinct_by=["foo"])`` and
        ``debounce_task(add.s(1, foo="baz"), distinct_by=["foo"])``
        will result in two different debounce calls.
    :return: New (immutable) signature of the subsequent delayed task call.
    :rtype: Signature
    :raises ValueError: in case of unknown behavior (checked only while
        throttling is enabled).
    :raises AssertionError: if ``timeout`` or ``jitter`` is negative (checked
        only while throttling is enabled; raised explicitly, so it also holds
        under ``python -O``).

    Example:
        >>> debounce_task(add.s(1), timeout=1).delay()
        >>> ...
        >>> debounce_task(add.s(2), timeout=1).delay()

    Similar to a call of:
        >>> add.s(1, 2).apply_async(countdown=1)
    """
    # Function-scope import; presumably avoids a circular import, since the
    # tasks module imports reconstruct_signature from this module.
    from myapp.tasks import call_debounced_task

    if THROTTLING_DISABLED:
        return signature.set(immutable=True)

    # Explicit raises instead of ``assert``: asserts are stripped under -O,
    # which would silently drop the documented AssertionError contract.
    # ``not x >= 0.0`` (rather than ``x < 0.0``) also rejects NaN, as before.
    if not timeout >= 0.0:
        raise AssertionError("Timeout seconds should be non-negative")
    if not jitter >= 0.0:
        raise AssertionError("Jitter coefficient should be non-negative")

    if jitter > 0.0:
        timeout *= 1 + (jitter * random())

    if behavior == "prepend":
        # Reuse Signature.clone(args, kwargs) as the merge function.
        reduce_args = type(signature).clone
    elif behavior == "replace":
        reduce_args = _reduce_args_replace
    elif behavior == "unique":
        reduce_args = _reduce_args_unique_merge
    elif callable(behavior):
        reduce_args = behavior
    else:
        raise ValueError(f"Unknown behavior: {behavior}")

    signature_key = get_signature_key(signature, distinct_by)
    with task_store(signature_key) as store:
        # Take over whatever arguments earlier calls left for this key.
        _, args, kwargs = pop_task_args(store)
        try:
            signature = reduce_args(signature, args, kwargs)
        except Exception:
            # Best effort: keep only the current call's arguments; the popped
            # ones are discarded.
            log.exception("Error merging arguments, falling back")

        args, kwargs = signature.args, signature.kwargs
        if timeout == 0.0 or (maxargs and len(args) + len(kwargs) >= maxargs):
            # Run now. Any previously scheduled call_debounced_task for this key
            # will find the store empty.
            return signature.set(immutable=True)

        push_task_args(store, args, kwargs)

    # The helper task pops the stored arguments after the countdown and
    # dispatches the real task.
    debounce_sig = call_debounced_task.s(signature.task, signature_key)
    return debounce_sig.set(countdown=timeout, immutable=True)
class _DebouncedCallable(t.Protocol):
    """Static shape of a task wrapped by ``debounced``.

    Calling the object returns the debounced ``Signature`` instead of running
    the task; ``delay``/``apply``/``apply_async`` mirror the Celery task API.
    """

    # The undecorated task object that was wrapped.
    original_task: Task

    def __call__(self, *args, **kwargs) -> Signature:
        ...

    def delay(self, *args, **kwargs) -> AsyncResult:
        ...

    def apply(self, args, kwargs, **options) -> AsyncResult:
        ...

    def apply_async(self, args, kwargs, **options) -> AsyncResult:
        ...
def debounced(**opts):
    """Decorator around the ``debounce_task`` helper function.

    Accepts ``debounce_task`` arguments in keyword form, except the
    ``signature`` param, which is dropped because each call builds its own.

    IMPORTANT NOTE: ``debounced`` transforms the task into a
    ``_DebouncedCallable``. Calling it returns the debounced task signature
    instead of running the task. ``delay``, ``apply`` and ``apply_async``
    mimic the task's own calling interfaces. The wrapped task stays
    reachable as ``original_task``.
    """
    opts.pop("signature", None)

    def debounce_wrapper(task: t.Callable) -> "_DebouncedCallable":
        # functools.wraps runs here, at definition time. It copies the task's
        # metadata *and* its __dict__ onto the wrapper, so it must happen before
        # the attributes below are attached, or it could overwrite them.
        @functools.wraps(task)
        def inner_call(*args, **kwargs) -> "Signature":
            return debounce_task(task.s(*args, **kwargs), **opts)

        def inner_delay(*args, **kwargs):
            return debounce_task(task.s(*args, **kwargs), **opts).delay()

        # args/kwargs default to None, matching Task.apply/apply_async.
        def inner_apply(args=None, kwargs=None, **attrs):
            signature = task.signature(args, kwargs, **attrs)
            return debounce_task(signature, **opts).apply()

        def inner_apply_async(args=None, kwargs=None, **attrs):
            signature = task.signature(args, kwargs, **attrs)
            return debounce_task(signature, **opts).apply_async()

        inner_call.original_task = task
        inner_call.delay = inner_delay
        inner_call.apply = inner_apply
        inner_call.apply_async = inner_apply_async
        return inner_call

    return debounce_wrapper
def reconstruct_signature(task_name: str, signature_key: str) -> t.Optional[Signature]:
    """Consume the arguments stored under ``signature_key`` and rebuild the call.

    :param task_name: Registered name of the task to invoke.
    :param signature_key: Storage key produced by ``get_signature_key``.
    :return: Immutable signature of ``task_name`` with the stored args/kwargs,
        or ``None`` when nothing is stored under the key.
    """
    with task_store(signature_key) as store:
        found, pending_args, pending_kwargs = pop_task_args(store)
    if not found:
        return None
    return signature(task_name, pending_args, pending_kwargs, immutable=True)
def get_signature_key(signature: Signature, distinct_by: DistinctArgsBy = None) -> str:
    """Return the key under which debounced calls of ``signature`` are grouped.

    The format is ``<task name>~<part>:<part>...``, with the parts selected by
    ``distinct_by``. Empty components are dropped, so without ``distinct_by``
    the key is just the task name.
    """
    distinct_suffix = ":".join(_extract_signature_key_parts(signature, distinct_by))
    components = [part for part in (signature.task, distinct_suffix) if part]
    return "~".join(components)
@contextmanager
def task_store(signature_key: str):
    """Yield the broker-side list that holds pending arguments for a key.

    The list lives under the ``myapp:debounce-task:`` namespace and is yielded
    while a ``broker_db.atomic()`` block is open.

    NOTE(review): confirm that walrus routes commands issued through
    ``broker_db.List`` into the ``atomic()`` transaction. If it does not,
    pop/push sequences from concurrent callers are not atomic.
    """
    namespaced_key = f"myapp:debounce-task:{signature_key}"
    with broker_db.atomic():
        pending_list = broker_db.List(namespaced_key)
        yield pending_list
def pop_task_args(task_store) -> "t.Tuple[bool, Args, Kwargs]":
    """Pop the oldest pending ``(args, kwargs)`` entry from ``task_store``.

    :param task_store: List-like store. Its ``popleft()`` is assumed to return
        ``None`` when empty (TODO confirm against walrus ``List``).
    :return: ``(exists, args, kwargs)``. Returns ``(False, (), {})`` when
        nothing is stored or the stored payload cannot be decoded.
    """
    payload = task_store.popleft()
    if payload is None:
        # Empty store: no call is pending for this key.
        return False, (), {}
    try:
        # SECURITY: unpickling executes code chosen by whoever can write this
        # Redis key. This is acceptable only while the broker DB is trusted and
        # written solely by push_task_args().
        args, kwargs = pickle.loads(payload)
    except Exception:
        # Best effort: drop an undecodable entry rather than failing the
        # caller, but log it instead of losing it silently.
        log.exception("Discarding undecodable debounced task arguments")
        return False, (), {}
    return True, args, kwargs
def push_task_args(task_store, args, kwargs):
    """Append one pickled ``(args, kwargs)`` pair to the end of ``task_store``.

    Counterpart of ``pop_task_args``, which reads entries back from the front.
    """
    task_store.append(pickle.dumps((args, kwargs)))
def _reduce_args_unique_merge(sig: Signature, args: Args, kwargs: Kwargs): | |
unique_args = set(sig.args or ()).union(args) | |
merged_kwargs = dict(sig.kwargs or {}, **kwargs) | |
return sig.replace(tuple(unique_args), merged_kwargs) | |
def _reduce_args_replace(signature: Signature, _args: Args, _kwargs: Kwargs):
    """``"replace"`` behavior: ignore stored arguments and keep ``signature``.

    The latest call's arguments win outright; previously pending arguments are
    not merged in.
    """
    del _args, _kwargs  # deliberately unused; required by the reducer call shape
    return signature
def _extract_signature_key_parts(signature, distinct_by) -> t.Iterable[str]: | |
if isinstance(distinct_by, str): | |
yield str(signature.kwargs.get(distinct_by, "")) | |
elif isinstance(distinct_by, int): | |
try: | |
part = signature.args[distinct_by] | |
except IndexError: | |
part = "" | |
yield str(part) | |
elif isinstance(distinct_by, t.Sequence): | |
for key in distinct_by: | |
yield from _extract_signature_key_parts(signature, key) | |
elif callable(distinct_by): | |
yield str(distinct_by(signature)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from django.conf import settings | |
from walrus import Database | |
# walrus ``Database`` (a Redis client) connected to the Celery broker URL from
# Django settings; the debounce helpers store pending-call state through it.
broker_db: Database = Database.from_url(
    settings.CELERY_BROKER_URL,
)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from myapp.celery import app | |
from misc.utils.celery import reconstruct_signature | |
@app.task(serializer="pickle")
def call_debounced_task(task_name: str, signature_key: str):
    """Dispatch the debounced call accumulated under ``signature_key``, if any.

    ``debounce_task`` schedules this task with a countdown, possibly several
    times for the same key. Only a run that still finds stored arguments
    dispatches the real task; otherwise it returns ``None``.
    """
    pending = reconstruct_signature(task_name, signature_key)
    if pending is None:
        return None
    return pending.delay()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment