@thekie
Last active December 3, 2020 10:26
Celery's documentation only covers writing integration tests with pytest, not the good ol' Python unittest framework. So I wrote a CeleryTestCase that can be used as a base class for integration tests that run against a real Celery worker in the background.
# CeleryTestCase.py
from unittest import TestCase

from celery.contrib.testing import worker


class CeleryTestCase(TestCase):
    @property
    def celery_app(self):
        """
        Override this property to provide this test case with the Celery app under test.
        """
        raise NotImplementedError()

    def run(self, result=None):
        from kombu.asynchronous.hub import set_event_loop

        # Clear kombu's global event loop left over from any previous worker.
        set_event_loop(None)
        # start_worker pings the worker on startup; the ping task lives in this module.
        self.celery_app.loader.import_module('celery.contrib.testing.tasks')
        # setUp, the test method and tearDown all run while the worker is alive.
        with worker.start_worker(self.celery_app, loglevel="debug"):
            return super().run(result)
# CodeUnderTest.py
from celery import Celery

celery_app = Celery("CeleryUnderTest", broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')


@celery_app.task
def mul(x, y):
    return x * y
# Test module in the same package
from .CodeUnderTest import celery_app, mul
from .CeleryTestCase import CeleryTestCase


class MyTestCase(CeleryTestCase):
    @property
    def celery_app(self):
        return celery_app

    def test_mul(self):
        self.assertEqual(mul.delay(4, 4).get(timeout=10), 16)
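
The core trick in CeleryTestCase is overriding `TestCase.run` so that each test executes inside a context manager. Here is a minimal sketch of that pattern using only the standard library, so it can be tried without Celery or Redis. `fake_worker`, `WrappedTestCase`, `ExampleTest`, `events` and `run_example` are hypothetical names made up for this illustration; `fake_worker` merely stands in for `worker.start_worker(...)` and does not start anything.

```python
import unittest
from contextlib import contextmanager

events = []  # records the order in which things happen


@contextmanager
def fake_worker():
    """Hypothetical stand-in for worker.start_worker(...); it only logs."""
    events.append("worker started")
    try:
        yield
    finally:
        events.append("worker stopped")


class WrappedTestCase(unittest.TestCase):
    def run(self, result=None):
        # unittest calls run() once per test method, so setUp, the test
        # body and tearDown all execute inside the with block.
        with fake_worker():
            return super().run(result)


class ExampleTest(WrappedTestCase):
    def test_body(self):
        events.append("test body")
        self.assertEqual(2 * 2, 4)


def run_example():
    """Run ExampleTest once and return the unittest result object."""
    events.clear()
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(ExampleTest)
    result = unittest.TestResult()
    suite.run(result)
    return result
```

Calling `run_example()` and then inspecting `events` shows the stand-in worker being started before the test body and stopped after it. Because `run` is invoked per test method, the real CeleryTestCase likewise starts and stops a worker for every single test, which is simple but can get slow for large suites.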
phacic commented Dec 3, 2020

If you are getting django.db.utils.InterfaceError: connection already closed, change class CeleryTestCase(TestCase): to class CeleryTestCase(TransactionTestCase): (with TransactionTestCase imported from django.test).

Worked for me
