Skip to content

Instantly share code, notes, and snippets.

@craigderington
Created October 9, 2018 19:43
Show Gist options
  • Save craigderington/9538f8f7c047e6da44965a8309d5fa28 to your computer and use it in GitHub Desktop.
Save craigderington/9538f8f7c047e6da44965a8309d5fa28 to your computer and use it in GitHub Desktop.
Tasks module for Celery project
import datetime
import time
import pymongo
import config
import json
import csv
import GeoIP
import requests
from celery.signals import task_postrun
from celery.utils.log import get_task_logger
from app import celery, db
from app.models import User
from sqlalchemy import and_, between
from sqlalchemy import exc, func
from sqlalchemy import text
from io import StringIO
from datetime import timedelta
# set up our logger utility
logger = get_task_logger(__name__)
# Open the geo data file once and store it in cache memory
gi = GeoIP.open('/var/lib/geoip/GeoLiteCity.dat', GeoIP.GEOIP_INDEX_CACHE | GeoIP.GEOIP_CHECK_CACHE)
def convert_datetime_object(o):
if isinstance(o, datetime.datetime):
return o.__str__()
def convert_utc_to_local(utcdate_obj):
nowtimestamp = time.time()
offset = datetime.datetime.fromtimestamp(nowtimestamp) - datetime.datetime.utcfromtimestamp(nowtimestamp)
return utcdate_obj + offset
def get_location(ip_addr):
gi_lookup = gi.record_by_addr(ip_addr)
return gi_lookup
@celery.task
def log(message):
"""Print some log messages"""
logger.debug(message)
@task_postrun.connect
def close_session(*args, **kwargs):
# Flask SQLAlchemy will automatically create new sessions for you from
# a scoped session factory, given that we are maintaining the same app
# context, this ensures tasks have a fresh session (e.g. session errors
# won't propagate across tasks)
db.session.remove()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment