Created
January 10, 2013 17:42
-
-
Save danielrobbins/4504129 to your computer and use it in GitHub Desktop.
Fixes: ZEN-3758
Unable to run zeneventd with multiple workers in Zenoss Core, which causes a major bottleneck on the RabbitMQ rawevents queue.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Index: /branches/core/zenoss-4.2.x/Products/ZenEvents/configure.zcml | |
=================================================================== | |
--- /branches/core/zenoss-4.2.x/Products/ZenEvents/configure.zcml (revision 52897) | |
+++ /branches/core/zenoss-4.2.x/Products/ZenEvents/configure.zcml (revision 67823) | |
@@ -9,5 +9,9 @@ | |
<include package=".browser"/> | |
- <subscriber handler=".zeneventd.onDaemonStartRun"/> | |
+ <subscriber handler=".zeneventdEvents.onSigTerm"/> | |
+ <subscriber handler=".zeneventdEvents.onSigUsr1"/> | |
+ <subscriber handler=".zeneventdEvents.onBuildOptions"/> | |
+ <subscriber handler=".zeneventdEvents.onDaemonCreated"/> | |
+ <subscriber handler=".zeneventdEvents.onDaemonStartRun"/> | |
</configure> | |
Index: /branches/core/zenoss-4.2.x/Products/ZenEvents/zeneventdWorkers.py | |
=================================================================== | |
--- /branches/core/zenoss-4.2.x/Products/ZenEvents/zeneventdWorkers.py (revision 67823) | |
+++ /branches/core/zenoss-4.2.x/Products/ZenEvents/zeneventdWorkers.py (revision 67823) | |
@@ -0,0 +1,103 @@ | |
+############################################################################## | |
+# | |
+# Copyright (C) Zenoss, Inc. 2011, all rights reserved. | |
+# | |
+# This content is made available according to terms specified in | |
+# License.zenoss under the directory where your Zenoss product is installed. | |
+# | |
+############################################################################## | |
+ | |
+ | |
+import logging | |
+import signal | |
+import socket | |
+import time | |
+from amqplib.client_0_8.exceptions import AMQPConnectionException | |
+from zope.component import getUtility | |
+from Products.ZenEvents.zeneventd import BaseQueueConsumerTask, EventPipelineProcessor | |
+from Products.ZenEvents.zeneventd import QUEUE_RAW_ZEN_EVENTS | |
+from Products.ZenMessaging.queuemessaging.eventlet import BasePubSubMessageTask | |
+from Products.ZenUtils.ZCmdBase import ZCmdBase | |
+from zenoss.protocols.interfaces import IAMQPConnectionInfo, IQueueSchema | |
+from zenoss.protocols.jsonformat import to_dict | |
+from zenoss.protocols.eventlet.amqp import Publishable, getProtobufPubSub | |
+from Products.ZenCollector.utils.workers import workersBuildOptions | |
+ | |
+log = logging.getLogger("zen.eventd") | |
+ | |
+class EventletQueueConsumerTask(BaseQueueConsumerTask, BasePubSubMessageTask): | |
+ | |
+ def __init__(self, processor): | |
+ BaseQueueConsumerTask.__init__(self, processor) | |
+ | |
+ def processMessage(self, message): | |
+ """ | |
+ Handles a queue message, can call "acknowledge" on the Queue Consumer | |
+ class when it is done with the message | |
+ """ | |
+ zepRawEvent = self.processor.processMessage(message) | |
+ | |
+ if log.isEnabledFor(logging.DEBUG): | |
+ log.debug("Publishing event: %s", to_dict(zepRawEvent)) | |
+ | |
+ yield Publishable(zepRawEvent, exchange=self._dest_exchange, | |
+ routingKey=self._routing_key(zepRawEvent)) | |
+ | |
+class EventDEventletWorker(ZCmdBase): | |
+ | |
+ mname = 'ZenEventD' # For logging | |
+ | |
+ def __init__(self): | |
+ super(EventDEventletWorker, self).__init__() | |
+ self._amqpConnectionInfo = getUtility(IAMQPConnectionInfo) | |
+ self._queueSchema = getUtility(IQueueSchema) | |
+ | |
+ def run(self): | |
+ self._shutdown = False | |
+ signal.signal(signal.SIGTERM, self._sigterm) | |
+ task = EventletQueueConsumerTask(EventPipelineProcessor(self.dmd)) | |
+ self._listen(task) | |
+ | |
+ def shutdown(self): | |
+ self._shutdown = True | |
+ if self._pubsub: | |
+ self._pubsub.shutdown() | |
+ self._pubsub = None | |
+ | |
+ def buildOptions(self): | |
+ super(EventDEventletWorker, self).buildOptions() | |
+ # Register the workers option here as well, so it does not have to be commented out in zeneventd.conf (ZEN-2769) | |
+ workersBuildOptions(self.parser) | |
+ | |
+ def _sigterm(self, signum=None, frame=None): | |
+ log.debug("worker sigterm...") | |
+ self.shutdown() | |
+ | |
+ def _listen(self, task, retry_wait=30): | |
+ self._pubsub = None | |
+ keepTrying = True | |
+ sleep = 0 | |
+ while keepTrying and not self._shutdown: | |
+ try: | |
+ if sleep: | |
+ log.info("Waiting %s seconds to reconnect..." % sleep) | |
+ time.sleep(sleep) | |
+ sleep = min(retry_wait, sleep * 2) | |
+ else: | |
+ sleep = .1 | |
+ log.info("Connecting to RabbitMQ...") | |
+ self._pubsub = getProtobufPubSub(self._amqpConnectionInfo, self._queueSchema, QUEUE_RAW_ZEN_EVENTS) | |
+ self._pubsub.registerHandler('$Event', task) | |
+ self._pubsub.registerExchange('$ZepZenEvents') | |
+ # Reset the reconnect delay now that setup has succeeded | |
+ sleep=0 | |
+ self._pubsub.run() | |
+ except (socket.error, AMQPConnectionException) as e: | |
+ log.warn("RabbitMQ Connection error %s" % e) | |
+ except KeyboardInterrupt: | |
+ keepTrying = False | |
+ finally: | |
+ if self._pubsub: | |
+ self._pubsub.shutdown() | |
+ self._pubsub = None | |
+ | |
Index: /branches/core/zenoss-4.2.x/Products/ZenEvents/zeneventd.py | |
=================================================================== | |
--- /branches/core/zenoss-4.2.x/Products/ZenEvents/zeneventd.py (revision 61801) | |
+++ /branches/core/zenoss-4.2.x/Products/ZenEvents/zeneventd.py (revision 67823) | |
@@ -17,4 +17,5 @@ | |
import Globals | |
from zope.component import getUtility, adapter | |
+ | |
from zope.interface import implements | |
from zope.component.event import objectEventNotify | |
@@ -265,10 +266,4 @@ | |
-@adapter(ZenEventD, DaemonStartRunEvent) | |
-def onDaemonStartRun(daemon, event): | |
- """ | |
- Start up an EventDWorker. | |
- """ | |
- EventDTwistedWorker(daemon.dmd).run() | |
if __name__ == '__main__': | |
Index: /branches/core/zenoss-4.2.x/Products/ZenEvents/zeneventdEvents.py | |
=================================================================== | |
--- /branches/core/zenoss-4.2.x/Products/ZenEvents/zeneventdEvents.py (revision 67823) | |
+++ /branches/core/zenoss-4.2.x/Products/ZenEvents/zeneventdEvents.py (revision 67823) | |
@@ -0,0 +1,58 @@ | |
+############################################################################## | |
+# | |
+# Copyright (C) Zenoss, Inc. 2011, all rights reserved. | |
+# | |
+# This content is made available according to terms specified in | |
+# License.zenoss under the directory where your Zenoss product is installed. | |
+# | |
+############################################################################## | |
+ | |
+ | |
+import pkg_resources | |
+from zenoss.protocols.eventlet.amqp import register_eventlet | |
+from twisted.internet import reactor | |
+from zope.component import adapter, getGlobalSiteManager | |
+from Products.ZenEvents.zeneventd import ZenEventD | |
+from Products.ZenEvents.daemonlifecycle import DaemonCreatedEvent, DaemonStartRunEvent | |
+from Products.ZenEvents.daemonlifecycle import SigTermEvent, SigUsr1Event, BuildOptionsEvent | |
+from Products.ZenCollector.utils.workers import ProcessWorkers, workersBuildOptions, exec_worker | |
+ | |
+@adapter(ZenEventD, SigTermEvent) | |
+def onSigTerm(daemon, event): | |
+ if daemon.options.daemon: | |
+ daemon._workers.shutdown() | |
+ | |
+@adapter(ZenEventD, SigUsr1Event) | |
+def onSigUsr1(daemon, event): | |
+ if daemon.options.daemon: | |
+ daemon._workers.sendSignal(event.signum) | |
+ | |
+@adapter(ZenEventD, BuildOptionsEvent) | |
+def onBuildOptions(daemon, event): | |
+ workersBuildOptions(daemon.parser, default=2) | |
+ | |
+@adapter(ZenEventD, DaemonCreatedEvent) | |
+def onDaemonCreated(daemon, event): | |
+ """ | |
+ Called at the end of zeneventd's constructor. | |
+ """ | |
+ register_eventlet() | |
+ if daemon.options.daemon: | |
+ daemon._workers = ProcessWorkers(daemon.options.workers, exec_worker, "Event worker") | |
+ | |
+@adapter(ZenEventD, DaemonStartRunEvent) | |
+def onDaemonStartRun(daemon, event): | |
+ """ | |
+ Called when the daemon is ready to begin processing. This handler replaces the one | |
+ formerly defined in zeneventd.py, which this change removes along with its ZCML subscriber. | |
+ """ | |
+ from .zeneventdWorkers import EventDEventletWorker | |
+ # Free up unnecessary database resources in parent zeneventd process | |
+ if daemon.options.daemon: | |
+ daemon.closedb() | |
+ daemon.closeAll() | |
+ daemon._workers.startWorkers() | |
+ reactor.run() | |
+ else: | |
+ worker = EventDEventletWorker() | |
+ worker.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment