Last active
December 31, 2019 12:23
Quick and dirty Django async background tasks that run after the request is done.
$ uvicorn test.asgi:application
INFO Started server process
INFO Uvicorn running
INFO Application startup complete.
INFO 127.0.0.1:54844 - "GET / HTTP/1.1" 200
Request done at 2019-12-11 19:13:01.500296
Sleepy woke at 2019-12-11 19:13:06.506096
import os

import django
from django.core.handlers.asgi import ASGIHandler

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'test.settings')


class ASGIHandler_(ASGIHandler):
    async def send_response(self, response, *args, **kwargs):
        # Send the complete response to the client first.
        await super().send_response(response, *args, **kwargs)
        # Then run the task attached to the response, if there is one.
        if hasattr(response, "background_task"):
            await response.background_task()


django.setup(set_prefix=False)
application = ASGIHandler_()
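
The ordering the handler relies on (send first, then await the task only if the response carries one) can be tried without Django. The following is a minimal sketch under that assumption; `FakeResponse`, the module-level `send_response`, and `demo` are hypothetical stand-ins and not part of Django or of the gist.

```python
import asyncio


class FakeResponse:
    """Hypothetical stand-in for an HTTP response that may carry a task."""

    def __init__(self, body, task=None):
        self.body = body
        if task is not None:
            self.background_task = task


async def send_response(response, events):
    # Stand-in for the real send: record that the body went out.
    events.append(f"sent: {response.body}")
    # Same check as in the handler: only responses with a task run one.
    if hasattr(response, "background_task"):
        await response.background_task()


async def demo():
    events = []

    async def task():
        await asyncio.sleep(0.01)  # simulate slow work
        events.append("task finished")

    await send_response(FakeResponse("hello", task=task), events)
    await send_response(FakeResponse("no task"), events)
    return events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The task is awaited inside the same coroutine that sent the response, so that coroutine stays alive until the task finishes, while the event loop remains free to serve other requests in the meantime.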
import asyncio
from datetime import datetime

from django.contrib import admin
from django.urls import path
from django.http import HttpResponse


class BackgroundTaskResponse(HttpResponse):
    def __init__(self, *, task, **kwargs):
        # Coroutine function the ASGI handler awaits after sending the response.
        self.background_task = task
        super().__init__(**kwargs)

    def close(self, *args, **kwargs):
        print(f"Request done at {datetime.now()}")
        return super().close(*args, **kwargs)


async def slow():
    await asyncio.sleep(5)
    print(f"Sleepy woke at {datetime.now()}")


def current_datetime(request):
    return BackgroundTaskResponse(
        content=f"<html><body>It is now {datetime.now()}.</body></html>",
        task=slow,
    )
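
Because the handler calls `response.background_task()` with no arguments, a task that needs arguments has to be bound beforehand. One possible approach, sketched here with `functools.partial`, is shown below; `send_email`, `run_like_handler`, and the example address are hypothetical and only illustrate the binding.

```python
import asyncio
from functools import partial


async def send_email(recipient, subject, log):
    # Hypothetical task; a real one would talk to a mail server here.
    await asyncio.sleep(0.01)
    log.append(f"mail to {recipient}: {subject}")


async def run_like_handler(task):
    # Mirrors `await response.background_task()`: called with no arguments.
    await task()


log = []
# Bind the arguments now; calling the partial later returns the coroutine.
bound = partial(send_email, "user@example.com", "Welcome", log)
asyncio.run(run_like_handler(bound))
```

In a view, the bound callable would presumably be passed the same way as `slow`, for example `task=partial(send_email, ...)`.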
Are the admin & path imports in views.py necessary?