Skip to content

Instantly share code, notes, and snippets.

@progrium
Created October 26, 2010 02:42
Show Gist options
  • Select an option

  • Save progrium/646226 to your computer and use it in GitHub Desktop.

Select an option

Save progrium/646226 to your computer and use it in GitHub Desktop.
import collections
import time
import threading
import sys
import uuid
from mongrel2 import handler
import simplejson as json
import zmq
import zookeeper
NODE = int(sys.argv[1])
CTX = zmq.Context()
ZOO_OPEN_ACL_UNSAFE = {"perms":0x1f, "scheme":"world", "id" :"anyone"}
HTTP_FORMAT = "HTTP/1.1 %(code)s %(status)s\r\n%(headers)s\r\n\r\n%(body)s"
HTTP_CHUNK = "%(size)s\r\n%(body)s\r\n"
def http_response(body, code, status, headers):
payload = {'code': code, 'status': status, 'body': body}
payload['headers'] = "\r\n".join('%s: %s' % (k,v) for k,v in
headers.items())
return HTTP_FORMAT % payload
def http_chunk(body):
payload = {'body': body, 'size': hex(len(body))[2:]}
return HTTP_CHUNK % payload
def node_to_port(node):
return int(node) + 5000
class Cluster(threading.Thread):
CLUSTER_PATH = '/cluster'
def __init__(self, zookeeper_addr, node):
super(Cluster, self).__init__()
self.zookeeper_addr = zookeeper_addr
self.node = str(node)
self.members = set([self.node])
self.onmessage = lambda x,y: None
self.publisher = CTX.socket(zmq.PUB)
self.publisher.bind("tcp://127.0.0.1:%s" % node_to_port(node))
self.subscriber = CTX.socket(zmq.SUB)
self.subscriber.setsockopt(zmq.SUBSCRIBE, '/')
def update(self):
members = set(zookeeper.get_children(self.zk, self.CLUSTER_PATH, lambda h,t,s,p: self.update()))
for node in members-self.members:
if not node == self.node:
print "FOUND MEMBER: node%s" % node
self.subscriber.connect("tcp://127.0.0.1:%s" % node_to_port(node))
self.members = members
def send(self, path, message):
self.publisher.send(':'.join([path, message]))
def run(self):
def zookeeper_started(handler, type, state, path):
if not zookeeper.exists(handler, self.CLUSTER_PATH):
zookeeper.create(handler, self.CLUSTER_PATH, "", [ZOO_OPEN_ACL_UNSAFE])
try:
zookeeper.create(handler, '/'.join([self.CLUSTER_PATH, self.node]), "",
[ZOO_OPEN_ACL_UNSAFE], zookeeper.EPHEMERAL)
except zookeeper.NodeExistsException:
pass
self.update()
self.zk = zookeeper.init(self.zookeeper_addr, zookeeper_started)
while True:
if len(self.members) > 1:
try:
payload = self.subscriber.recv(zmq.NOBLOCK)
print "GOT: %s" % payload
path, message = payload.split(':', 1)
self.onmessage(path, message)
except zmq.ZMQError:
pass
time.sleep(0.2)
mongrel = handler.Connection(str(uuid.uuid4()),
"ipc://run/node%i-requests" % NODE, "ipc://run/node%i-responses" % NODE)
listeners = collections.defaultdict(list)
cluster = Cluster('127.0.0.1:2181', NODE)
cluster.onmessage = \
lambda path, message: mongrel.deliver('mongrel-frontend', listeners[path], http_chunk(message+"\n"))
cluster.start()
time.sleep(1)
print "Listening for requests as node%i..." % NODE
while True:
req = mongrel.recv()
if req.is_disconnect():
print "DISCONNECT %s" % req.path
continue
if req.headers['METHOD'] == 'GET':
print "listener on %s" % req.path
listeners[req.path].append(req.conn_id)
mongrel.reply(req, http_response(http_chunk("\n"), 200, "OK",
{'connection': 'keep-alive', 'transfer-encoding': 'chunked', 'content-type': 'application/json'}))
continue
if req.headers['METHOD'] == 'POST':
print "publishing '%s' on %s" % (req.body, req.path)
mongrel.deliver('mongrel-frontend', listeners[req.path], http_chunk(req.body+"\n"))
cluster.send(req.path, req.body)
mongrel.reply_http(req, "Sent\n", 200, "OK")
continue
@progrium

Copy link
Copy Markdown
Author

I'd probably do a lot of things different using Eventlet...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment