Skip to content

Instantly share code, notes, and snippets.

@lazyfrosch
Last active December 15, 2018 11:41
Show Gist options
  • Save lazyfrosch/f05b5cbfe8a05d4a1cee93db2c0028c0 to your computer and use it in GitHub Desktop.
Save lazyfrosch/f05b5cbfe8a05d4a1cee93db2c0028c0 to your computer and use it in GitHub Desktop.
Convert the icinga2.state file to readable JSON
$ sudo cat /var/lib/icinga2/icinga2.state | ./icinga2-debug-state.py
[
{
"name": "api",
"type": "ApiListener",
"update": {
"log_message_timestamp": 1522659331.677504,
"type": "ApiListener",
"version": 0.0
}
},
{
"name": "deradmin",
"type": "ApiUser",
"update": {
"type": "ApiUser",
"version": 0.0
}
},
{
"name": "ido",
"type": "CheckCommand",
"update": {
"type": "CheckCommand",
"version": 0.0
}
},
...
#!/usr/bin/env python
import sys, json
def main():
stream = None
filename = None
if len(sys.argv) > 2:
raise Exception, 'Only one (optional) argument allowed'
elif len(sys.argv) == 2:
filename = sys.argv[1]
if filename and filename != '-':
stream = open(filename, 'r')
else:
stream = sys.stdin
data = stream.read()
states = []
for element in decode(data)[0]:
jd = json.loads(element)
states.append(jd)
print json.dumps(states, sort_keys=True, indent=4, separators=(',', ': '))
sys.exit(0)
"""
Netstring is a module for encoding and decoding netstring streams.
See http://cr.yp.to/proto/netstrings.txt for more information on netstrings.
Author: Will McGugan (http://www.willmcgugan.com)
"""
__version__ = "1.0.0"
try:
import cStringIO as StringIO
except ImportError:
# For platforms that don't have cStringIO
import StringIO
def header(data):
"""Returns the netstring header for a given string.
data -- A string you want to produce a header for.
"""
return str(len(data))+":"
def encode(data):
"""Encodes a netstring. Returns the data encoded as a netstring.
data -- A string you want to encode as a netstring.
"""
if not isinstance(data, str):
raise ValueError("data should be of type 'str'")
return "%i:%s," % (len(data), data)
class FileEncoder(object):
"""Object that writes netstrings to a file."""
def __init__(self, file_out):
""""
file_out -- A writable file object
"""
self.file_out = file_out
def write(self, data):
"""Encodes a netstring and writes it to the file object.
data -- A string to be encoded and written
"""
if not isinstance(data, str):
raise ValueError("data should be of type 'str'")
write = self.file_out.write
write(header(data))
write(data)
write(',')
return self
def netstrings_to_file(file_out, data_container):
"""Writes a container of netstrings to a file.
file_out -- A writeable file-like object
data_container -- An iterable of strings
"""
write = file_out.write
for s in data_container:
if not isinstance(s, str):
raise ValueError("data should be of type 'str'")
write(header(s))
write(s)
write(',')
def encode_netstrings(data_container):
"""Encodes a number of strings as sequence of netstrings.
data_container -- An iterable of strings to be encoded
"""
return "".join(encode(s) for s in data_container)
class DecoderError(Exception):
(
PRECEDING_ZERO_IN_SIZE,
MAX_SIZE_REACHED,
ILLEGAL_DIGIT_IN_SIZE,
ILLEGAL_DIGIT
) = range(4)
error_text =\
{
PRECEDING_ZERO_IN_SIZE:"PRECEDING_ZERO_IN_SIZE",
MAX_SIZE_REACHED:"MAX_SIZE_REACHED",
ILLEGAL_DIGIT_IN_SIZE:"ILLEGAL_DIGIT_IN_SIZE",
ILLEGAL_DIGIT:"ILLEGAL_DIGIT"
}
def __init__(self, code, text):
Exception.__init__(self)
self.code = code
self.text = text
def __str__(self):
return "%s (#%i), %s" % (DecoderError.error_text[self.code], self.code, self.text)
class Decoder(object):
"""A netstring decoder.
Turns a netstring stream in to a number of discreet strings.
"""
def __init__(self, max_size=None):
"""Create a netstring-stream decoder object.
max_size -- The maximum size of a netstring encoded string, after which
a DecoderError will be throw. A value of None (the default) indicates
that there should be no maximum string size.
"""
self.max_size = max_size
self.data_pos = 0
self.string_start = 0
self.expecting_terminator = False
self.size_string = ""
self.data_size = None
self.remaining_bytes = 0
self.data_out = StringIO.StringIO()
self.yield_data = ""
def __str__(self):
if self.data_size is None:
bytes = len(self.size_string)
else:
bytes = self.data_out.tell()
return "<netstring decoder, %i bytes in buffer>"%bytes
def peek_buffer(self):
"""Returns any bytes not used by decoder."""
return self.data_out.getvalue()
def reset(self):
"""Resets decoder to initial state, and discards any cached stream data."""
self.data_pos = 0
self.string_start = 0
self.expecting_terminator = False
self.size_string = ""
self.data_size = None
self.remaining_bytes = 0
self.yield_data = ""
self.data_out.reset()
self.data_out.truncate()
def feed(self, data):
"""A generator that yields 0 or more strings from the given data.
data -- A string containing complete or partial netstring data
"""
if not isinstance(data, str):
raise ValueError("data should be of type 'str'")
self.data_pos = 0
self.string_start = 0
while self.data_pos < len(data):
if self.expecting_terminator:
c = data[self.data_pos]
self.data_pos += 1
if c != ',':
raise DecoderError(DecoderError.ILLEGAL_DIGIT, "Illegal digit (%s) at end of data"%repr(c))
yield self.yield_data
self.yield_data = ""
self.expecting_terminator = False
elif self.data_size is None:
c = data[self.data_pos]
self.data_pos += 1
if not len(self.size_string):
self.string_start = self.data_pos-1
if c in "0123456789":
if self.size_string == '0':
raise DecoderError(DecoderError.PRECEDING_ZERO_IN_SIZE, "Preceding zeros in size field illegal")
self.size_string += c
if self.max_size is not None and int(self.size_string) > self.max_size:
raise DecoderError(DecoderError.MAX_SIZE_REACHED, "Maximum size of netstring exceeded")
elif c == ":":
if not len(self.size_string):
raise DecoderError(DecoderError.ILLEGAL_DIGIT_IN_SIZE, "Illegal digit (%s) in size field"%repr(c))
self.data_size = int(self.size_string)
self.remaining_bytes = self.data_size
else:
raise DecoderError(DecoderError.ILLEGAL_DIGIT_IN_SIZE, "Illegal digit (%s) in size field"%repr(c))
elif self.data_size is not None:
get_bytes = min(self.remaining_bytes, len(data)-self.data_pos)
chunk = data[self.data_pos:self.data_pos+get_bytes]
whole_string = len(chunk) == self.data_size
if not whole_string:
self.data_out.write(chunk)
self.data_pos += get_bytes
self.remaining_bytes -= get_bytes
if self.remaining_bytes == 0:
if whole_string:
self.yield_data = chunk
else:
self.yield_data = self.data_out.getvalue()
self.data_out.reset()
self.data_out.truncate()
self.data_size = None
self.size_string = ""
self.remaining_bytes = 0
self.expecting_terminator = True
def decode(data):
"""Decodes netstrings and returns a tuple containing a
list of strings, and any remaining data.
data -- A string containing netstring data
"""
decoder = Decoder()
netstrings = list(decoder.feed(data))
remaining = data[decoder.string_start:]
return netstrings, remaining
def decode_file(file_in, buffer_size=1024):
"""Generates 0 or more strings from a netstring file.
file_in -- A readable file-like object containing netstring data
buffer_size -- The number of bytes to attempt to read in each iteration (default = 1024).
"""
decoder = Decoder()
while True:
data = file_in.read(buffer_size)
if not len(data):
return
for s in decoder.feed(data):
yield s
if __name__ == '__testing__':
import unittest
class TestNetstring(unittest.TestCase):
def setUp(self):
self.test_data = "Netstring module by Will McGugan"
self.encoded_data = "9:Netstring,6:module,2:by,4:Will,7:McGugan,"
def test_header(self):
tests = [ ("netstring", "9:"),
("Will McGugan", "12:"),
("", "0:") ]
for test, result in tests:
self.assertEqual(header(test), result)
def test_encode(self):
tests = [ ("netstring", "9:netstring,"),
("Will McGugan", "12:Will McGugan,"),
("", "0:,") ]
for test, result in tests:
self.assertEqual(encode(test), result)
def test_file_encoder(self):
file_out = StringIO.StringIO()
data = self.test_data.split()
encoder = FileEncoder(file_out)
for s in data:
encoder.write(s)
encoded_data = file_out.getvalue()
self.assertEqual(encoded_data, self.encoded_data)
def test_decode_file(self):
data = self.test_data.split()
for buffer_size in range(1, len(self.encoded_data)):
file_in = StringIO.StringIO(self.encoded_data[:])
decoded_data = list(decode_file(file_in, buffer_size = buffer_size))
self.assertEqual(decoded_data, data)
def test_decoder(self):
encoded_data = self.encoded_data
for step in range(1, len(encoded_data)):
i = 0
chunks = []
while i < len(encoded_data):
chunks.append(encoded_data[i:i+step])
i += step
decoder = Decoder()
decoded_data = []
for chunk in chunks:
for s in decoder.feed(chunk):
decoded_data.append(s)
self.assertEqual(decoded_data, self.test_data.split())
unittest.main()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment