Skip to content

Instantly share code, notes, and snippets.

@paulonteri
Last active October 21, 2021 14:07
Show Gist options
  • Save paulonteri/3f4aa9981f56e509c0cf2cd11ff77a74 to your computer and use it in GitHub Desktop.
Google Cloud Tasks in Django
import datetime
import json
import logging
import pickle
import sys
import timeit
from time import sleep
from django.conf import settings
from django.core.serializers.json import DjangoJSONEncoder
from django.http import HttpResponse
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from django.views.generic import View
from google.cloud import tasks_v2
logger = logging.getLogger(__name__)
class CronUtils(View):
    """Receive task requests over HTTP and enqueue them on Google Cloud Tasks.

    The view side (``dispatch`` / ``run_task``) reads a dotted function path
    from the ``task`` query parameter, positional arguments from ``args`` and
    keyword arguments from the request body, then imports and calls that
    function.  The producer side (``create_cloud_task``) builds the matching
    URL/body and submits it to a Cloud Tasks queue.

    SECURITY NOTE(review): this endpoint is CSRF-exempt, imports and calls
    whatever dotted path the query string names, and falls back to
    ``pickle.loads`` on the raw request body.  Anyone who can reach the URL
    can therefore execute arbitrary code.  The request origin should be
    authenticated (e.g. an OIDC token on the task, or network-level
    restriction) before this is exposed publicly.
    """

    @method_decorator(csrf_exempt)
    def dispatch(self, request, *args, **kwargs):
        """Run the requested task, optionally guarded by a cache lock.

        When ``cache_lock_id`` is present in the query string the task only
        runs if ``cache.add`` succeeds for that key (i.e. nobody else holds
        it); the key is deleted once the task finishes.  When the lock is
        already held the task is skipped and ``'ok'`` is still returned.

        Returns:
            ``HttpResponse('ok')`` on success or skip; a 400 response when
            ``cache_lock_timeout`` is not an integer.
        """
        logger.info("received task request")
        logger.debug("%s", request)

        cache_lock_id = request.GET.get('cache_lock_id', None)
        try:
            cache_lock_timeout = int(request.GET.get('cache_lock_timeout', 0))
        except ValueError:
            # Previously surfaced as an unhandled ValueError (HTTP 500).
            return HttpResponse('invalid cache_lock_timeout', status=400)

        if cache_lock_id:
            from django.core.cache import cache
            # cache.add only stores the key if it does not exist yet, so it
            # doubles as the "acquire" step of the lock.
            if cache.add(cache_lock_id, "true", cache_lock_timeout):
                try:
                    self.run_task(request)
                finally:
                    cache.delete(cache_lock_id)
            else:
                logger.info(
                    "cache lock %s already held; skipping task", cache_lock_id)
        else:
            self.run_task(request)

        logger.info('task_ok')
        return HttpResponse('ok')

    @staticmethod
    def import_method_from_name(function_string):
        """Import and return the attribute named by a dotted path.

        Args:
            function_string: ``"package.module.attr"``; everything before the
                last dot is imported as a module.

        Returns:
            The attribute ``attr`` of the imported module.
        """
        import importlib
        mod_name, func_name = function_string.rsplit('.', 1)
        mod = importlib.import_module(mod_name)
        return getattr(mod, func_name)

    @staticmethod
    def run_dotted_task(dotted_task, comma_separated_args, task_kwargs):
        """Import ``dotted_task`` and call it with the decoded arguments.

        Args:
            dotted_task: Dotted path of the callable to run.
            comma_separated_args: Positional arguments joined with ``,`` (or a
                falsy value for none).  All-digit items are converted to
                ``int``; everything else is passed as ``str``.
            task_kwargs: Mapping of keyword arguments for the callable.

        Returns:
            Whatever the callable returns.
        """
        logger.debug("run_dotted_task")
        task_handler = CronUtils.import_method_from_name(dotted_task)
        if not comma_separated_args:
            return task_handler(**task_kwargs)
        # isdecimal() (unlike isdigit()) only accepts characters int() can
        # parse, so e.g. a superscript digit stays a string instead of raising.
        args_list = [int(item) if item.isdecimal() else item
                     for item in comma_separated_args.split(',')]
        return task_handler(*args_list, **task_kwargs)

    def run_task(self, request):
        """Decode the task description from ``request`` and execute it.

        Query parameters read: ``task`` (dotted path), ``args``
        (comma-separated positionals), ``include_task_retry_count`` and
        ``task_repeat``.  The request body, when present, carries the keyword
        arguments as JSON (or, failing that, a pickle).

        With ``task_repeat`` greater than 1 the task is run that many times,
        spaced evenly over roughly 60 seconds.
        """
        logger.debug("run_task")

        task = request.GET.get('task', None)
        task_args = request.GET.get('args', None)

        # The request body carries task_kwargs.
        task_kwargs = {}
        if request.body:
            try:
                task_kwargs = json.loads(request.body.decode('utf-8'))
            except ValueError:
                # SECURITY: unpickling data from an HTTP request allows
                # arbitrary code execution if the caller is not trusted.
                # Kept for compatibility with create_cloud_task's pickle
                # fallback; only safe behind authenticated access.
                task_kwargs = pickle.loads(request.body)

        logger.debug("Received task : %s args : %s kwargs : %s",
                     task, task_args, task_kwargs)

        if request.GET.get('include_task_retry_count', None):
            # HTTP-target tasks (what create_cloud_task builds) carry
            # X-CloudTasks-TaskExecutionCount; the App Engine header is kept
            # as a fallback for App Engine-target tasks.
            meta = request.META
            task_kwargs['task_retry_count'] = meta.get(
                'HTTP_X_CLOUDTASKS_TASKEXECUTIONCOUNT',
                meta.get('HTTP_X_APPENGINE_TASKEXECUTIONCOUNT', 0))

        if task is None:
            return

        # Cron cannot fire more often than once a minute, so task_repeat
        # simulates sub-minute intervals by looping within one request.
        task_repeat = request.GET.get('task_repeat', None)
        if task_repeat:
            task_repeat = int(task_repeat) if task_repeat.isdecimal() else 1

        if task_repeat and task_repeat > 1:
            # Time budget per iteration for equal spacing across a minute.
            loop_max_seconds = 60 / task_repeat
            for _ in range(task_repeat):
                loop_start_time = timeit.default_timer()
                self.run_dotted_task(task, task_args, task_kwargs)
                loop_elapsed_seconds = timeit.default_timer() - loop_start_time
                if loop_elapsed_seconds < loop_max_seconds:
                    # Sleep away the rest of this iteration's budget.
                    sleep(loop_max_seconds - loop_elapsed_seconds)
                else:
                    # Over budget: pause briefly before the next run.
                    sleep(0.1)
        else:
            self.run_dotted_task(task, task_args, task_kwargs)

    # TODO: Separate into separate util
    @staticmethod
    def get_base_url():
        """Return ``settings.BASE_URL`` joined with the ``cron_utils`` route."""
        from django.urls import reverse
        return f"{settings.BASE_URL}{reverse('cron_utils')}"

    @staticmethod
    def _in_development():
        """Return True when started via a test runner or ``runserver``.

        Mirrors the original heuristic on ``sys.argv`` but tolerates a
        missing ``argv[1]`` (or an empty ``argv``), which previously raised
        IndexError.
        """
        argv = sys.argv
        script = argv[0] if argv else ''
        command = argv[1] if len(argv) > 1 else ''
        return ('test' in script
                or command in ('runserver', 'runserver_plus')
                or 'test' in command)

    @staticmethod
    def create_cloud_task(task, task_args=None, task_kwargs=None,
                          project=settings.GOOGLE_CLOUD_PROJECT_ID,
                          queue=settings.DEFAULT_GOOGLE_CLOUD_TASK_QUEUE,
                          location=settings.DEFAULT_GOOGLE_CLOUD_TASK_QUEUE_LOCATION,
                          in_seconds=None, http_method=tasks_v2.HttpMethod.GET, retry=False,
                          cache_lock_id=None, cache_lock_timeout=900,
                          include_task_retry_count=False, task_name=None,
                          target_domain=None):
        """Enqueue ``task`` on Cloud Tasks, or run it inline in development.

        Args:
            task: The callable to run; sent as ``module.name``.
            task_args: A positional argument, or a list/tuple of them.  They
                are sent comma-joined in the query string, so an argument that
                itself contains a comma will be split on the receiving side.
            task_kwargs: Keyword arguments, sent JSON-encoded in the body
                (pickled if not JSON-serialisable).  Forces the method to POST.
            project, queue, location: Identify the target queue.
            in_seconds: Delay before the task is scheduled, in seconds.
            http_method: HTTP method for the task request.
            retry: Currently unused.
            cache_lock_id: When set and that cache key already exists, no
                task is created.  Otherwise the id and timeout are forwarded
                so the lock is taken when the task runs.
            cache_lock_timeout: Timeout forwarded with ``cache_lock_id``.
            include_task_retry_count: Ask the handler to inject
                ``task_retry_count`` into the kwargs.
            task_name: Optional explicit task name placed in the payload.
            target_domain: Currently unused.

        Returns:
            ``-1`` when the cache lock is already held; the
            ``client.create_task`` response in production; the view's
            ``HttpResponse`` when run inline in development.

        Raises:
            ValueError: In development, when the effective HTTP method is
                neither GET nor POST.
        """
        from urllib.parse import urlencode

        # Dotted path the handler will import.
        dotted_task = task.__module__ + "." + task.__name__
        query_params = [('task', dotted_task)]

        # Skip creation entirely while the lock is held (avoids pointless
        # HTTP requests); the lock itself is set when the task runs.
        if cache_lock_id is not None:
            from django.core.cache import cache
            if cache.get(cache_lock_id):
                return -1  # Return -1 for testing
            query_params.append(('cache_lock_id', cache_lock_id))
            query_params.append(('cache_lock_timeout', cache_lock_timeout))

        # Simple positional args travel as a GET variable.
        if task_args:
            if not isinstance(task_args, (list, tuple)):
                task_args = [task_args]
            query_params.append(
                ('args', ','.join(str(arg) for arg in task_args)))

        if include_task_retry_count:
            query_params.append(('include_task_retry_count', 1))

        # urlencode escapes '&', '#', spaces, etc. in the values, which the
        # previous plain string concatenation did not.
        url = '{}?{}'.format(CronUtils.get_base_url(), urlencode(query_params))

        task_payload = {
            'http_request': {
                'http_method': http_method,
                'url': url,
            }
        }
        if task_name is not None:
            task_payload['name'] = task_name

        # Complex arguments go into the request body.
        if task_kwargs:
            if http_method == tasks_v2.HttpMethod.GET:
                task_payload['http_request']['http_method'] = tasks_v2.HttpMethod.POST
                logger.info(
                    'Cannot use "GET" http_method with task_kwargs. Switching to POST.')
            try:
                # The API expects a payload of type bytes.
                body = json.dumps(task_kwargs, cls=DjangoJSONEncoder).encode()
            except TypeError:
                # NOTE: the handler will unpickle this body; see the security
                # note on the class before relying on this fallback.
                body = pickle.dumps(task_kwargs)
            task_payload['http_request']['body'] = body

        if CronUtils._in_development():
            # Run the task immediately through the view instead of enqueueing.
            logger.info("running task in development")
            from django.test import RequestFactory
            factory = RequestFactory()
            effective_method = task_payload['http_request']['http_method']
            if effective_method == tasks_v2.HttpMethod.GET:
                request = factory.get(url)
            elif effective_method == tasks_v2.HttpMethod.POST:
                request = factory.post(
                    url,
                    data=task_payload['http_request'].get('body', b''),
                    content_type='application/octet-stream')
            else:
                raise ValueError(
                    'Unsupported http_method for inline execution: {!r}'.format(
                        effective_method))
            return CronUtils.as_view()(request)

        logger.info("creating task")
        if in_seconds is not None:
            # Convert "seconds from now" into a protobuf Timestamp.
            scheduled_at = datetime.datetime.utcnow() + datetime.timedelta(
                seconds=in_seconds)
            from google.protobuf import timestamp_pb2
            timestamp = timestamp_pb2.Timestamp()
            timestamp.FromDatetime(scheduled_at)
            task_payload['schedule_time'] = timestamp  # noqa

        # Inside Google-hosted environments (Compute Engine, App Engine,
        # Cloud Run, Cloud Functions) authentication should "just work".
        # https://googleapis.dev/python/google-api-core/latest/auth.html#overview
        client = tasks_v2.CloudTasksClient()
        logger.debug("%s %s %s %s", task_payload, project, location, queue)

        # Fully qualified queue name.
        parent = client.queue_path(project, location, queue)
        response = client.create_task(parent=parent, task=task_payload)
        logger.info('Created task %s', response.name)
        return response
from shulesuite.apps.shared.utils.cron_utils import CronUtils
# Route for the task endpoint (reversed by name as 'cron_utils'), followed by
# whatever patterns static() returns for settings.MEDIA_URL.
urlpatterns = [
    path('cron/', CronUtils.as_view(), name='cron_utils'),
    *static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT),
]
from .models import ContactUs, TempEmail, RequestDemo
# example async task
def send_contact_us_confirmation_mail_task(id: int):
    """Send the confirmation email for one ``ContactUs`` record.

    Fetches the record whose primary key equals ``id`` via
    ``ContactUs.objects.get`` and calls its ``send_confirmation_email``
    method.  The parameter is named ``id`` because callers pass it by
    keyword in ``task_kwargs``.
    """
    ContactUs.objects.get(pk=id).send_confirmation_email()
from .tasks import send_contact_us_confirmation_mail_task
class ContactUsApi(generics.CreateAPIView):
    """Create a ``ContactUs`` entry and queue its confirmation email."""

    permission_classes = [AllowAny]
    serializer_class = ContactUsSerializer
    queryset = ContactUs.objects.all()

    def post(self, request, *args, **kwargs):
        """Validate and save the submission, then schedule the mail task.

        Invalid input raises from ``is_valid(raise_exception=True)``.  After
        saving, a cloud task is created for
        ``send_contact_us_confirmation_mail_task`` with the new record's id,
        and the serialized record is returned in the response.
        """
        contact_serializer = self.get_serializer(data=request.data)
        contact_serializer.is_valid(raise_exception=True)
        contact_serializer.save()

        saved_payload = contact_serializer.data

        # send confirmation mail
        mail_kwargs = {"id": saved_payload['id']}
        CronUtils.create_cloud_task(
            task=send_contact_us_confirmation_mail_task,
            task_kwargs=mail_kwargs,
        )
        return Response(saved_payload)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment