Skip to content

Instantly share code, notes, and snippets.

@somic
Created April 27, 2009 17:30
Show Gist options
  • Save somic/102602 to your computer and use it in GitHub Desktop.
Save somic/102602 to your computer and use it in GitHub Desktop.
A way to emulate rabbitmqctl from Python
#!/usr/bin/env python
# requires py_interface
# http://www.lysator.liu.se/~tab/erlang/py_interface/
import sys, os, time
from py_interface import erl_node, erl_eventhandler
from py_interface.erl_opts import ErlNodeOpts
from py_interface.erl_term import ErlAtom, ErlBinary
#from py_interface import erl_common
#erl_common.DebugOnAll()
# Erlang cookie handed to ErlNodeOpts when the local node is created.
# NOTE(review): presumably this has to equal the cookie of the RabbitMQ node
# being queried; it is hard-coded here -- confirm before reuse.
COOKIE = 'PQDZQNHHATKKMBYYBHSS'
def __msg_handler_list_bindings(msg):
    """Print every binding found in an RPC reply, then stop the event loop.

    Args:
        msg: Iterable of 4-element entries ``(exchange, queue, rkey, extra)``.
            ``exchange[3]``, ``queue[3]`` and ``rkey`` must each expose a
            ``contents`` attribute, which is what gets printed. The fourth
            element of each entry is ignored.
            NOTE(review): presumably ``exchange`` and ``queue`` are RabbitMQ
            resource records whose 4th field is the name binary -- confirm.

    Raises:
        ValueError: If an entry does not unpack into exactly four items.
    """
    # Single-argument print(...) calls produce the same output under the
    # Python 2 print statement and the Python 3 print function, so this
    # handler no longer fails to compile on Python 3.
    for exchange, queue, rkey, _ignored in msg:
        print("Exchange: %s" % exchange[3].contents)
        print("Queue: %s" % queue[3].contents)
        print("Routing key: %s" % rkey.contents)
        # Blank separator line. Deliberately print("") rather than print():
        # under Python 2 the latter would print "()".
        print("")
    # The reply has been fully reported; leave the event loop.
    erl_eventhandler.GetEventHandler().StopLooping()
def start_pyrabbitmqctl_node():
    """Create the local Erlang node and a mailbox registered as ``"p"``.

    The node name is ``"pyrabbitmqctl"`` followed by the current process id,
    and the node options carry the module-level ``COOKIE``.

    Returns:
        A ``(node, mbox)`` tuple: the ``ErlNode`` and the mailbox created
        on it.
    """
    node_name = "pyrabbitmqctl%d" % os.getpid()
    node_opts = ErlNodeOpts(cookie=COOKIE)
    local_node = erl_node.ErlNode(node_name, node_opts)
    # NOTE: local_node.Publish() is not called here.
    mailbox = local_node.CreateMBox(None)
    mailbox.RegisterName("p")
    return local_node, mailbox
def rpc_list_bindings(mbox, vhost="/", node_name="rabbit@home"):
    """Send an RPC for ``rabbit_exchange:list_bindings`` to a remote node.

    Args:
        mbox: Mailbox object providing ``SendRPC``.
        vhost: Virtual host to pass to the remote function; it is wrapped
            in an ``ErlBinary``. Defaults to ``"/"``.
        node_name: Name of the remote Erlang node to call, wrapped in an
            ``ErlAtom``. Defaults to ``"rabbit@home"``, the value that was
            previously hard-coded, so existing callers are unaffected.

    Returns:
        None. ``__msg_handler_list_bindings`` is passed to ``SendRPC`` as
        its last argument (presumably the reply callback -- confirm against
        py_interface).
    """
    mbox.SendRPC(
        ErlAtom(node_name),
        ErlAtom('rabbit_exchange'),
        ErlAtom('list_bindings'),
        [ErlBinary(vhost)],
        __msg_handler_list_bindings,
    )
# Script body: bring up the local node, request the bindings of the default
# vhost, then run the event loop until the reply handler stops it.
node, mbox = start_pyrabbitmqctl_node()
rpc_list_bindings(mbox, vhost="/")
_event_handler = erl_eventhandler.GetEventHandler()
_event_handler.Loop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment