Last active
April 24, 2022 11:19
-
-
Save davisagli/5824662 to your computer and use it in GitHub Desktop.
Running Zope code as a celery task
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from AccessControl.SecurityManagement import newSecurityManager | |
from AccessControl.SecurityManagement import noSecurityManager | |
from Testing.makerequest import makerequest | |
from ZODB.POSException import ConflictError | |
from celery import Celery, Task | |
from email.Header import Header | |
from zope.app.publication.interfaces import BeforeTraverseEvent | |
from zope.component.hooks import setSite | |
from zope.event import notify | |
import email | |
import os | |
import sys | |
import transaction | |
import Zope2 | |
# Celery application that the tasks in this module are registered on.
# Both the message broker and the result backend are AMQP on localhost.
celery = Celery(
    'myproject.tasks',
    backend='amqp://localhost//',
    broker='amqp://localhost//',
)
class AfterCommitTask(Task):
    """Abstract task base that defers queueing until the transaction commits.

    Meant for tasks that are scheduled from code running inside Zope:
    the task message is only sent once the current transaction has
    committed successfully.
    """

    abstract = True

    def apply_async(self, *args, **kw):
        """Register an after-commit hook that queues the task.

        Arguments are passed through unchanged to the parent class's
        ``apply_async`` when the hook fires. Nothing is queued if the
        transaction does not commit successfully.

        Returns ``None``: ``apply_async`` normally returns a deferred
        result object, but none exists yet at this point.
        """
        def enqueue_if_committed(committed):
            # The hook receives the commit outcome; skip failed commits.
            if not committed:
                return
            super(AfterCommitTask, self).apply_async(*args, **kw)

        current_txn = transaction.get()
        current_txn.addAfterCommitHook(enqueue_if_committed)
def zope_task(**task_kw):
    """Decorator factory for celery tasks that must run in a Zope context.

    Keyword arguments are forwarded to ``celery.task``; the resulting task
    uses ``AfterCommitTask`` as its base class.

    When the task executes, the wrapper:

    * takes the optional ``site_path`` keyword argument (default
      ``'Plone'``), strips surrounding whitespace and slashes, and
      traverses to that path from the Zope application root;
    * passes the traversed object (presumably a portal) as the first
      positional argument of the decorated function, followed by the
      task's own positional and keyword arguments (``site_path`` is
      consumed by the wrapper and not forwarded);
    * runs the function as the ``admin`` user inside a transaction that is
      committed on success and aborted on any exception;
    * asks celery to retry the task when a ZODB ``ConflictError`` occurs.

    Returns the decorated function's return value.
    """
    def wrap(func):
        def new_func(*args, **kw):
            # ``kw`` is a fresh dict for every call, so popping is safe.
            # Pop (rather than get) so the decorated function does not
            # receive an unexpected ``site_path`` keyword argument.
            site_path = kw.pop('site_path', 'Plone')
            site_path = site_path.strip().strip('/')

            # This is a super ugly way of getting Zope to configure itself
            # from the main instance's zope.conf. XXX FIXME
            sys.argv = ['']
            os.environ['ZOPE_CONFIG'] = 'parts/client1/etc/zope.conf'
            app = makerequest(Zope2.app())

            transaction.begin()
            try:
                try:
                    # find site
                    site = app.unrestrictedTraverse(site_path)
                    # fire traversal event so various things get set up
                    notify(BeforeTraverseEvent(site, site.REQUEST))

                    # set up admin user
                    user = app.acl_users.getUserById('admin')
                    newSecurityManager(None, user)

                    # run the task
                    result = func(site, *args, **kw)

                    # commit transaction
                    transaction.commit()
                except ConflictError as e:
                    # On ZODB conflicts, retry using celery's mechanism.
                    # ``retry`` lives on the task object, not on the plain
                    # wrapper function, so go through ``task``.
                    # NOTE(review): the task's base class defers
                    # apply_async to an after-commit hook; confirm that a
                    # retry issued after abort() really gets queued.
                    transaction.abort()
                    raise task.retry(exc=e)
                except:
                    # Deliberately broad: roll back on anything, then
                    # re-raise the original exception unchanged.
                    transaction.abort()
                    raise
            finally:
                # Always drop the security context and site hook, and
                # close the ZODB connection opened by Zope2.app().
                noSecurityManager()
                setSite(None)
                app._p_jar.close()

            return result

        # Keep the original function name on the wrapper before the task
        # is created from it.
        new_func.__name__ = func.__name__
        # Bind the task object so ``new_func`` can reach it for retries.
        task = celery.task(base=AfterCommitTask, **task_kw)(new_func)
        return task
    return wrap
@zope_task()
def run_view(portal, view_path):
    """Look up ``view_path`` from ``portal`` and call the result.

    The lookup uses ``portal.restrictedTraverse``. Whatever the call
    returns is discarded.
    """
    portal.restrictedTraverse(view_path)()
@zope_task()
def send_mail(portal, subject, message, mfrom, mto):
    """Send ``message`` through the portal's ``MailHost``.

    ``message`` is parsed as an email message (unicode input is encoded
    to UTF-8 first). ``mfrom`` is only used for the ``Reply-To`` header;
    the actual sender is built from the portal's ``email_from_name`` and
    ``email_from_address``. The mail is sent immediately to ``mto`` with
    the given ``subject``.
    """
    raw = message.encode('utf8') if isinstance(message, unicode) else message

    parsed = email.message_from_string(raw)
    parsed.set_charset('utf-8')
    parsed['Reply-To'] = Header(mfrom, 'utf-8')

    # The envelope sender is always the portal's configured address.
    portal_sender = email.utils.formataddr(
        (portal.email_from_name, portal.email_from_address))

    portal.MailHost.send(
        parsed,
        subject=subject,
        mfrom=portal_sender,
        mto=mto,
        immediate=True,
        charset='utf-8',
    )
# Examples of calling. Celery's shortcut for queueing a task is ``delay``
# (tasks have no ``defer`` method). The arguments are those of the
# decorated function minus the leading ``portal``.
run_view.delay('@@foo')
send_mail.delay(u'Test message', u'test', u'[email protected]', u'[email protected]')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
cool I will test that ! Thanks