Skip to content

Instantly share code, notes, and snippets.

@progrium
Created October 26, 2010 02:42
Show Gist options
  • Select an option

  • Save progrium/646226 to your computer and use it in GitHub Desktop.

Select an option

Save progrium/646226 to your computer and use it in GitHub Desktop.
import collections
import time
import threading
import sys
import uuid
from mongrel2 import handler
import simplejson as json
import zmq
import zookeeper
# Numeric id of this node, read from the first command-line argument.
NODE = int(sys.argv[1])

# zmq context from which the sockets in this file are created.
CTX = zmq.Context()

# ACL entry handed to zookeeper.create() for every znode this file creates.
ZOO_OPEN_ACL_UNSAFE = {
    "scheme": "world",
    "id": "anyone",
    "perms": 0x1f,
}

# Template for a full HTTP/1.1 response: status line, header lines,
# a blank line, then the body.
HTTP_FORMAT = (
    "HTTP/1.1 %(code)s %(status)s\r\n"
    "%(headers)s\r\n"
    "\r\n"
    "%(body)s"
)

# Template for one chunk of a chunked body: size line, then the body,
# each terminated by CRLF.
HTTP_CHUNK = (
    "%(size)s\r\n"
    "%(body)s\r\n"
)
def http_response(body, code, status, headers):
    """Render a complete HTTP response string from ``HTTP_FORMAT``.

    ``headers`` is a mapping; each item becomes a ``name: value`` line and
    the lines are joined with CRLF. ``code``, ``status`` and ``body`` are
    substituted into the template as given.
    """
    header_lines = ['%s: %s' % (name, value) for name, value in headers.items()]
    return HTTP_FORMAT % {
        'code': code,
        'status': status,
        'headers': "\r\n".join(header_lines),
        'body': body,
    }
def http_chunk(body):
    """Format ``body`` with ``HTTP_CHUNK``: its length in hex, then the body."""
    # Lower-case hexadecimal without the "0x" prefix.
    size = '%x' % len(body)
    return HTTP_CHUNK % {'size': size, 'body': body}
def node_to_port(node):
    """Return the TCP port for ``node``: its integer value plus 5000."""
    base_port = 5000
    return base_port + int(node)
class Cluster(threading.Thread):
    """Background thread linking this node to its peers.

    Membership is kept in ZooKeeper: each node creates an ephemeral child
    named after itself under ``CLUSTER_PATH``. Messages travel over zmq:
    every node binds a PUB socket on ``node_to_port(node)`` and connects a
    single SUB socket to each other member it discovers. A message on the
    wire is ``"<path>:<message>"``; received messages are passed to
    ``self.onmessage(path, message)``, which callers may replace.
    """

    CLUSTER_PATH = '/cluster'
    # Seconds to sleep when there is nothing to receive.
    POLL_INTERVAL = 0.2

    def __init__(self, zookeeper_addr, node):
        """Create the PUB/SUB sockets; ZooKeeper is contacted later, in run().

        zookeeper_addr -- address string passed to ``zookeeper.init``.
        node           -- this node's id; also selects the PUB port.
        """
        super(Cluster, self).__init__()
        self.zookeeper_addr = zookeeper_addr
        self.node = str(node)
        self.members = set([self.node])
        # Default callback discards messages until the owner installs one.
        self.onmessage = lambda x, y: None
        self.publisher = CTX.socket(zmq.PUB)
        self.publisher.bind("tcp://127.0.0.1:%s" % node_to_port(node))
        self.subscriber = CTX.socket(zmq.SUB)
        # send() prefixes every message with its path, and the subscription
        # matches messages beginning with '/'.
        self.subscriber.setsockopt(zmq.SUBSCRIBE, '/')

    def update(self):
        """Re-read the member list and connect to members not seen before.

        The watch passed to ``get_children`` calls this method again, so the
        member list is refreshed whenever the watch fires.
        """
        members = set(zookeeper.get_children(
            self.zk, self.CLUSTER_PATH, lambda h, t, s, p: self.update()))
        for node in members - self.members:
            if node != self.node:
                print("FOUND MEMBER: node%s" % node)
                self.subscriber.connect("tcp://127.0.0.1:%s" % node_to_port(node))
        self.members = members

    def send(self, path, message):
        """Publish ``message`` to the other nodes as ``"<path>:<message>"``."""
        self.publisher.send(':'.join([path, message]))

    def _receive_one(self):
        """Try to receive and dispatch one message without blocking.

        Returns True when a message was received, False when none was
        available or the receive failed. Failures are printed rather than
        silently dropped; they never propagate, so the thread keeps running.
        """
        try:
            payload = self.subscriber.recv(zmq.NOBLOCK)
        except zmq.ZMQError as err:
            # EAGAIN just means "nothing pending"; anything else is worth
            # reporting instead of being swallowed.
            if err.errno != zmq.EAGAIN:
                print("RECV ERROR: %s" % err)
            return False
        print("GOT: %s" % payload)
        path, message = payload.split(':', 1)
        try:
            self.onmessage(path, message)
        except zmq.ZMQError as err:
            # Keep the best-effort behaviour for callback failures, but make
            # them visible.
            print("DELIVER ERROR: %s" % err)
        return True

    def run(self):
        """Register with ZooKeeper, then relay incoming messages forever."""
        def zookeeper_started(handle, event_type, state, path):
            # The watcher can fire before zookeeper.init() has returned, so
            # publish the handle here for update() to use.
            # NOTE(review): this runs for every event delivered to the
            # watcher, not only the first connect -- confirm that is intended.
            self.zk = handle
            # Create-and-ignore-exists avoids the exists()/create() race when
            # two nodes start at the same time.
            try:
                zookeeper.create(handle, self.CLUSTER_PATH, "", [ZOO_OPEN_ACL_UNSAFE])
            except zookeeper.NodeExistsException:
                pass
            try:
                zookeeper.create(handle, '/'.join([self.CLUSTER_PATH, self.node]), "",
                                 [ZOO_OPEN_ACL_UNSAFE], zookeeper.EPHEMERAL)
            except zookeeper.NodeExistsException:
                pass
            self.update()

        self.zk = zookeeper.init(self.zookeeper_addr, zookeeper_started)
        while True:
            # Nothing is received until at least one other member is known.
            # After a successful receive, loop again immediately so a backlog
            # is drained instead of being handled at one message per interval.
            if len(self.members) > 1 and self._receive_one():
                continue
            time.sleep(self.POLL_INTERVAL)
# Connection to the local Mongrel2 server for this node.
mongrel = handler.Connection(
    str(uuid.uuid4()),
    "ipc://run/node%i-requests" % NODE,
    "ipc://run/node%i-responses" % NODE)

# Request path -> connection ids of the clients streaming that path.
listeners = collections.defaultdict(list)


def _forget_listener(conn_id):
    """Drop ``conn_id`` from every path and prune paths left without listeners.

    All paths are scanned because the listener's path is looked up by
    connection id here, not taken from the disconnect request.
    """
    # Iterate over a snapshot: entries are removed while scanning.
    for path, conn_ids in list(listeners.items()):
        conn_ids[:] = [c for c in conn_ids if c != conn_id]
        if not conn_ids:
            listeners.pop(path, None)


def _broadcast(path, body):
    """Send ``body`` plus a newline as one chunk to every listener on ``path``."""
    # .get() avoids creating an empty defaultdict entry for every path that
    # is published to but has no listeners.
    conn_ids = listeners.get(path)
    if conn_ids:
        mongrel.deliver('mongrel-frontend', conn_ids, http_chunk(body + "\n"))


cluster = Cluster('127.0.0.1:2181', NODE)
# Messages published by other nodes are forwarded to local listeners.
cluster.onmessage = _broadcast
cluster.start()
# Pause for a second after starting the cluster thread before serving.
time.sleep(1)

print("Listening for requests as node%i..." % NODE)
while True:
    req = mongrel.recv()
    if req.is_disconnect():
        print("DISCONNECT %s" % req.path)
        # Stop streaming to a connection that has gone away.
        _forget_listener(req.conn_id)
        continue

    method = req.headers['METHOD']
    if method == 'GET':
        # Open a chunked response and keep the connection as a listener.
        print("listener on %s" % req.path)
        listeners[req.path].append(req.conn_id)
        mongrel.reply(req, http_response(http_chunk("\n"), 200, "OK", {
            'connection': 'keep-alive',
            'transfer-encoding': 'chunked',
            'content-type': 'application/json',
        }))
    elif method == 'POST':
        # Deliver to local listeners, then publish to the other nodes.
        print("publishing '%s' on %s" % (req.body, req.path))
        _broadcast(req.path, req.body)
        cluster.send(req.path, req.body)
        mongrel.reply_http(req, "Sent\n", 200, "OK")
@jjinux

jjinux commented Jan 19, 2011

Copy link
Copy Markdown

It looks like it's polling using non-blocking IO. Jeff thought that this was necessary because otherwise the ZooKeeper code would be blocked. However, we don't need to poll: since we're using threads, we shouldn't be using non-blocking IO at all. Furthermore, with blocking IO we would no longer need to catch ZMQError, since that was being raised when the non-blocking IO wasn't able to return anything.

@progrium

Copy link
Copy Markdown
Author

I'd probably do a lot of things differently using Eventlet...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment