Skip to content

Instantly share code, notes, and snippets.

@malinoff
Created April 1, 2014 02:32
Show Gist options
  • Save malinoff/9906630 to your computer and use it in GitHub Desktop.
Save malinoff/9906630 to your computer and use it in GitHub Desktop.
This gist describes how to run a handler function in a separate process, so that the node remains able to receive new publications while the handler is running.
import time
import signal
import multiprocessing
from kombu.pidbox import Node, Mailbox
from kombu.clocks import LamportClock
from kombu.exceptions import InconsistencyError
from kombu import Connection, Exchange, Producer
def do_reply(reply, exchange, routing_key, ticket, channel, clock):
    """Publish ``reply`` to the reply exchange named ``exchange``.

    The exchange name is wrapped in an ``Exchange`` entity that is handed to
    ``publish`` both as the target and in its ``declare`` list.  ``ticket``
    and ``clock`` are sent as message headers.  The message is published
    through a ``Producer`` created on ``channel``.

    An ``InconsistencyError`` raised while publishing is ignored.
    """
    reply_exchange = Exchange(
        exchange,
        exchange_type='direct',
        delivery_mode='transient',
        durable=False,
    )
    publisher = Producer(channel, auto_declare=False)
    headers = {'ticket': ticket, 'clock': clock}
    try:
        publisher.publish(
            reply,
            exchange=reply_exchange,
            routing_key=routing_key,
            declare=[reply_exchange],
            headers=headers,
        )
    except InconsistencyError:
        # Best-effort reply: the error is dropped on purpose.
        # NOTE(review): presumably the reply queue is already gone and
        # nobody is waiting for the answer -- confirm.
        pass
def dispatch(handler, arguments=None,
             reply_to=None, ticket=None,
             hostname=None, channel=None, clock=None, state=None):
    """Run ``handler`` and, when requested, publish its result as a reply.

    Parameters:
        handler: callable invoked as ``handler(state=state, **arguments)``.
        arguments: optional mapping of keyword arguments for the handler.
            It is copied, so the caller's mapping is never modified.
        reply_to: optional mapping with ``'exchange'`` and ``'routing_key'``
            entries.  When falsy, no reply is published.
        ticket: value forwarded to ``do_reply`` as the reply ticket.
        hostname: key under which the result is wrapped in the reply body.
        channel: zero-argument callable returning the channel to publish on.
            It is only called when a reply is published.
        clock: value forwarded to ``do_reply`` as the reply clock.
        state: value injected into the handler call as ``state`` (it
            overrides any ``'state'`` entry in ``arguments``).

    Returns:
        Whatever the handler returned, or ``{'error': repr(exc)}`` if the
        handler raised an ``Exception``.

    Raises:
        SystemExit: propagated unchanged from the handler.
    """
    # Work on a private copy: updating ``arguments`` in place would leak the
    # injected 'state' entry back into the caller's dict.
    call_kwargs = dict(arguments) if arguments else {}
    call_kwargs['state'] = state
    print('pidbox received method {0} [reply_to:{1} ticket:{2}]'.format(
        handler.__name__, reply_to, ticket))
    try:
        reply = handler(**call_kwargs)
    except SystemExit:
        raise
    except Exception as exc:
        # Handler failures are reported to the requester instead of
        # propagating out of the call.
        print('pidbox command error: %r' % (exc,))
        reply = {'error': repr(exc)}
    if reply_to:
        # NOTE(review): the channel obtained from ``channel()`` is not closed
        # here -- confirm who owns its lifetime.
        do_reply({hostname: reply},
                 exchange=reply_to['exchange'],
                 routing_key=reply_to['routing_key'],
                 ticket=ticket,
                 channel=channel(),
                 clock=clock)
    return reply
class MyNode(Node):
    """Node that runs handlers in the module-level process ``pool``.

    Handing each call to a pool worker means ``handle_message`` returns
    without waiting for the handler to finish.
    """

    def handle_message(self, body, message=None):
        """Schedule the handler named in ``body`` on the process pool.

        ``body`` is the decoded message mapping.  ``'method'`` is required;
        ``'destination'``, ``'arguments'``, ``'reply_to'`` and ``'ticket'``
        are optional and fall back to ``None``, which ``dispatch`` accepts.
        When ``message`` is given, its ``'clock'`` header (or 0) is passed to
        ``adjust_clock`` first.

        Messages whose ``destination`` is non-empty and does not contain this
        node's hostname are ignored.

        Raises:
            KeyError: if ``body`` has no ``'method'`` entry or no handler is
                registered under that name.
        """
        destination = body.get('destination')
        if message:
            self.adjust_clock(message.headers.get('clock') or 0)
        if destination and self.hostname not in destination:
            return
        handler = self.handlers[body['method']]
        pool.apply_async(
            dispatch,
            kwds={'state': self.state,
                  'handler': handler,
                  # Optional fields: a message that does not ask for a reply
                  # may omit 'reply_to' and 'ticket', so indexing them
                  # directly would raise KeyError.
                  'arguments': body.get('arguments'),
                  'reply_to': body.get('reply_to'),
                  'ticket': body.get('ticket'),
                  'hostname': self.hostname,
                  # The bound method is passed, not a channel: dispatch
                  # calls it only when it has to publish a reply.
                  'channel': self.mailbox.connection.channel,
                  'clock': self.mailbox.clock.forward()}
        )
# Broker connection; no arguments are passed, so kombu's defaults apply.
c = Connection()
# Manager whose Lock factory is handed to the Lamport clock below.
# NOTE(review): presumably so the clock's lock is usable across processes
# -- confirm.
m = multiprocessing.Manager()
clock = LamportClock(Lock=m.Lock)
# Fanout mailbox named 'testmail', bound to the connection and clock above.
mailbox = Mailbox('testmail', type='fanout', connection=c, clock=clock)
# Node 'mynode' on that mailbox; handlers are registered on it with
# @node.handler and it is consumed from via node.listen().
node = MyNode('mynode', mailbox=mailbox)
@node.handler
def test(state, x, **kwargs):
    """Example handler: sleep for ``x`` seconds, then report completion.

    ``state`` and any extra keyword arguments are accepted but unused.
    Returns the string ``'done <x>'``.
    """
    print('In state function')
    # Simulates a long-running job.
    time.sleep(x)
    print('After sleep')
    summary = 'done {0}'.format(x)
    return summary
def handle_cc():
    """Make the calling process ignore SIGINT.

    Passed to ``multiprocessing.Pool`` as the worker initializer.
    """
    ignore_handler = signal.SIG_IGN
    signal.signal(signal.SIGINT, ignore_handler)
# Four worker processes; each runs handle_cc() once on start-up.
pool = multiprocessing.Pool(4, handle_cc)
consumer = node.listen(c)
try:
    # Pump broker events until interrupted; each incoming message is handed
    # to node.handle_message, which schedules the handler on the pool.
    while True:
        c.drain_events()
except KeyboardInterrupt:
    print('\nbye')
finally:
    # Nested try/finally so that a failure in one cleanup step does not
    # skip the remaining ones.
    try:
        pool.terminate()
        # Wait for the terminated workers to be reaped.
        pool.join()
    finally:
        try:
            consumer.cancel()
        finally:
            # Close the broker connection instead of leaving it open.
            c.release()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment