Created
September 8, 2011 12:55
-
-
Save lukemarsden/1203328 to your computer and use it in GitHub Desktop.
Twisted Spread integration
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Released under BSD 2-clause license by Luke Marsden | |
from twisted.internet import reactor | |
from twisted.application import service | |
from twisted.python import log | |
import spread | |
import zlib | |
def hex2ip(sender): | |
_,hex,priv = sender.split('#') | |
quads = [hex[0:2],hex[2:4],hex[4:6],hex[6:8]] | |
ip = '.'.join(map(lambda x: str(int(x,16)),quads)) | |
return ip | |
class Spreader(service.Service): | |
mailbox = None | |
privateGroup = None | |
connected = False | |
mboxfd = None | |
def __init__(self, config): | |
self.spreadName = "%s@%s"%(config['spread_port'], config['spread_host']) | |
self.privateName = str(config['my_ip']) | |
self.priority = 0 | |
self.membership = int(config['spread_membership']) | |
self.group = config['spread_group'] | |
def startService(self): | |
service.Service.startService(self) | |
self.connect() | |
self.checkConnection() | |
def checkConnection(self): | |
""" | |
Checks whether we are still connected to Spread (sometimes Spread kicks | |
us off if we can't receive fast enough. If this happens, reconnect | |
after a couple of seconds. | |
""" | |
if not self.connected: | |
self.connect() | |
reactor.callLater(1, self.checkConnection) | |
def stopService(self): | |
service.Service.stopService(self) | |
self.disconnect() | |
def regularMessageReceived(self, message): | |
#print inspect.getmembers(message) | |
try: | |
self.messageReceived(message.sender, message.groups, message.message, | |
message.msg_type, message.endian) | |
except: | |
log.err(None, "Problem dispatching regular spread message") | |
def membershipMessageReceived(self, message): | |
#print inspect.getmembers(message) | |
if message.reason == 0: | |
self.groupTransitioned(message.group) | |
elif message.reason == spread.CAUSED_BY_JOIN: | |
self.groupJoined(message.group, message.members) | |
self.groupChanged(message.group, message.members, message.orig_members) | |
elif (message.reason == spread.CAUSED_BY_LEAVE | |
or message.reason == spread.CAUSED_BY_DISCONNECT): | |
self.groupLeft(message.group, message.members) | |
self.groupChanged(message.group, message.members, message.orig_members) | |
else: | |
print "Unknown membership message:", str(message) | |
def messageReceived(self, sender, groups, message, msg_type, extra): | |
#log.debug("messageReceived: %s %s %s", sender, groups, message) | |
pass | |
def groupTransitioned(self, group): | |
#log.debug("groupTransitioned: %s", group) | |
pass | |
def groupJoined(self, group, who): | |
#log.debug("groupJoined: %s %s", group, who) | |
pass | |
def groupLeft(self, group, who): | |
#log.debug("groupLeft: %s %s", group, who) | |
pass | |
def groupChanged(self, group, current, past): | |
#log.debug("groupChanged: %s %s", group, current) | |
pass | |
def connectionLost(s,reason): | |
print reason | |
s.disconnect() | |
def connect(self, tries = 10): | |
print "Spreader: connecting to spread" | |
try: | |
self.mailbox = spread.connect(self.spreadName, self.privateName, | |
self.priority, self.membership) | |
except Exception, e: # Sometimes we reconnect too soon, in which case Spread still thinks we're connected - so just disconnect, and we'll try again | |
print "**", e | |
self.disconnect() | |
return | |
self.privateGroup = self.mailbox.private_group | |
self.join(self.group) | |
class MailboxFD: | |
def __init__(s,mailbox): | |
s.mailbox = mailbox | |
def fileno(s): | |
if self.connected: | |
try: | |
mboxno = s.mailbox.fileno() | |
if type(mboxno) == int: | |
return mboxno | |
else: | |
self.disconnect() | |
print '** Mailbox is not an integer - disconnected!' | |
except Exception, e: | |
self.disconnect() | |
print "** !! ", e | |
else: | |
# Don't even bother trying | |
print "** Refused to try and get file descriptor when we're not connected. Told Twisted to stop listening." | |
self.disconnect() | |
def doRead(s): | |
try: | |
m = s.mailbox.receive() | |
if isinstance(m, spread.RegularMsgType): | |
self.regularMessageReceived(m) # self refers to the object in the outer lexical scope | |
elif isinstance(m, spread.MembershipMsgType): | |
self.membershipMessageReceived(m) | |
except spread.error, e: # Spread might chuck us off because we haven't read enough | |
print "CRITICAL Exception in spreader: ", str(e) | |
self.disconnect() | |
raise e | |
def connectionLost(s,reason): # is this called? | |
print "CRITICAL Spreader Connection Lost: "+str(reason) | |
self.disconnect() | |
def logPrefix(s): | |
return 'Spread Mailbox' | |
self.mboxfd = MailboxFD(self.mailbox) | |
reactor.addReader(self.mboxfd) | |
self.connected = True | |
def disconnect(self): | |
print "Spreader: disconnecting from spread" | |
self.connected = False | |
if self.mailbox: | |
self.mailbox.disconnect() | |
self.mailbox = None | |
self.privateGroup = None | |
if self.mboxfd: | |
reactor.removeReader(self.mboxfd) | |
def join(self, group): | |
print "Spreader: joining mailbox" | |
if not self.mailbox: | |
raise Exception("no mailbox") | |
self.mailbox.join(group) | |
def leave(self, group): | |
print "Spreader: leaving mailbox" | |
if not self.mailbox: | |
raise Exception("no mailbox") | |
self.mailbox.leave(group) | |
def multicast(self, message, service_type=spread.SAFE_MESS, group='yourgroup', message_type=0): | |
assert isinstance(message, basestring), ( # TODO: Maybe we don't want unicode in here? | |
"Message passed to spreader multicast is not a string, it was a %r" % (type(message),)) | |
if self.connected: | |
if not self.mailbox: | |
raise Exception("no mailbox") | |
try: | |
preCompressionLength = len(message) | |
startOfMessage = message[:100] | |
message = zlib.compress(message) | |
postCompressionLength = len(message) | |
if postCompressionLength > 10 * 1000: # 10kb, Spread message limit is 130kb | |
print ("large_messages", | |
"After compression: %i bytes (before %i), start of message: %s" % | |
(postCompressionLength, preCompressionLength, startOfMessage)) | |
if postCompressionLength > 100 * 1000: # 100kb, Spread gets unhappy at 130kb | |
print "CRITICAL Dropping message > 100kb to avoid making Spread unhappy:", | |
print startOfMessage | |
else: | |
return self.mailbox.multicast(service_type, group, message, message_type) | |
except Exception, e: | |
self.disconnect() | |
print "**", e | |
else: | |
print "** Tried to send message when not connected. The following message has been lost:" | |
print message[:100] | |
def multigroup_multicast(self, *a, **kw): | |
if not self.mailbox: | |
raise Exception("no mailbox") | |
return self.mailbox.multigroup_multicast(*a, **kw) | |
class InteractingSpreader(Spreader): | |
def __init__(self, sm, my_ip, spread_group=False, group_notifier=False): | |
self.sm = sm | |
self.group_notifier = group_notifier | |
# convert my_ip to hex | |
hex = ''.join(map(lambda x: "%02X" % int(x),my_ip.split('.'))) | |
optParameters = { | |
'spread_host': 'localhost', | |
'spread_port': str(spread.DEFAULT_SPREAD_PORT), | |
'spread_membership': '1', | |
'spread_group': spread_group or 'yourgroup', | |
'my_ip' : hex | |
} | |
Spreader.__init__(self, optParameters) | |
def messageReceived(self, sender, groups, message, msg_type, extra): | |
self.sm.parser.handle_xml(zlib.decompress(message), hex2ip(sender)) | |
def groupTransitioned(self, group): | |
if self.group_notifier: | |
self.group_notifier.groupTransitioned(group) | |
def groupJoined(self, group, who): | |
if self.group_notifier: | |
self.group_notifier.groupJoined(group,who) | |
def groupLeft(self, group, who): | |
if self.group_notifier: | |
self.group_notifier.groupLeft(group,who) | |
def groupChanged(self, group, current, past): | |
if self.group_notifier: | |
self.group_notifier.groupChanged(group,current,past) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment