Skip to content

Instantly share code, notes, and snippets.

@lowell80
Last active October 12, 2018 01:13
Show Gist options
  • Save lowell80/93f31c6275a908ef94f048f8ad8149d2 to your computer and use it in GitHub Desktop.
Save lowell80/93f31c6275a908ef94f048f8ad8149d2 to your computer and use it in GitHub Desktop.
Repair checkpoint (.ckpt) files for various Splunk TAs
"""
Helps to recover corrupted .ckpt files created by Splunk's 'splunksdc' module used by various TAs.
(I think it ships with the TA Add-on Builder, but not 100% sure of that.)
Requires the 'umsgpack' 3rd party module
Usage:
python fix_checkpoint.py <file.ckpt> [<file.ckpt> ...]
More realistic usage:
pip install umsgpack
find $SPLUNK_HOME/var/lib/splunk/modinputs/ -name '*.ckpt' | xargs python fix_checkpoint.py
# Rename any repaired files by hand: (A new file with a .rebuild suffix is created for each broken file)
cd $SPLUNK_HOME/var/lib/splunk/modinputs/<inputtype>/
mv MyInput.ckpt MyInput.ckpt.broken
mv MyInput.ckpt.rebuild MyInput.ckpt
Helps with exceptions like this:
2018-10-11 19:51:29,018 level=ERROR pid=30260 tid=MainThread logger=splunk_ta_o365.modinputs.management_activity pos=utils.py:wrapper:67 | datainput="Management_Activity_AzureActiveDirectory" start_time=1539287488 | message="Data input was interrupted by an unhandled exception."
Traceback (most recent call last):
File "/opt/splunk/etc/apps/splunk_ta_o365/bin/splunksdc/utils.py", line 65, in wrapper
return func(*args, **kwargs)
File "/opt/splunk/etc/apps/splunk_ta_o365/bin/splunk_ta_o365/modinputs/management_activity.py", line 88, in run
with app.open_checkpoint(self.name) as checkpoint:
File "/opt/splunk/etc/apps/splunk_ta_o365/bin/splunksdc/collector.py", line 258, in open_checkpoint
checkpoint = LocalKVStore.open_always(fullname)
File "/opt/splunk/etc/apps/splunk_ta_o365/bin/splunksdc/checkpoint.py", line 167, in open_always
indexes = cls.build_indexes(fp)
File "/opt/splunk/etc/apps/splunk_ta_o365/bin/splunksdc/checkpoint.py", line 174, in build_indexes
for flag, key, pos in cls._replay(fp):
File "/opt/splunk/etc/apps/splunk_ta_o365/bin/splunksdc/checkpoint.py", line 103, in _replay
flag, key, _ = umsgpack.unpack(fp)
File "/opt/splunk/etc/apps/splunk_ta_o365/bin/3rdparty/umsgpack.py", line 879, in _unpack2
return _unpack(fp, options)
File "/opt/splunk/etc/apps/splunk_ta_o365/bin/3rdparty/umsgpack.py", line 833, in _unpack
return _unpack_dispatch_table[code](code, fp, options)
File "/opt/splunk/etc/apps/splunk_ta_o365/bin/3rdparty/umsgpack.py", line 785, in _unpack_array
return [_unpack(fp, options) for i in xrange(length)]
File "/opt/splunk/etc/apps/splunk_ta_o365/bin/3rdparty/umsgpack.py", line 833, in _unpack
return _unpack_dispatch_table[code](code, fp, options)
File "/opt/splunk/etc/apps/splunk_ta_o365/bin/3rdparty/umsgpack.py", line 699, in _unpack_string
raise InvalidStringException("unpacked string is invalid utf-8")
InvalidStringException: unpacked string is invalid utf-8
Lowell Alleman <[email protected]>
Oct 11, 2018
Provided "AS IS". No warranties. This may be unfit for any or all purposes.
USE AT YOUR OWN RISK. Things may melt or explode.
"""
import sys
import umsgpack
def parse_msg(obj):
    """Sanity-check one unpacked checkpoint record.

    Really basic test to ensure that the data is a triple whose first
    element (the action flag) is 0 or 1, to filter out garbage that could
    have resulted from a corrupted record.  Not perfect, but 'good enough'
    for the initial use case.

    Args:
        obj: A single object unpacked from the checkpoint stream.

    Raises:
        TypeError: If ``obj`` cannot be unpacked at all (e.g. an int).
        ValueError: If ``obj`` does not contain exactly three items.
        AssertionError: If the action flag is neither 0 nor 1.
    """
    action, _, _ = obj
    # Raise explicitly rather than using ``assert`` so the check still runs
    # when Python is started with -O; the exception type is kept the same.
    if action not in (0, 1):
        raise AssertionError("unexpected action flag: {!r}".format(action))
def test(checkpoint_file):
    """Read a checkpoint file end to end and count its valid records.

    The file must start with the 4-byte ``BUK0`` header.  Every object
    yielded by ``umsgpack.Unpacker`` after the header is passed through
    ``parse_msg``; the first failure propagates to the caller.

    Args:
        checkpoint_file: Path of the checkpoint (.ckpt) file to check.

    Returns:
        The number of records that were unpacked and validated.

    Raises:
        AssertionError: If the header is not ``BUK0`` or a record carries an
            invalid action flag.
        TypeError, ValueError: If a record is not a triple (see ``parse_msg``).
        Any exception raised by the unpacker on corrupted data is propagated
        unchanged.
    """
    # Nothing is written here, so read-only binary mode is sufficient.
    with open(checkpoint_file, "rb") as fp:
        # Skip magic block.  The file is opened in binary mode, so the header
        # must be compared against a bytes literal: on Python 3 a bytes value
        # never equals a str, which made the old check fail for every file.
        magic = fp.read(4)
        if magic != b"BUK0":
            raise AssertionError(
                "{} has unexpected header {!r}".format(checkpoint_file, magic))
        count = 0
        for obj in umsgpack.Unpacker(fp, raw=False):
            parse_msg(obj)
            count += 1
    return count
def rebuild(checkpoint_file, checkpoint_new):
    """Copy every readable record of a checkpoint file into a new file.

    The 4-byte ``BUK0`` header is written first; then each object produced
    by ``umsgpack.Unpacker`` is printed, validated with ``parse_msg`` and
    re-packed into ``checkpoint_new``.  Any record that raises while being
    unpacked, validated or written is reported and counted as failed.

    Args:
        checkpoint_file: Path of the (possibly corrupted) checkpoint to read.
        checkpoint_new: Path of the file to create with the surviving records.

    Returns:
        A ``(good, bad)`` tuple: the number of records copied and the number
        of failures encountered.

    Raises:
        AssertionError: If ``checkpoint_file`` does not start with ``BUK0``.
    """
    print("Recovering {} into new file {}".format(checkpoint_file, checkpoint_new))
    with open(checkpoint_file, "rb") as fp:
        # Skip magic block.  Binary reads return bytes, so compare against a
        # bytes literal (a str literal never matches on Python 3).
        magic = fp.read(4)
        if magic != b"BUK0":
            raise AssertionError(
                "{} has unexpected header {!r}".format(checkpoint_file, magic))
        # Open the output only after the header is known to be valid, so a
        # file that is not a checkpoint does not leave an empty output behind.
        with open(checkpoint_new, "wb") as out:
            out.write(magic)
            good = 0
            bad = 0
            iterable = umsgpack.Unpacker(fp, raw=False)
            while True:
                try:
                    obj = next(iterable)
                    print(obj)
                    parse_msg(obj)
                    umsgpack.pack(obj, out)
                    good += 1
                except StopIteration:
                    # End of the input stream.
                    break
                except Exception as e:
                    # Best effort: report the broken record and keep going.
                    print("Exception: {}".format(e))
                    bad += 1
    print("Recovered {} record, {} failed. Saved into {}".format(good, bad, checkpoint_new))
    return good, bad
if __name__ == '__main__':
    # Check every checkpoint file named on the command line; any file that
    # fails validation is rebuilt into "<name>.rebuild" next to the original.
    for filename in sys.argv[1:]:
        try:
            entries = test(filename)
            print("Read {} with {} entries".format(filename, entries))
        # ValueError and AssertionError are what parse_msg raises for records
        # of the wrong size or with a bad action flag; without them such a
        # file aborted the script instead of being rebuilt.
        except (TypeError, ValueError, AssertionError,
                umsgpack.UnpackValueError) as e:
            print("Failed to process {} {}".format(filename, e))
            rebuild(filename, filename + ".rebuild")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment