Skip to content

Instantly share code, notes, and snippets.

@lolobosse
Created May 3, 2019 17:30
Show Gist options
  • Save lolobosse/8ceded877719015f8c0dab60bd6c5d37 to your computer and use it in GitHub Desktop.
Save lolobosse/8ceded877719015f8c0dab60bd6c5d37 to your computer and use it in GitHub Desktop.
# Dispatch table: maps the string stored in a task's ``type`` field to the
# callable that should be invoked for that task.
mapper = {
    'function1': function1,
    'function2': function2,
}
def execute_pending_tasks():
    """Run each stored task through its mapped handler and delete it on success.

    Every row returned by ``Task.query.all()`` is looked up in ``mapper`` by
    its ``type``. When a handler is found, the task's ``arguments`` (a JSON
    object) are decoded and passed to the handler as keyword arguments; the
    task row is then deleted and the deletion committed.

    Tasks whose type has no entry in ``mapper`` are skipped and left in the
    table.

    A failure in one task does not stop the others: the session is rolled
    back, a failure e-mail containing the traceback is sent, and the loop
    moves on to the next task.
    """
    for task in Task.query.all():
        # Read the type up front so the failure e-mail never has to touch the
        # (possibly expired) instance after the session has been rolled back.
        task_type = None
        try:
            task_type = task.type
            handler = mapper.get(task_type)
            if handler is None:
                continue
            handler(**json.loads(task.arguments))
            db.session.delete(task)
            db.session.commit()
        except Exception:
            # Capture the traceback first, while the exception is still the
            # one being handled.
            exception_str = traceback.format_exc()
            # Without a rollback, a failed flush/commit leaves the session
            # unusable and every following task would fail as well.
            db.session.rollback()
            # Send Email when failed
            mail_sender.send('[email protected]', '[email protected]',
                             '[FAILURE] Task Scheduler: task %s' % task_type,
                             '<pre>%s</pre>' % exception_str)
@manager.command
def execute_tasks():
    """Command registered on ``manager``: run all pending tasks."""
    execute_pending_tasks()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment