Created
October 26, 2010 02:42
-
-
Save progrium/646226 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import collections | |
| import time | |
| import threading | |
| import sys | |
| import uuid | |
| from mongrel2 import handler | |
| import simplejson as json | |
| import zmq | |
| import zookeeper | |
| NODE = int(sys.argv[1]) | |
| CTX = zmq.Context() | |
| ZOO_OPEN_ACL_UNSAFE = {"perms":0x1f, "scheme":"world", "id" :"anyone"} | |
| HTTP_FORMAT = "HTTP/1.1 %(code)s %(status)s\r\n%(headers)s\r\n\r\n%(body)s" | |
| HTTP_CHUNK = "%(size)s\r\n%(body)s\r\n" | |
| def http_response(body, code, status, headers): | |
| payload = {'code': code, 'status': status, 'body': body} | |
| payload['headers'] = "\r\n".join('%s: %s' % (k,v) for k,v in | |
| headers.items()) | |
| return HTTP_FORMAT % payload | |
| def http_chunk(body): | |
| payload = {'body': body, 'size': hex(len(body))[2:]} | |
| return HTTP_CHUNK % payload | |
| def node_to_port(node): | |
| return int(node) + 5000 | |
| class Cluster(threading.Thread): | |
| CLUSTER_PATH = '/cluster' | |
| def __init__(self, zookeeper_addr, node): | |
| super(Cluster, self).__init__() | |
| self.zookeeper_addr = zookeeper_addr | |
| self.node = str(node) | |
| self.members = set([self.node]) | |
| self.onmessage = lambda x,y: None | |
| self.publisher = CTX.socket(zmq.PUB) | |
| self.publisher.bind("tcp://127.0.0.1:%s" % node_to_port(node)) | |
| self.subscriber = CTX.socket(zmq.SUB) | |
| self.subscriber.setsockopt(zmq.SUBSCRIBE, '/') | |
| def update(self): | |
| members = set(zookeeper.get_children(self.zk, self.CLUSTER_PATH, lambda h,t,s,p: self.update())) | |
| for node in members-self.members: | |
| if not node == self.node: | |
| print "FOUND MEMBER: node%s" % node | |
| self.subscriber.connect("tcp://127.0.0.1:%s" % node_to_port(node)) | |
| self.members = members | |
| def send(self, path, message): | |
| self.publisher.send(':'.join([path, message])) | |
| def run(self): | |
| def zookeeper_started(handler, type, state, path): | |
| if not zookeeper.exists(handler, self.CLUSTER_PATH): | |
| zookeeper.create(handler, self.CLUSTER_PATH, "", [ZOO_OPEN_ACL_UNSAFE]) | |
| try: | |
| zookeeper.create(handler, '/'.join([self.CLUSTER_PATH, self.node]), "", | |
| [ZOO_OPEN_ACL_UNSAFE], zookeeper.EPHEMERAL) | |
| except zookeeper.NodeExistsException: | |
| pass | |
| self.update() | |
| self.zk = zookeeper.init(self.zookeeper_addr, zookeeper_started) | |
| while True: | |
| if len(self.members) > 1: | |
| try: | |
| payload = self.subscriber.recv(zmq.NOBLOCK) | |
| print "GOT: %s" % payload | |
| path, message = payload.split(':', 1) | |
| self.onmessage(path, message) | |
| except zmq.ZMQError: | |
| pass | |
| time.sleep(0.2) | |
| mongrel = handler.Connection(str(uuid.uuid4()), | |
| "ipc://run/node%i-requests" % NODE, "ipc://run/node%i-responses" % NODE) | |
| listeners = collections.defaultdict(list) | |
| cluster = Cluster('127.0.0.1:2181', NODE) | |
| cluster.onmessage = \ | |
| lambda path, message: mongrel.deliver('mongrel-frontend', listeners[path], http_chunk(message+"\n")) | |
| cluster.start() | |
| time.sleep(1) | |
| print "Listening for requests as node%i..." % NODE | |
| while True: | |
| req = mongrel.recv() | |
| if req.is_disconnect(): | |
| print "DISCONNECT %s" % req.path | |
| continue | |
| if req.headers['METHOD'] == 'GET': | |
| print "listener on %s" % req.path | |
| listeners[req.path].append(req.conn_id) | |
| mongrel.reply(req, http_response(http_chunk("\n"), 200, "OK", | |
| {'connection': 'keep-alive', 'transfer-encoding': 'chunked', 'content-type': 'application/json'})) | |
| continue | |
| if req.headers['METHOD'] == 'POST': | |
| print "publishing '%s' on %s" % (req.body, req.path) | |
| mongrel.deliver('mongrel-frontend', listeners[req.path], http_chunk(req.body+"\n")) | |
| cluster.send(req.path, req.body) | |
| mongrel.reply_http(req, "Sent\n", 200, "OK") | |
| continue |
Author
I'd probably do a lot of things different using Eventlet...
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
It looks like it's polling using non-blocking IO. Jeff thought that this was necessary otherwise the ZooKeeper code would be blocked. However, we don't need to poll. We shouldn't be using non-blocking IO since we're using threads. Furthermore, we no longer need to catch ZMQError since that was being raised when the non-blocking IO wasn't able to return anything.