Last active
August 29, 2015 13:57
-
-
Save habibutsu/9620373 to your computer and use it in GitHub Desktop.
Amqp proxy
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pika | |
import logging | |
import pickle | |
import json | |
# Connection settings keyed by profile name.  The two profiles share the same
# host, credentials and virtual host and differ only in the TCP port:
# 'test' uses 5672 and 'test_proxy' uses 9999.
AMQPS = {
    profile: {
        'host': '127.0.0.1',
        'port': port,
        'userid': 'test',
        'password': 'test',
        'virtual_host': '/test',
        'insist': False,
    }
    for profile, port in (('test', 5672), ('test_proxy', 9999))
}
# Open a blocking connection to RabbitMQ using the host, port, virtual host
# and credentials of the selected AMQPS profile.
connection_name = 'test_proxy'
settings = AMQPS[connection_name]
credentials = pika.credentials.PlainCredentials(
    settings['userid'], settings['password'])
parameters = pika.ConnectionParameters(
    host=settings['host'],
    port=settings['port'],
    virtual_host=settings['virtual_host'],
    credentials=credentials,
)
connection = pika.BlockingConnection(parameters)
def callback_timeout_connection():
    """Report on stdout that the connection timer has fired."""
    print("Timeout")


# Register the timer callback (deadline value 5) and open the channel used
# for publishing.
connection.add_timeout(5, callback_timeout_connection)
channel = connection.channel()
# channel.queue_declare( | |
# "prebattle_request", durable=True, arguments={"x-message-ttl": 3600000}) | |
# channel.queue_declare( | |
# "test_response", durable=True, arguments={"x-message-ttl": 3600000}) | |
# channel.queue_bind( | |
# "prebattle_request", | |
# "bigworld", | |
# routing_key="bigworld.prebattle_request") | |
# channel.queue_bind( | |
# "test_response", | |
# "bigworld", | |
# routing_key="bigworld.prebattle_response") | |
# Turn on publisher confirms; the script below treats the return value of
# basic_publish() as the confirmation result.
channel.confirm_delivery()

# Send a message
data = {'key1': 'value1'}
headers = {
    "type": "Test",
}
result = channel.basic_publish(
    exchange='test_exchange',
    routing_key='test_routing',
    body=json.dumps(data),
    properties=pika.BasicProperties(
        # The body is produced by json.dumps(), so the content type must say
        # JSON; advertising pickle would mislead any consumer that chooses
        # its decoder from this header.
        content_type='application/json',
        # AMQP delivery mode 2 marks the message as persistent.
        delivery_mode=2,
        headers=headers))
if result:
    print('Message publish was confirmed')
else:
    print('Message could not be confirmed')

# Optional consumer, kept disabled for manual experiments:
# def _cbConsume(channel, method_frame, header_frame, body):
#     print body
#     channel.basic_ack(delivery_tag=method_frame.delivery_tag)
# check_connection(connection, channel)
# print "start consuming..."
# channel.basic_consume(
#     consumer_callback=_cbConsume, queue="test_response"
# )
# print "start consuming infinity..."
# try:
#     channel.start_consuming()
# except KeyboardInterrupt:
#     channel.stop_consuming()

connection.close()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR | |
from select import select | |
import logging | |
from cStringIO import StringIO | |
from struct import unpack | |
# Number of bytes requested from a socket per recv() call.
BUFFER_SIZE = 4096

# The 'proxy' logger emits everything from DEBUG upwards through a stream
# handler created with logging's defaults.
logger = logging.getLogger('proxy')
logger.setLevel(logging.DEBUG)
stream_handler = logging.StreamHandler()
logger.addHandler(stream_handler)
class Client(object):
    """One endpoint of a proxied connection.

    Holds the connected socket, its address, and a buffer of data queued to
    be written to that socket.  ``forward`` refers to the Client for the
    opposite endpoint (or None until one is attached).
    """

    def __init__(self, socket, addr, forward=None):
        """Wrap *socket*, whose remote address is the ``(host, port)`` *addr*."""
        self.socket = socket
        self.addr = addr
        # Data waiting to be sent to this endpoint.
        self.buffer = StringIO()
        # Set by close() when closing had to be postponed because data was
        # still queued in ``buffer``.
        self.to_close = False
        self.forward = forward

    def forward_connect(self, forward_host, forward_port):
        """Connect to the forward target and pair it with this client.

        Returns the new Client for the target; it is also stored in
        ``self.forward`` and its own ``forward`` points back at ``self``.
        Any exception raised by ``connect()`` is propagated after the
        half-created socket has been closed.
        """
        sock = socket(AF_INET, SOCK_STREAM)
        try:
            sock.connect((forward_host, forward_port))
        except Exception:
            # Do not leak the descriptor when the target is unreachable.
            sock.close()
            raise
        self.forward = Client(sock, (forward_host, forward_port), self)
        return self.forward

    def close(self):
        """Close the socket unless data is still queued for it.

        Returns True when the socket was closed.  Returns False, and sets
        ``to_close``, when queued data postponed the close.
        """
        if self.buffer.tell():
            self.to_close = True
            return False
        logger.debug(u"Close connection %s", self)
        self.socket.close()
        return True

    def __str__(self):
        # Without this, str(client) and byte-string "%s" formatting fall back
        # to the default object repr because only __unicode__ was defined.
        return "%s:%s" % self.addr

    def __unicode__(self):
        return u"%s:%s" % self.addr
class Server(object):
    """Select-based TCP proxy that logs the traffic it relays.

    Every accepted connection is paired with a new connection to
    ``forward_host:forward_port``.  Data read from one side is queued on the
    other side's buffer and written out when select() reports that socket
    writable.
    """

    _backlog = 20  # maximum number of queued connections
    _timeout = 0.5  # timeout passed to select()

    def __init__(self, host='127.0.0.1', port=9999,
                 forward_host='127.0.0.1', forward_port=5672):
        """Bind and listen on ``host:port``; relay to the forward address."""
        self.host = host
        self.port = port
        self.forward_host = forward_host
        self.forward_port = forward_port

        # Per-instance bookkeeping.  These must not be class attributes:
        # mutable class attributes would be shared by every Server instance.
        self.clients = {}  # socket -> Client
        self.rlist = []    # sockets watched for readability
        self.wlist = []    # sockets that have queued data to write

        self.socket = socket(AF_INET, SOCK_STREAM)
        try:
            self.socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
            self.socket.bind((self.host, self.port))
            self.socket.listen(self._backlog)
        except Exception:
            # Do not leak the listening socket when setup fails.
            self.socket.close()
            raise
        self.rlist.append(self.socket)

    @staticmethod
    def _discard(items, item):
        """Remove *item* from the list *items* if it is present."""
        if item in items:
            items.remove(item)

    def accept_client(self):
        """Accept a pending connection and pair it with a forward connection.

        If the forward target cannot be reached, the accepted connection is
        closed and the server keeps running.
        """
        sock, addr = self.socket.accept()
        client = Client(sock, addr)
        logger.debug(u"Accept connection: %s", client)
        try:
            forward = client.forward_connect(
                self.forward_host, self.forward_port)
        except EnvironmentError as exc:
            logger.error(
                u"Cannot connect to %s:%s (%s); dropping %s",
                self.forward_host, self.forward_port, exc, client)
            sock.close()
            return
        logger.debug(u"New connection: %s", forward)
        # Register both sides only once the pair is complete, so every
        # registered client has a usable ``forward``.
        self.add_client(client)
        self.add_client(forward)

    def add_client(self, client):
        """Register *client* and start watching its socket for reads."""
        self.clients[client.socket] = client
        self.rlist.append(client.socket)

    def remove_client(self, socket):
        """Forget the client registered for *socket*.

        The socket may already have been taken out of the read list by
        close_client() (deferred close), so its absence there is not an
        error.
        """
        del self.clients[socket]
        self._discard(self.rlist, socket)

    def close_client(self, socket):
        """Tear down the connection pair that *socket* belongs to.

        Both sides stop being read immediately.  A side whose close() returns
        True is forgotten right away; a side with queued data stays
        registered (and in the write list) until run_forever() has flushed
        it.
        """
        client = self.clients[socket]
        for side in (client, client.forward):
            if side is None:
                continue
            self._discard(self.rlist, side.socket)
            if side.close():
                self.clients.pop(side.socket, None)
                self._discard(self.wlist, side.socket)

    def run_forever(self):
        """Run the select() loop; this method does not return."""
        logger.info(u"Run server.. %s:%s", self.host, self.port)
        while True:
            readable, writable, _ = select(
                self.rlist, self.wlist, [], self._timeout)
            for sock in readable:
                self._handle_readable(sock)
            for sock in writable:
                self._handle_writable(sock)

    def _handle_readable(self, sock):
        """Accept on the listening socket, or relay/close a client socket."""
        if sock is self.socket:
            self.accept_client()
            return
        if sock not in self.rlist:
            # Closed earlier in this same pass because its peer hung up;
            # reading from it now would fail on a closed descriptor.
            return
        try:
            data = sock.recv(BUFFER_SIZE)
        except EnvironmentError as exc:
            # A failed read (e.g. connection reset) is handled like EOF.
            logger.debug(u"Read from %s failed: %s", self.clients[sock], exc)
            data = None
        if data:
            self.forwarding(sock, data)
        else:
            self.close_client(sock)

    def _handle_writable(self, sock):
        """Flush the data queued for *sock* and finish any deferred close."""
        client = self.clients.get(sock)
        if client is None:
            # Torn down while handling reads in this same pass.
            self._discard(self.wlist, sock)
            return
        if not client.buffer.tell():
            raise SystemError("writable socket has no queued data")
        failed = False
        try:
            sock.sendall(client.buffer.getvalue())
        except EnvironmentError as exc:
            logger.debug(u"Write to %s failed: %s", client, exc)
            failed = True
        # Rewind before truncating so tell() reports 0 afterwards regardless
        # of whether truncate() itself moves the stream position.
        client.buffer.seek(0)
        client.buffer.truncate()
        self.wlist.remove(sock)
        if client.to_close:
            client.close()
            self.remove_client(sock)
        elif failed:
            self.close_client(sock)

    def forwarding(self, socket, data):
        """Log *data* received on *socket* and queue it for the peer."""
        client = self.clients[socket]
        peer = client.forward
        logger.debug(u"%s -> %s", client, peer)
        if client.addr[1] == 5672 or peer.addr[1] == 5672:
            try:
                description = amqp_repr(data)
            except Exception:
                # Decoding is diagnostic only; a chunk that cannot be parsed
                # must not stop the proxy from relaying it.
                description = repr(data)
        else:
            description = repr(data)
        logger.debug(u"%s\n", description)
        peer.buffer.write(data)
        if peer.socket not in self.wlist:
            self.wlist.append(peer.socket)
# Maps a (class id, method id) pair to a readable method name.  The table is
# grouped by class id; each name is the group's prefix followed by the
# per-method suffix.
METHOD_NAME_MAP_0_8 = {
    (class_id, method_id): prefix + suffix
    for class_id, prefix, methods in (
        (10, 'Connection.', (
            (10, 'start'), (11, 'start_ok'),
            (20, 'secure'), (21, 'secure_ok'),
            (30, 'tune'), (31, 'tune_ok'),
            (40, 'open'), (41, 'open_ok'),
            (50, 'redirect'),
            (60, 'close'), (61, 'close_ok'),
        )),
        (20, 'Channel.', (
            (10, 'open'), (11, 'open_ok'),
            (20, 'flow'), (21, 'flow_ok'),
            (30, 'alert'),
            (40, 'close'), (41, 'close_ok'),
        )),
        (30, 'Channel.access_', (
            (10, 'request'), (11, 'request_ok'),
        )),
        (40, 'Channel.exchange_', (
            (10, 'declare'), (11, 'declare_ok'),
            (20, 'delete'), (21, 'delete_ok'),
        )),
        (50, 'Channel.queue_', (
            (10, 'declare'), (11, 'declare_ok'),
            (20, 'bind'), (21, 'bind_ok'),
            (30, 'purge'), (31, 'purge_ok'),
            (40, 'delete'), (41, 'delete_ok'),
        )),
        (60, 'Channel.basic_', (
            (10, 'qos'), (11, 'qos_ok'),
            (20, 'consume'), (21, 'consume_ok'),
            (30, 'cancel'), (31, 'cancel_ok'),
            (40, 'publish'),
            (50, 'return'),
            (60, 'deliver'),
            (70, 'get'), (71, 'get_ok'), (72, 'get_empty'),
            (80, 'ack'),
            (90, 'reject'),
            (100, 'recover'),
        )),
        (90, 'Channel.tx_', (
            (10, 'select'), (11, 'select_ok'),
            (20, 'commit'), (21, 'commit_ok'),
            (30, 'rollback'), (31, 'rollback_ok'),
        )),
    )
    for method_id, suffix in methods
}
def amqp_repr(data):
    """Return a one-line description of the AMQP data at the start of *data*.

    *data* is a chunk of bytes as received from a socket.  Only the first
    frame in the chunk is described.  Three shapes are recognised:

    * the 8-byte protocol header (``AMQP`` followed by four version bytes),
      reported as ``Header <major>-<minor>`` using the last two bytes;
    * a chunk shorter than the 7-byte frame header, reported as incomplete;
    * a frame, reported with its type, channel and, for method frames found
      in METHOD_NAME_MAP_0_8, the method name (``None`` otherwise).

    The function never raises on short or non-method input, so it is safe to
    call on arbitrary chunks from a logging path.
    """
    result = "AMQP: "
    # An 8-byte heartbeat frame is the same length as the protocol header,
    # so the length alone is not enough to identify the header.
    if len(data) == 8 and data[:4] == b'AMQP':
        result += "Header {2}-{3}".format(*unpack('<4B', data[4:]))
        return result
    if len(data) < 7:
        # Not even a complete frame header (type, channel, payload size).
        return result + "incomplete frame (%d bytes)" % len(data)

    frame_type, channel, size = unpack('>BHI', data[:7])
    payload = data[7:7 + size]
    method_name = None
    # Only method frames (type 1) start with a (class id, method id) pair;
    # header, body and heartbeat payloads must not be decoded as one.
    if frame_type == 1 and len(payload) >= 4:
        method_sig = unpack('>HH', payload[:4])
        method_name = METHOD_NAME_MAP_0_8.get(method_sig)
    result += "frame type: %s, channel: %s, method_sig: %s" % (
        frame_type, channel, method_name)
    return result
if __name__ == '__main__':
    # Start a proxy with the constructor's default addresses and serve until
    # the process is stopped.
    Server().run_forever()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment