Last active
October 21, 2021 14:07
-
-
Save paulonteri/3f4aa9981f56e509c0cf2cd11ff77a74 to your computer and use it in GitHub Desktop.
Google Cloud Tasks in Django
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import json | |
import logging | |
import pickle | |
import sys | |
import timeit | |
from time import sleep | |
from django.conf import settings | |
from django.core.serializers.json import DjangoJSONEncoder | |
from django.http import HttpResponse | |
from django.utils.decorators import method_decorator | |
from django.views.decorators.csrf import csrf_exempt | |
from django.views.generic import View | |
from google.cloud import tasks_v2 | |
logger = logging.getLogger(__name__) | |
class CronUtils(View):
    """Endpoint that runs background tasks, plus a helper that enqueues them.

    ``create_cloud_task`` builds a Google Cloud Tasks HTTP task whose URL
    points back at this view (the route named ``cron_utils``). When that
    request arrives, ``dispatch`` imports the dotted callable named in the
    ``task`` query parameter and calls it.

    SECURITY NOTE(review): this view is CSRF-exempt, imports and calls
    whatever dotted path the request names, and falls back to
    ``pickle.loads`` on the request body. Nothing in this class
    authenticates the caller, so the route must be protected elsewhere
    (for example by verifying the Cloud Tasks OIDC token or restricting
    ingress) -- confirm how this is deployed.
    """

    @method_decorator(csrf_exempt)
    def dispatch(self, request, *args, **kwargs):
        """Run the requested task, optionally guarded by a cache lock.

        Query parameters read here:
            cache_lock_id: when present, the task only runs if this cache
                key can be added; the key is deleted once the task ends.
            cache_lock_timeout: lifetime of the lock in seconds (default 0).

        Returns:
            An ``HttpResponse`` with body ``ok``, including when the lock
            was already held and the task was therefore skipped.

        NOTE(review): Django documents a cache timeout of 0 as "expire
        immediately", so a request carrying ``cache_lock_id`` without
        ``cache_lock_timeout`` presumably gets no effective lock -- confirm.
        """
        logger.debug("received task request: %s", request)
        cache_lock_id = request.GET.get('cache_lock_id', None)
        cache_lock_timeout = int(request.GET.get('cache_lock_timeout', 0))
        if cache_lock_id:
            from django.core.cache import cache
            # cache.add() only stores the key when it is absent, which is
            # what lets it double as a "try to acquire the lock" call.
            if cache.add(cache_lock_id, "true", cache_lock_timeout):
                try:
                    self.run_task(request)
                finally:
                    cache.delete(cache_lock_id)
        else:
            self.run_task(request)
        logger.debug('task_ok')
        return HttpResponse('ok')

    @staticmethod
    def import_method_from_name(function_string):
        """Import and return the attribute named by a dotted path.

        Args:
            function_string: ``"package.module.attr"``; everything before
                the last dot is imported as a module.

        Raises:
            ValueError: if ``function_string`` contains no dot.
            ImportError / AttributeError: if the module or attribute is
                missing.
        """
        import importlib
        mod_name, func_name = function_string.rsplit('.', 1)
        mod = importlib.import_module(mod_name)
        return getattr(mod, func_name)

    @staticmethod
    def run_dotted_task(dotted_task, comma_separated_args, task_kwargs):
        """Import ``dotted_task`` and call it with the decoded arguments.

        Args:
            dotted_task: dotted path of the callable to run.
            comma_separated_args: positional arguments joined with commas,
                or a falsy value for none. Pieces made only of decimal
                digits are converted to ``int``; all others stay ``str``.
            task_kwargs: mapping passed as keyword arguments.

        Returns:
            Whatever the task callable returns.
        """
        logger.debug("run_dotted_task")
        # SECURITY: the dotted path comes straight from the query string.
        task_handler = CronUtils.import_method_from_name(dotted_task)
        if not comma_separated_args:
            return task_handler(**task_kwargs)
        # isdecimal() rather than isdigit(): isdigit() also accepts
        # characters such as superscript digits, which int() rejects.
        args_list = [int(piece) if piece.isdecimal() else str(piece)
                     for piece in comma_separated_args.split(',')]
        return task_handler(*args_list, **task_kwargs)

    def run_task(self, request):
        """Decode the task description from ``request`` and execute it.

        Reads ``task``, ``args``, ``include_task_retry_count`` and
        ``task_repeat`` from the query string and keyword arguments from
        the request body (JSON first, pickle as a fallback). Does nothing
        beyond logging when no ``task`` parameter is present.
        """
        logger.debug("run_task")
        task = request.GET.get('task', None)
        task_args = request.GET.get('args', None)

        # The request body, when present, carries the task kwargs.
        task_kwargs = {}
        if request.body:
            try:
                # Load via JSON first
                task_kwargs = json.loads(request.body.decode('utf-8'))
            except ValueError:
                # SECURITY: unpickling a request body executes arbitrary
                # code if the sender is not trusted. Kept for payloads
                # create_cloud_task could not JSON-encode; safe only if
                # this endpoint is reachable solely by Cloud Tasks.
                task_kwargs = pickle.loads(request.body)
        logger.debug("Received task : %s args : %s kwargs : %s",
                     task, task_args, task_kwargs)

        if request.GET.get('include_task_retry_count', None):
            # The App Engine header is read first, as before. Tasks built by
            # create_cloud_task are HTTP-target tasks, for which Cloud Tasks
            # documents the X-CloudTasks-* variant, so fall back to that.
            task_kwargs['task_retry_count'] = request.META.get(
                'HTTP_X_APPENGINE_TASKEXECUTIONCOUNT',
                request.META.get('HTTP_X_CLOUDTASKS_TASKEXECUTIONCOUNT', 0))

        if task is None:
            logger.warning("Task request without a 'task' parameter; nothing to run")
            return

        # Gae cron doesn't allow us to run with less than 1 minute intervals.
        # So we cheat by adding a task_repeat parameter to simulate running
        # multiple times each interval.
        task_repeat = request.GET.get('task_repeat', None)
        if task_repeat:
            task_repeat = int(task_repeat) if task_repeat.isdecimal() else 1

        if not task_repeat or task_repeat <= 1:
            self.run_dotted_task(task, task_args, task_kwargs)
            return

        # Spread the runs evenly over one minute.
        loop_max_seconds = 60 / task_repeat
        for _ in range(task_repeat):
            loop_start_time = timeit.default_timer()
            self.run_dotted_task(task, task_args, task_kwargs)
            loop_elapsed_seconds = timeit.default_timer() - loop_start_time
            if loop_elapsed_seconds < loop_max_seconds:
                # Still got time left over: sleep for the rest of the slot.
                sleep(loop_max_seconds - loop_elapsed_seconds)
            else:
                sleep(0.1)

    # TODO: Separate into separate util
    @staticmethod
    def get_base_url():
        """Return ``settings.BASE_URL`` joined with the ``cron_utils`` route."""
        from django.urls import reverse
        return f"{settings.BASE_URL}{reverse('cron_utils')}"

    @staticmethod
    def _in_development():
        """Return True when started by a test runner or a dev server.

        Inspects ``sys.argv`` and tolerates processes started without a
        sub-command, where ``sys.argv`` has fewer than two entries.
        """
        argv = sys.argv
        script = argv[0] if argv else ''
        command = argv[1] if len(argv) > 1 else ''
        return ('test' in script
                or command in ('runserver', 'runserver_plus')
                or 'test' in command)

    @staticmethod
    def _run_task_locally(cloud_task):
        """Feed ``cloud_task`` straight into this view via RequestFactory."""
        from django.test import RequestFactory
        http_request = cloud_task['http_request']
        method = http_request['http_method']
        url = http_request['url']
        factory = RequestFactory()
        if method == tasks_v2.HttpMethod.GET:
            request = factory.get(url)
        elif method == tasks_v2.HttpMethod.POST:
            # An explicit POST without kwargs has no 'body' entry.
            request = factory.post(url,
                                   data=http_request.get('body', b''),
                                   content_type='application/octet-stream')
        else:
            raise ValueError(
                'Unsupported http_method for local task execution: {!r}'.format(method))
        return CronUtils.as_view()(request)

    @staticmethod
    def create_cloud_task(task, task_args=None, task_kwargs=None,
                          project=settings.GOOGLE_CLOUD_PROJECT_ID,
                          queue=settings.DEFAULT_GOOGLE_CLOUD_TASK_QUEUE,
                          location=settings.DEFAULT_GOOGLE_CLOUD_TASK_QUEUE_LOCATION,
                          in_seconds=None, http_method=tasks_v2.HttpMethod.GET, retry=False,
                          cache_lock_id=None, cache_lock_timeout=900,
                          include_task_retry_count=False, task_name=None,
                          target_domain=None):
        """Enqueue ``task`` on Google Cloud Tasks, or run it inline in development.

        Args:
            task: module-level callable; sent as ``module.name`` and
                re-imported by the view when the task executes.
            task_args: simple positional argument(s) sent in the query
                string. A list or tuple is sent element-wise; any other
                value is treated as a single argument.
            task_kwargs: mapping sent in the request body, JSON-encoded
                when possible and pickled otherwise. Forces POST.
            project, queue, location: identify the Cloud Tasks queue.
            in_seconds: delay before the task should run, or None.
            http_method: ``tasks_v2.HttpMethod`` used for the request.
            retry: accepted for compatibility; currently unused (the switch
                to a dedicated retry queue is disabled).
            cache_lock_id: cache key used as a lock; when the key is
                already set no task is created.
            cache_lock_timeout: lock lifetime in seconds, forwarded to the
                view.
            include_task_retry_count: ask the view to pass
                ``task_retry_count`` to the task.
            task_name: optional explicit Cloud Tasks task name.
            target_domain: accepted for compatibility; currently unused.

        Returns:
            ``-1`` when the cache lock is already held; the view's
            ``HttpResponse`` when run inline in development; otherwise the
            response returned by ``CloudTasksClient.create_task``.
        """
        from urllib.parse import urlencode

        base_url = CronUtils.get_base_url()
        dotted_task = task.__module__ + "." + task.__name__
        query = [('task', dotted_task)]

        # Only create this task if there is no cache lock (prevents creation
        # of many useless http requests).
        if cache_lock_id is not None:
            from django.core.cache import cache
            if cache.get(cache_lock_id):
                return -1  # Return -1 for testing
            # The lock itself is set by the view when the task is run.
            query.append(('cache_lock_id', cache_lock_id))
            query.append(('cache_lock_timeout', cache_lock_timeout))

        # Simple positional args travel as a comma-separated GET variable.
        if task_args:
            if not isinstance(task_args, (list, tuple)):
                task_args = [task_args]
            query.append(('args', ','.join(str(arg) for arg in task_args)))

        if include_task_retry_count:
            query.append(('include_task_retry_count', 1))

        # urlencode escapes '&', '#', '=' and spaces inside values; commas
        # are kept literal so the URL for plain arguments is unchanged.
        url = '{}?{}'.format(base_url, urlencode(query, safe=','))

        cloud_task = {
            'http_request': {
                'http_method': http_method,
                'url': url,
            }
        }
        if task_name is not None:
            cloud_task["name"] = task_name

        # Complex args go into the request body.
        if task_kwargs:
            if http_method == tasks_v2.HttpMethod.GET:
                cloud_task['http_request']['http_method'] = tasks_v2.HttpMethod.POST
                logger.info(
                    'Cannot use "GET" http_method with task_kwargs. Switching to POST.')
            try:
                # The API expects a payload of type bytes.
                body = json.dumps(task_kwargs, cls=DjangoJSONEncoder).encode()
            except TypeError:
                # Not JSON-serializable: the view unpickles as a fallback.
                body = pickle.dumps(task_kwargs)
            cloud_task['http_request']['body'] = body

        # Run the task immediately when in development.
        if CronUtils._in_development():
            logger.debug("running task in development")
            return CronUtils._run_task_locally(cloud_task)

        logger.debug("creating task")
        if in_seconds is not None:
            # Convert "seconds from now" into a protobuf Timestamp.
            scheduled_for = datetime.datetime.utcnow() + datetime.timedelta(seconds=in_seconds)
            from google.protobuf import timestamp_pb2
            timestamp = timestamp_pb2.Timestamp()
            timestamp.FromDatetime(scheduled_for)
            cloud_task['schedule_time'] = timestamp  # noqa

        # Inside Google-hosted environments (Compute Engine, App Engine,
        # Cloud Run, Cloud Functions) authentication should "just work".
        # https://googleapis.dev/python/google-api-core/latest/auth.html#overview
        client = tasks_v2.CloudTasksClient()
        logger.debug("task=%s project=%s location=%s queue=%s",
                     cloud_task, project, location, queue)
        parent = client.queue_path(project, location, queue)
        response = client.create_task(parent=parent, task=cloud_task)
        logger.info('Created task %s', response.name)
        return response
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from shulesuite.apps.shared.utils.cron_utils import CronUtils | |
# Route that Cloud Tasks calls back into; the name 'cron_utils' is what
# CronUtils.get_base_url() reverses.
urlpatterns = [
    path('cron/', CronUtils.as_view(), name='cron_utils'),
]

# Append the URL patterns for serving files under MEDIA_URL.
urlpatterns += static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from .models import ContactUs, TempEmail, RequestDemo | |
# example async task
def send_contact_us_confirmation_mail_task(id: int):
    """Fetch the ContactUs row with primary key ``id`` and call its
    ``send_confirmation_email()``.

    The parameter keeps the name ``id`` because callers enqueue it by
    keyword (``task_kwargs={"id": ...}``).
    """
    ContactUs.objects.get(pk=id).send_confirmation_email()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from .tasks import send_contact_us_confirmation_mail_task | |
class ContactUsApi(generics.CreateAPIView):
    """Create a ContactUs entry and queue its confirmation e-mail."""

    queryset = ContactUs.objects.all()
    serializer_class = ContactUsSerializer
    permission_classes = [AllowAny]

    def post(self, request, *args, **kwargs):
        """Validate and save the submission, enqueue the mail task, and
        respond with the serialized entry."""
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        serializer.save()

        saved = serializer.data
        # send confirmation mail
        CronUtils.create_cloud_task(
            task=send_contact_us_confirmation_mail_task,
            task_kwargs={"id": saved['id']},
        )
        return Response(saved)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment