Skip to content

Instantly share code, notes, and snippets.

@habibutsu
Last active August 29, 2015 13:57
Show Gist options
  • Save habibutsu/9620373 to your computer and use it in GitHub Desktop.
Save habibutsu/9620373 to your computer and use it in GitHub Desktop.
Amqp proxy
import pika
import logging
import pickle
import json
AMQPS = {
'test': {
'host': '127.0.0.1',
'port': 5672,
'userid': 'test',
'password': 'test',
'virtual_host': '/test',
'insist': False,
},
'test_proxy': {
'host': '127.0.0.1',
'port': 9999,
'userid': 'test',
'password': 'test',
'virtual_host': '/test',
'insist': False,
},
}
# Open a connection to RabbitMQ on localhost using all default parameters
connection_name = 'test_proxy'
parameters = pika.ConnectionParameters(
host=AMQPS[connection_name]['host'],
port=AMQPS[connection_name]['port'],
virtual_host=AMQPS[connection_name]['virtual_host'],
credentials=pika.credentials.PlainCredentials(
AMQPS[connection_name]['userid'], AMQPS[connection_name]['password'])
)
connection = pika.BlockingConnection(parameters)
def callback_timeout_connection():
print "Timeout"
connection.add_timeout(5, callback_timeout_connection)
channel = connection.channel()
# channel.queue_declare(
# "prebattle_request", durable=True, arguments={"x-message-ttl": 3600000})
# channel.queue_declare(
# "test_response", durable=True, arguments={"x-message-ttl": 3600000})
# channel.queue_bind(
# "prebattle_request",
# "bigworld",
# routing_key="bigworld.prebattle_request")
# channel.queue_bind(
# "test_response",
# "bigworld",
# routing_key="bigworld.prebattle_response")
channel.confirm_delivery()
# Send a message
data = {'key1': 'value1'}
headers = {
"type": "Test",
}
result = channel.basic_publish(
exchange='test_exchange',
routing_key='test_routing',
body=json.dumps(data),
properties=pika.BasicProperties(
content_type='application/x-pickle',
delivery_mode=2,
headers=headers))
if result:
print 'Message publish was confirmed'
else:
print 'Message could not be confirmed'
# def _cbConsume(channel, method_frame, header_frame, body):
# print body
# channel.basic_ack(delivery_tag=method_frame.delivery_tag)
# check_connection(connection, channel)
# print "start consuming..."
# channel.basic_consume(
# consumer_callback=_cbConsume, queue="test_response"
# )
# print "start consuming infinity..."
# try:
# channel.start_consuming()
# except KeyboardInterrupt:
# channel.stop_consuming()
connection.close()
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from select import select
import logging
from cStringIO import StringIO
from struct import unpack
stream_handler = logging.StreamHandler()
logger = logging.getLogger('proxy')
logger.setLevel(logging.DEBUG)
logger.addHandler(stream_handler)
BUFFER_SIZE = 4096
class Client(object):
def __init__(self, socket, addr, forward=None):
self.socket = socket
self.addr = addr
self.buffer = StringIO()
self.to_close = False
self.forward = forward
def forward_connect(self, forward_host, forward_port):
sock = socket(AF_INET, SOCK_STREAM)
sock.connect((forward_host, forward_port))
self.forward = Client(sock, (forward_host, forward_port), self)
return self.forward
def close(self):
if not self.buffer.tell():
logger.debug(u"Close connection %s", self)
self.socket.close()
return True
self.to_close = True
return False
def __unicode__(self):
return u"%s:%s" % self.addr
class Server(object):
clients = {}
rlist = []
wlist = []
_backlog = 20 # maximum number of queued connections
_timeout = 0.5
def __init__(self, host='127.0.0.1', port=9999, forward_host='127.0.0.1', forward_port=5672):
self.host = host
self.port = port
self.forward_host = forward_host
self.forward_port = forward_port
self.socket = socket(AF_INET, SOCK_STREAM)
self.socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
self.socket.bind((self.host, self.port))
self.socket.listen(self._backlog)
self.rlist.append(self.socket)
def accept_client(self):
sock, addr = self.socket.accept()
client = Client(sock, addr)
logger.debug(u"Accept connection: %s", client)
self.add_client(client)
forward = client.forward_connect(self.forward_host, self.forward_port)
logger.debug(u"New connection: %s", forward)
self.add_client(forward)
def add_client(self, client):
self.clients[client.socket] = client
self.rlist.append(client.socket)
def remove_client(self, socket):
del self.clients[socket]
self.rlist.remove(socket)
def close_client(self, socket):
self.rlist.remove(socket)
client = self.clients[socket]
if client.close():
del self.clients[socket]
if client.forward.close():
del self.clients[client.forward.socket]
self.rlist.remove(client.forward.socket)
def run_forever(self):
logger.info(u"Run server.. %s:%s", self.host, self.port)
while True:
r, w, e = select(self.rlist, self.wlist, [], self._timeout)
for sock in r:
if sock == self.socket:
self.accept_client()
continue
data = sock.recv(BUFFER_SIZE)
if not data:
self.close_client(sock)
continue
self.forwarding(sock, data)
for sock in w:
client = self.clients[sock]
if client.buffer.tell():
sock.sendall(client.buffer.getvalue())
client.buffer.truncate(0)
self.wlist.remove(sock)
if client.to_close:
client.close()
self.remove_client(sock)
else:
raise SystemError
def forwarding(self, socket, data):
client = self.clients[socket]
logger.debug(u"%s -> %s", client, client.forward)
if client.addr[1] == 5672 or client.forward.addr[1] == 5672:
logger.debug(u"%s\n", amqp_repr(data))
else:
logger.debug(u"%s\n", repr(data))
client.forward.buffer.write(data)
if client.forward.socket not in self.wlist:
self.wlist.append(client.forward.socket)
METHOD_NAME_MAP_0_8 = {
(10, 10): 'Connection.start',
(10, 11): 'Connection.start_ok',
(10, 20): 'Connection.secure',
(10, 21): 'Connection.secure_ok',
(10, 30): 'Connection.tune',
(10, 31): 'Connection.tune_ok',
(10, 40): 'Connection.open',
(10, 41): 'Connection.open_ok',
(10, 50): 'Connection.redirect',
(10, 60): 'Connection.close',
(10, 61): 'Connection.close_ok',
(20, 10): 'Channel.open',
(20, 11): 'Channel.open_ok',
(20, 20): 'Channel.flow',
(20, 21): 'Channel.flow_ok',
(20, 30): 'Channel.alert',
(20, 40): 'Channel.close',
(20, 41): 'Channel.close_ok',
(30, 10): 'Channel.access_request',
(30, 11): 'Channel.access_request_ok',
(40, 10): 'Channel.exchange_declare',
(40, 11): 'Channel.exchange_declare_ok',
(40, 20): 'Channel.exchange_delete',
(40, 21): 'Channel.exchange_delete_ok',
(50, 10): 'Channel.queue_declare',
(50, 11): 'Channel.queue_declare_ok',
(50, 20): 'Channel.queue_bind',
(50, 21): 'Channel.queue_bind_ok',
(50, 30): 'Channel.queue_purge',
(50, 31): 'Channel.queue_purge_ok',
(50, 40): 'Channel.queue_delete',
(50, 41): 'Channel.queue_delete_ok',
(60, 10): 'Channel.basic_qos',
(60, 11): 'Channel.basic_qos_ok',
(60, 20): 'Channel.basic_consume',
(60, 21): 'Channel.basic_consume_ok',
(60, 30): 'Channel.basic_cancel',
(60, 31): 'Channel.basic_cancel_ok',
(60, 40): 'Channel.basic_publish',
(60, 50): 'Channel.basic_return',
(60, 60): 'Channel.basic_deliver',
(60, 70): 'Channel.basic_get',
(60, 71): 'Channel.basic_get_ok',
(60, 72): 'Channel.basic_get_empty',
(60, 80): 'Channel.basic_ack',
(60, 90): 'Channel.basic_reject',
(60, 100): 'Channel.basic_recover',
(90, 10): 'Channel.tx_select',
(90, 11): 'Channel.tx_select_ok',
(90, 20): 'Channel.tx_commit',
(90, 21): 'Channel.tx_commit_ok',
(90, 30): 'Channel.tx_rollback',
(90, 31): 'Channel.tx_rollback_ok',
}
def amqp_repr(data):
result = "AMQP: "
if len(data) == 8:
result += "Header {2}-{3}".format(*unpack('<4B', data[4:]))
return result
frame_type, channel, size = unpack('>BHI', data[:7])
payload = data[7:7 + size]
method_sig = unpack('>HH', payload[:4])
method_name = METHOD_NAME_MAP_0_8.get(method_sig)
result += "frame type: %s, channel: %s, method_sig: %s" % (frame_type, channel, method_name)
return result
#return repr(data)
if __name__ == '__main__':
server = Server()
server.run_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment