Last active
March 28, 2022 23:14
-
-
Save jonashaag/3b56e831839243e5391ff6ddec85dbdf to your computer and use it in GitHub Desktop.
Celery Django model serializer – pass Django model instances as arguments to Celery tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Our convenience Celery Task wrapper that allows us to conveniently pass | |
model instances as Task arguments.

Model instances are "serialized" to a small reference when the task is queued
and "deserialized" back into model instances when the task executes.

The serialized representation of a model instance is
(sentinel, app_label, model_name, pk).
""" | |
import celery | |
from django.apps import apps | |
from django.db import models | |
from kombu.utils.objects import cached_property | |
# Sentinel stored as the first item of a serialized model reference so that
# such references can be told apart from ordinary 4-item sequences.
MODEL_ARGUMENT_MARKER = '__model_instance__'
def wrap_model_argument(arg):
    """Replace a Django model instance with a serializable reference.

    A model instance becomes the 4-tuple
    ``(MODEL_ARGUMENT_MARKER, app_label, model_name, pk)``.  Any value that
    is not a model instance is returned untouched.
    """
    if not isinstance(arg, models.Model):
        return arg
    meta = arg._meta
    return (MODEL_ARGUMENT_MARKER, meta.app_label, meta.model_name, arg.pk)
def unwrap_model_argument(arg):
    """Resolve a serialized model reference back into a model instance.

    A reference is a list or tuple of exactly four items whose first item is
    ``MODEL_ARGUMENT_MARKER``; the remaining items are the app label, the
    model name and the primary key.  Both lists and tuples are accepted
    (presumably because message serializers such as JSON turn tuples into
    lists -- confirm against the configured serializer).

    Any other value is returned unchanged.  Errors raised while looking up
    the model class or fetching the row are not caught here and propagate to
    the caller.
    """
    is_reference = (
        isinstance(arg, (list, tuple))
        and len(arg) == 4
        and arg[0] == MODEL_ARGUMENT_MARKER
    )
    if not is_reference:
        return arg

    _, app_label, model_name, pk = arg
    model = apps.get_model(app_label, model_name)
    # Use the default manager instead of assuming an ``objects`` attribute:
    # models that only declare custom-named managers have no ``objects``.
    return model._default_manager.get(pk=pk)
class ModelArgumentTask(celery.Task):
    """Celery task base class that accepts model instances as arguments.

    When a task is queued (``apply_async``) or turned into a signature
    (``signature``), every top-level positional and keyword argument is passed
    through ``wrap_model_argument``.  When the task body runs (``__call__``),
    the arguments are passed through ``unwrap_model_argument`` first and the
    return value is wrapped again, so a task may also return a model instance.
    Results fetched through ``AsyncResult(...).get()`` are unwrapped.

    Only top-level arguments are converted; model instances nested inside
    lists, dicts or other containers are left as they are.
    """

    @staticmethod
    def _wrap_arguments(args, kwargs):
        """Return ``(args, kwargs)`` with model instances serialized.

        ``None`` is accepted for either value and treated as empty.
        """
        wrapped_args = [wrap_model_argument(arg) for arg in args or ()]
        wrapped_kwargs = {
            key: wrap_model_argument(value)
            for key, value in (kwargs or {}).items()
        }
        return wrapped_args, wrapped_kwargs

    def __call__(self, *args, **kwargs):
        """Run the task with references resolved to model instances.

        The return value is wrapped so that a returned model instance is
        stored as a reference as well.
        """
        call_args = [unwrap_model_argument(arg) for arg in args]
        call_kwargs = {
            key: unwrap_model_argument(value)
            for key, value in kwargs.items()
        }
        return wrap_model_argument(super().__call__(*call_args, **call_kwargs))

    def apply_async(self, args=None, kwargs=None, *apply_args, **apply_kwargs):
        """Queue the task, serializing model instances in args/kwargs."""
        wrapped_args, wrapped_kwargs = self._wrap_arguments(args, kwargs)
        return super().apply_async(
            wrapped_args, wrapped_kwargs, *apply_args, **apply_kwargs)

    def signature(self, args=None, kwargs=None, *starargs, **starkwargs):
        """Build a signature, serializing model instances in args/kwargs.

        ``args`` and ``kwargs`` default to ``None`` (treated as empty) so the
        method can be called the same way as ``apply_async``, e.g. without
        arguments or with ``kwargs`` only.
        """
        wrapped_args, wrapped_kwargs = self._wrap_arguments(args, kwargs)
        return super().signature(
            wrapped_args, wrapped_kwargs, *starargs, **starkwargs)

    def AsyncResult(self, task_id, **kwargs):
        """Return a result object whose ``get()`` unwraps model references."""
        return self.AsyncResult_wrapper(
            task_id, backend=self.backend, task_name=self.name, **kwargs)

    @cached_property
    def AsyncResult_wrapper(self):
        """Result class derived from the app's ``AsyncResult`` class.

        Built lazily and cached because the base class is taken from the app
        returned by ``self._get_app()``.
        """
        class ModelArgumentAsyncResult(self._get_app().AsyncResult):
            def get(self, *args, **kwargs):
                return unwrap_model_argument(super().get(*args, **kwargs))

        return ModelArgumentAsyncResult
def task(*args, **kwargs):
    """Task decorator that forces ``ModelArgumentTask`` as the base class.

    Supports both decorator forms: bare (``@task``), where the decorated
    callable arrives as the single positional argument, and parametrized
    (``@task(option=value)``).  All keyword options are forwarded to
    ``celery.task`` in both forms.

    Raises:
        AssertionError: if a ``base`` keyword is supplied; the base class is
            fixed to ``ModelArgumentTask``.

    NOTE(review): this relies on ``celery.task`` being usable as a decorator
    factory, which depends on the installed Celery version -- confirm.
    """
    # Raised explicitly (instead of ``assert``) so the check also runs under
    # ``python -O``; the exception type is kept for backward compatibility.
    if 'base' in kwargs:
        raise AssertionError(
            "'base' must not be passed; ModelArgumentTask is always used")
    kwargs['base'] = ModelArgumentTask

    if len(args) == 1 and callable(args[0]):
        # Bare decorator usage: keep any keyword options instead of
        # silently dropping them.
        return celery.task(**kwargs)(args[0])
    return celery.task(*args, **kwargs)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This looks great, but I couldn't make it work. Is it incompatible with queuing jobs via `send_task`? It seems like Celery barfs trying to route the task before it ever runs `apply_async`.