Created
July 25, 2019 20:50
-
-
Save offlinemark/37334234b5c43fe58dbbdbe87a7a7f44 to your computer and use it in GitHub Desktop.
Find osquery tables which advertise an index on a column but do not implement it.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import subprocess as sp | |
from collections import defaultdict | |
from enum import Enum, auto | |
class CheckStatus(Enum):
    """Outcome of checking one declared index column against the sources."""

    # No use of the column as a constraint was found anywhere.
    FAIL = auto()
    # A use of the column as a constraint was found for the table.
    PASS = auto()
    # A use was found, but it could not be tied to the table in question.
    MAYBE = auto()
def find_declared_indices():
    """Search the table specs for ``index=True`` declarations.

    Runs a recursive grep over the ``specs`` directory (resolved relative to
    the current working directory) with one line of leading context, because
    the ``Column(`` call may start on the line before ``index=True``.

    Returns:
        The result of ``parse_grep_output`` applied to grep's decoded stdout.
    """
    grep_argv = ['grep', '-B1', '-r', 'index=True', 'specs']
    completed = sp.run(grep_argv, stdout=sp.PIPE)
    grep_text = completed.stdout.decode()
    return parse_grep_output(grep_text)
def _split_grep_line(line):
    """Split one grep output line into ``(table_name, is_match, content)``.

    grep prefixes matching lines with ``<path>:`` and context lines with
    ``<path>-``. Spec paths end in ``.table``, so the earliest ``.table:`` or
    ``.table-`` marks the end of the path. Returns ``None`` when the line
    carries neither marker.
    """
    match_at = line.find('.table:')
    context_at = line.find('.table-')
    found = [pos for pos in (match_at, context_at) if pos != -1]
    if not found:
        return None
    cut = min(found)
    table_name = line[:cut].split('/')[-1]
    # Both markers have the same length, so one offset serves either case.
    content = line[cut + len('.table:'):]
    return table_name, cut == match_at, content


def parse_grep_output(output):
    """Parse ``grep -B1 -r index=True specs`` output into table -> columns.

    Assumes that if ``Column`` is not on the line containing ``index=True``,
    it is on the line immediately before it.

    Args:
        output: Decoded grep stdout. Groups of lines are separated by lines
            consisting solely of ``--``.

    Returns:
        A ``defaultdict(list)`` mapping each table name (the spec file name
        without the ``.table`` suffix) to the column names declared with
        ``index=True``, in order of appearance. Empty input gives an empty
        mapping. The same table may appear in several groups; its columns
        accumulate.

    Raises:
        Exception: if a matching line has no ``Column`` and the preceding
            line of the same group is missing, commented out, or has no
            ``Column`` either; or if the parsed column name is empty.
    """
    ret = defaultdict(list)
    # (table_name, content) of the preceding line in the current group.
    prev = None
    for raw in output.splitlines():
        # Only a line that is exactly "--" separates groups; a "--" inside a
        # column description must not split anything.
        if raw.strip() == '--':
            prev = None
            continue
        parsed = _split_grep_line(raw)
        if parsed is None:
            prev = None
            continue
        table_name, is_match, content = parsed
        current = (table_name, content)
        if not is_match:
            prev = current
            continue

        has_comment = content.strip().startswith('#')
        has_column = 'Column' in content
        if has_column and has_comment:
            # Commented-out column declaration: skip it.
            prev = current
            continue
        if has_column:
            source = content
        else:
            # The Column( call must have started on the previous line of the
            # same file. Never wrap around to an unrelated line.
            prev_content = None
            if prev is not None and prev[0] == table_name:
                prev_content = prev[1]
            if (prev_content is None
                    or 'Column' not in prev_content
                    or prev_content.strip().startswith('#')):
                raise Exception('prev line was also commented or did not have Column')
            source = prev_content

        # The column name is the first double-quoted string of the call.
        col_name = source.split('"')[1]
        if not col_name:
            raise Exception('empty column name in spec for table ' + table_name)
        ret[table_name].append(col_name)
        prev = current
    return ret
def col_check(table_name, col):
    """Classify whether the sources appear to use ``col`` as a constraint.

    Searches the ``osquery`` directory (relative to the current working
    directory) recursively for the literal text ``constraints["<col>"]``.

    Args:
        table_name: Table the column belongs to.
        col: Column name declared with ``index=True``.

    Returns:
        ``CheckStatus.FAIL`` if the text is found nowhere; this may miss
        implementations written differently, but does not report a false
        positive. ``CheckStatus.PASS`` if at least one hit line (path or
        content) also mentions ``table_name``. ``CheckStatus.MAYBE`` if there
        are hits but none mentions the table; the constraint may be checked
        in a file whose name and matching line do not contain the table name,
        so this case is not treated as a failure.
    """
    needle = f'constraints["{col}"]'
    # One grep, run without a shell: names are passed as plain arguments, so
    # no quoting is needed, and the tree is only walked once.
    proc = sp.run(
        ['grep', '-F', '-r', '-e', needle, 'osquery'],
        stdout=sp.PIPE,
    )
    # Decide from the output, not the exit status: grep can report an error
    # status (e.g. an unreadable file) while still printing real matches.
    hits = proc.stdout.decode(errors='replace').splitlines()
    if not hits:
        return CheckStatus.FAIL
    if any(table_name in hit for hit in hits):
        return CheckStatus.PASS
    # We found something in the general search, but not for this table.
    return CheckStatus.MAYBE
def check_indices(ind):
    """Print a warning for every declared index column whose check fails.

    Args:
        ind: Mapping of table name to the list of its indexed column names.

    Only ``CheckStatus.FAIL`` results are reported; ``MAYBE`` results are
    deliberately not printed.
    """
    for table, columns in ind.items():
        for column in columns:
            status = col_check(table, column)
            if status is not CheckStatus.FAIL:
                continue
            print('warning:', status, table, column)
def main():
    """Collect the declared index columns and check each one."""
    check_indices(find_declared_indices())


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment