Skip to content

Instantly share code, notes, and snippets.

@jldupont
Created January 28, 2010 16:30
Show Gist options
  • Save jldupont/288887 to your computer and use it in GitHub Desktop.
Save jldupont/288887 to your computer and use it in GitHub Desktop.
Simple publish/subscribe Message Bus
"""
Simple "Bus" based message publish/subscribe
Created on 2010-01-28
@author: jldupont
"""
__all__=["Bus"]
class Bus(object):
    """
    Simple publish/subscribe "message bus"
    with configurable error reporting.

    Message delivery is synchronous, i.e. the caller of
    the "publish" method blocks until all the subscribers
    of the message have been "called-back".

    Any callback can return a true value (e.g. "True") for
    stopping the calling chain. A callback that raises an
    exception also stops the chain; the exception is handed to
    "logger" when one is configured and is otherwise dropped.

    All state lives on the class: "subscribe" and "publish" are
    classmethods, so no instance is required.
    """

    # Optional error reporter, called as logger(msgType, exception);
    # a false value (the default, None) disables error reporting
    logger = None

    # Maps message-type -> list of callbacks, in subscription order
    ftable = {}

    @classmethod
    def subscribe(cls, msgType, callback):
        """
        Subscribe to a Message Type

        The same callback may be subscribed several times; it is
        then called once per subscription.

        @param msgType: string, unique message-type identifier
        @param callback: callable instance which will be called upon
                         message delivery as callback(msgType, *pa, **kwa)
        """
        cls.ftable.setdefault(msgType, []).append(callback)

    @classmethod
    def publish(cls, msgType, *pa, **kwa):
        """
        Publish a message from a specific type on the bus

        Subscribers are called in subscription order. Delivery stops
        at the first subscriber that returns a true value or raises.

        @param msgType: string, message-type
        @param *pa: positional arguments
        @param **kwa: keyword based arguments
        """
        # Iterate over a snapshot: a callback that subscribes to this
        # same message-type during delivery must not extend (or endlessly
        # prolong) the delivery in progress
        for sub_cb in list(cls.ftable.get(msgType, ())):
            try:
                stop_chain = sub_cb(msgType, *pa, **kwa)
            except Exception as e:
                # A failing subscriber aborts the chain; report if possible
                stop_chain = True
                if cls.logger:
                    cls.logger(msgType, e)
            if stop_chain:
                return
## =========================================================== TESTS
if __name__ == '__main__':
    # Manual smoke test: exercises normal delivery, an unknown
    # message-type, and a failing subscriber with/without a logger.

    class Logger(object):
        """Error reporter that prints the failing message-type and exception."""

        def __call__(self, msgType, e):
            # Single-argument print(...) emits the same text on Python 2 and 3
            print("Logger: Error: msgType(%s) Exception(%s)" % (msgType, str(e)))

    def callback(msgType, *pa, **kwa):
        """Subscriber that prints whatever it receives."""
        print("cb: msgType(%s) pa(%s) kwa(%s)" % (msgType, pa, kwa))

    Bus.logger = Logger()

    # Subscribed twice on purpose: the callback fires once per subscription
    Bus.subscribe("test", callback)
    Bus.subscribe("test", callback)

    Bus.publish("test", p1="v1", p2="v2")

    # No subscriber for "test2": nothing is printed
    Bus.publish("test2", p1="v1", p2="v2")

    # None is not callable: calling it raises, which goes to the logger
    Bus.subscribe("test", None)
    Bus.publish("test", p2="v2", p3="v3")

    # Same failure without a logger: the error is dropped silently
    Bus.logger = None
    Bus.publish("test", p2="v2", p3="v3")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment