Skip to content

Instantly share code, notes, and snippets.

@glensc
Last active July 24, 2017 06:25
Show Gist options
  • Save glensc/78b83d543eddc28dd9798fa882b539a1 to your computer and use it in GitHub Desktop.
Save glensc/78b83d543eddc28dd9798fa882b539a1 to your computer and use it in GitHub Desktop.
fedmsg and reactor.callLater issue
*~
*.py[co]
/*.egg-info/

fedmsg and reactor.callLater issue

Given such FedMsg Consumer code:

    def consume(self, msg):
        msg = msg['body']
        self.log.info("CVS: Got a message %r" % msg['topic'])

        def delayed_consume():
            self.log.debug("delayed_consume")

        moksha.hub.reactor.reactor.callLater(1.0, delayed_consume)

Then the delayed_consume is not always called out.

It can be forced to be invoked when another fedmsg message is sent what this consumer listens for.

Development

# Copy the cvs_consumer_config.py into ./fedmsg.d/ directory.
# For production copy to /etc/fedmsg.d directory.
cp my_consumer_config.py fedmsg.d

# Setup your consumer by running
python setup.py develop

# Start the fedmsg-hub (which should pick up your consumer) with:
fedmsg-hub

Testing

Start the #development, and then send first message to bus:

echo '{"foo": "bar"}' | fedmsg-logger --json-input

You should see delayed_consume message in hub logs after 1 second of each message.

"""
Consumer to show how to write a service that does stuff in
response to message on the `fedmsg bus <http://fedmsg.rtfd.org>`_.
"""
import fedmsg
import fedmsg.consumers
from moksha.hub.reactor import reactor
from twisted.internet import task
class MyConsumer(fedmsg.consumers.FedmsgConsumer):
topic = "net.ed.prod.logger.log"
config_key = 'my.consumer.enabled'
callId = None
def consume(self, msg):
msg = msg['body']
self.log.info("my_consumer: message %r" % msg['topic'])
def delayed_consume():
self.log.info("my_consumer: reset %r" % self.callId)
self.callId = None
self.log.info("delayed_consume")
if self.callId:
self.log.info("my_consumer: cancel %r" % self.callId)
# cancel previous one before adding another one
self.callId.cancel()
self.callId = None
self.callId = reactor.callLater(1.0, delayed_consume)
self.log.info("my_consumer: created: %r" % self.callId)
"""
This is a config file that must appear in ./fedmsg.d/ alongside the other
config files from the fedmsg development checkout.
In production, this should go into /etc/fedmsg.d/ but for development it can
just live in your cwd/pwd.
"""
config = {
# whether the consumer is enabled
'my.consumer.enabled': True,
}
"""
A setuptool installer that crucially declares the consumer on an entry-point
that moksha is looking for.
Without this, fedmsg-hub won't find your consumer.
"""
from setuptools import setup
setup(
name='fedmsg-my',
entry_points="""
[moksha.consumer]
my = my_consumer:MyConsumer
""",
)
@glensc
Copy link
Author

glensc commented Jul 24, 2017

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment