Last active
July 9, 2024 14:02
-
-
Save davisagli/5824709 to your computer and use it in GitHub Desktop.
Calling Salesforce as a celery task, from within a Zope transaction
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from celery import Celery, Task | |
from requests.exceptions import ConnectionError | |
from simple_salesforce import Salesforce | |
import transaction | |
# Celery application that the tasks below are registered with.
# NOTE(review): 'path.to.this.module' is a placeholder for the app's main
# module name -- replace it with the real dotted path of this file.
# Both the message broker and the result backend use a local AMQP server.
celery = Celery(
    'path.to.this.module',
    backend='amqp://localhost//',
    broker='amqp://localhost//',
)
class AfterCommitTask(Task):
    """Abstract task base that only queues itself once the transaction commits.

    Intended for tasks scheduled from inside Zope: the task is not sent to
    the broker when ``apply_async`` is called, but from an after-commit hook
    of the current transaction, and only if that commit succeeded.

    Note: tasks may run in parallel if celery's concurrency is set > 1.
    If things need to happen in the order they were queued, either set
    CELERYD_CONCURRENCY = 1 or modify this class to create a *chain* of
    tasks for each transaction.
    """

    abstract = True

    def apply_async(self, *args, **kw):
        """Register an after-commit hook instead of queueing right away.

        All positional and keyword arguments are forwarded unchanged to the
        parent class's ``apply_async`` when the hook fires after a
        successful commit. Nothing is queued if the commit failed.

        Returns ``None``: ``apply_async`` normally returns a deferred result
        object, but none is available before the task has been queued.
        """
        def queue_after_commit(success):
            # The transaction machinery passes in whether the commit worked.
            if not success:
                return
            super(AfterCommitTask, self).apply_async(*args, **kw)

        current_txn = transaction.get()
        current_txn.addAfterCommitHook(queue_after_commit)
def salesforce_task(func):
    """Decorator to help write tasks that call out to Salesforce.

    Wraps ``func`` in a celery task (using ``AfterCommitTask`` as its base)
    that instantiates a simple-salesforce client and passes it in as the
    first argument of the decorated function, followed by the arguments the
    task was called with.

    A ``ConnectionError`` raised while creating the client or while running
    ``func`` is treated as a transient network problem: the task asks
    celery to retry itself, passing the original exception along.

    :param func: callable whose first positional argument is the
        Salesforce client.
    :returns: the celery task object wrapping ``func``.
    """
    def new_func(*args, **kw):
        try:
            sf = Salesforce(
                username='FIXME',
                password='FIXME',
                security_token='FIXME',
                sandbox=True,
            )
            return func(sf, *args, **kw)
        except ConnectionError as e:
            # Handle transient network errors by telling celery to retry
            # the job. ``retry`` is a method of the task object, not of
            # this plain wrapper function, so it is looked up on ``task``
            # (bound below, before the task can ever run).
            raise task.retry(exc=e)

    # Expose the decorated function's name and docstring on the wrapper so
    # the task is registered under the name of the function being wrapped.
    new_func.__name__ = func.__name__
    new_func.__doc__ = func.__doc__

    task = celery.task(base=AfterCommitTask)(new_func)
    return task
# Example of use... | |
# 1. Defining a task: | |
@salesforce_task
def do_upsert(sf, sobject_type, record_id, data):
    """Upsert a record to Salesforce using the REST API.

    Meant to be called as a deferred task rather than directly.

    :param sf: Salesforce client, supplied by the ``salesforce_task``
        decorator as the first argument.
    :param sobject_type: name of the attribute to look up on the client
        (e.g. ``'Contact'``).
    :param record_id: identifier passed through to ``upsert``.
    :param data: field values passed through to ``upsert``.
    :returns: whatever the client's ``upsert`` call returns.
    """
    return getattr(sf, sobject_type).upsert(record_id, data)
# 2. Using the task (from within a transaction somewhere).
# The Salesforce client argument is not passed here; the decorator adds it.
do_upsert.delay(
    'Contact',
    '1234',
    {'FirstName': 'Harvey'},
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment