Created
June 25, 2011 20:16
-
-
Save lukemarsden/1046854 to your computer and use it in GitHub Desktop.
MuninClientProtocol
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import time | |
from twisted.internet import reactor | |
from twisted.internet import protocol | |
from twisted.internet import defer | |
class MuninClientProtocol(protocol.Protocol):
    """Client protocol that fetches every probe a munin node advertises.

    The exchange driven by collect_data() is: discard the node's greeting
    line, send "list", then send "fetch <probe>" for each probe named in the
    reply and read until the terminating "." line.  Each parsed reply is
    pushed to out_channel.

    out_channel is an object with two methods:

      send_result(server, probe, data):
        - a function to call with the result of each probe
      finish(reason):
        - a function to call when we're finished, reason can be a
          Failure object or string

    server is just the name of the server we are querying.
    """

    # Seconds we allow for the complete exchange before giving up.
    TIMEOUT = 30

    # "<field> <value>" with one *or more* separating spaces; some plugins
    # emit a double-space separator, so a plain split(' ') is not enough.
    _LINE_RE = re.compile(r"([^ ]+)[ ]+([^ ]+)")

    def __init__(self, out_channel, server):
        """Store the collaborators and arm the overall timeout."""
        print("Constructing protocol")
        # Chunks that arrive while nobody is waiting in recv() are parked
        # here and handed out with the next recv() result.
        self.buf = ''
        self.on_dataReceived = defer.Deferred()
        self.start_time = time.time()
        self.out_channel = out_channel
        self.server = server
        # got_probes is used for the meta-measurement munin.success:
        # if we got as far as receiving the list of probes,
        # we consider this box UP (for use in the UI).
        self.got_probes = False
        # Set once the final report has been sent, so that a timeout
        # followed by connectionLost does not report twice.
        self._finished = False
        # Time out if we haven't got a complete response in TIMEOUT seconds.
        self.timeout_dfr = reactor.callLater(self.TIMEOUT, self._abort, 'Timeout')

    def connectionMade(self):
        """Start collecting as soon as the connection is up."""
        collecting = self.collect_data()
        # Without an errback a failure inside collect_data would be dropped
        # as an unhandled Deferred and the poll would never be finished.
        collecting.addErrback(self._abort)
        protocol.Protocol.connectionMade(self)

    def connectionFailed(self, reason):
        """Report the failure through the normal end-of-connection path."""
        self._end_connection(reason)
        # NOTE(review): current Twisted releases do not appear to define
        # Protocol.connectionFailed -- confirm; only chain up when it exists.
        parent_hook = getattr(protocol.Protocol, 'connectionFailed', None)
        if parent_hook is not None:
            parent_hook(self, reason)

    def connectionLost(self, reason):
        """Send the final report when the connection goes away."""
        self._end_connection(reason)
        protocol.Protocol.connectionLost(self, reason)

    def _abort(self, reason):
        """Finish with *reason* and actively drop the connection.

        Used for the timeout and for errors raised while collecting; in both
        cases the transport may still be open and must not be left dangling.
        """
        self._end_connection(reason)
        if self.transport is not None:
            self.transport.loseConnection()

    def _end_connection(self, reason):
        """Emit the munin.* meta-measurements and call out_channel.finish().

        Safe to call more than once; only the first call has any effect.
        """
        if self._finished:
            return
        self._finished = True
        print("In end_connection %s" % self.server)

        # Disarm the timer first so it cannot fire after we have reported.
        timer, self.timeout_dfr = self.timeout_dfr, None
        if timer is not None and timer.active():
            timer.cancel()

        if self.got_probes:
            self._process('munin.success', 'value.value 1')
            self._process('munin.fetch_time', 'value.value ' + str(time.time() - self.start_time))
        else:
            self._process('munin.success', 'value.value 0')
        self.out_channel.finish(reason)

    @defer.inlineCallbacks
    def recv(self):
        """Receive a packet (plus anything parked in self.buf meanwhile)."""
        data = yield self.on_dataReceived
        self.on_dataReceived = defer.Deferred()  # reset deferred
        if self.buf:
            data += self.buf
            self.buf = ''
        defer.returnValue(data)

    def dataReceived(self, data):
        """Hand incoming data to whoever is waiting in recv()."""
        if self._finished:
            # The poll has already been reported; late data is irrelevant.
            return
        if self.on_dataReceived.called:
            # Previous chunk not consumed yet: firing the Deferred a second
            # time would raise AlreadyCalledError, so park the data instead.
            self.buf += data
        else:
            self.on_dataReceived.callback(data)

    @defer.inlineCallbacks
    def _recv_line(self):
        """Receive until the accumulated data ends with a newline."""
        line = ''
        while not line.endswith("\n"):
            chunk = yield self.recv()
            line += chunk
        defer.returnValue(line)

    @defer.inlineCallbacks
    def collect_data(self):
        """Iterate over the server's probes, fetching each one."""
        yield self._recv_line()  # throw away server's greeting
        self.transport.write("list\n")
        probes = yield self._recv_line()
        self.got_probes = True
        print(probes)
        # split() with no argument also drops the trailing newline, which
        # split(' ') left glued to the last probe name.
        for probe in probes.split():
            self.transport.write("fetch %s\n" % probe)
            data_buf = ''
            # A fetch reply ends with a line containing only ".".
            while not (data_buf == ".\n" or data_buf.endswith("\n.\n")):
                result = yield self.recv()
                data_buf += result
            self._process(probe, data_buf)
        self.transport.loseConnection()

    def _process(self, probe, data_buf):
        """Parse one probe's reply and push it out the output channel.

        Every line of the form "<lhs> <rhs>" becomes a (lhs, rhs) tuple with
        any ".value" removed from lhs; lines that do not match (such as the
        terminating ".") are ignored.
        """
        result = []
        for line in data_buf.split('\n'):
            matches = self._LINE_RE.match(line)
            if matches:
                lhs, rhs = matches.groups()
                result.append((lhs.replace('.value', ''), rhs))
        # Push the structured data out the output channel
        self.out_channel.send_result(self.server, probe, result)
class MuninClientFactory(protocol.ClientFactory):
    """Factory that builds a MuninClientProtocol for one server.

    out_channel and server are stored and handed unchanged to every
    protocol instance this factory builds.
    """

    # Class used by buildProtocol().  This name only lives in the class
    # namespace; inside the methods "protocol" is still the module global.
    protocol = MuninClientProtocol

    def __init__(self, out_channel, server):
        """Remember where results go and which server is being polled."""
        self.out_channel = out_channel
        self.server = server

    def buildProtocol(self, addr):
        """Create a protocol instance wired to this factory."""
        instance = self.protocol(self.out_channel, self.server)
        instance.factory = self
        return instance

    def clientConnectionFailed(self, connector, reason):
        """Record that munin could not be reached, then finish."""
        # Record that this machine was down at this moment.
        down_sample = [('value', 0)]
        self.out_channel.send_result(self.server, 'munin.success', down_sample)
        self.out_channel.finish('Connection failed')
        protocol.ClientFactory.clientConnectionFailed(self, connector, reason)

    def clientConnectionLost(self, connector, reason):
        """Pass a lost connection straight to the base class.

        This is legitimate: a clean close must not be reported as a failure.
        """
        protocol.ClientFactory.clientConnectionLost(self, connector, reason)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment