Created
September 25, 2021 17:19
-
-
Save gvanem/1184e11fab3fea4511122094fa7e1f3a to your computer and use it in GitHub Desktop.
RTL_433 test script
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
"""Compare actual output Json-formatted output lines of | |
rtl_433 with reference json.""" | |
import sys | |
import os | |
import argparse | |
import fnmatch | |
import subprocess | |
import json | |
import inspect | |
from deepdiff import DeepDiff | |
from pprint import pprint | |
# NOTE(review): presumably silences the Wsock-trace Winsock tracer for this
# process and the rtl_433 child processes -- confirm against that tool.
# os.putenv() changes the process environment without updating os.environ.
os.putenv ("WSOCK_TRACE_LEVEL", "0")
# Optional colour support: when 'colorama' can be imported, 'colour' holds
# bright ANSI sequences; otherwise every attribute is an empty string so the
# print statements work unchanged.
try:
    import colorama
    colorama.init()

    class colour:
        GREEN = "%s%s" % (colorama.Fore.GREEN, colorama.Style.BRIGHT)
        YELLOW = "%s%s" % (colorama.Fore.YELLOW, colorama.Style.BRIGHT)
        RED = "%s%s" % (colorama.Fore.RED, colorama.Style.BRIGHT)
        RESET = colorama.Style.RESET_ALL

    have_colorama = True
except ImportError:
    class colour:
        GREEN = ""
        YELLOW = ""
        RED = ""
        RESET = ""

    have_colorama = False
# Executable suffix and null-device name for the host platform.
if sys.platform != "win32":
    exe, dev_null = "", "/dev/null"
else:
    exe, dev_null = ".exe", "NUL"
#
# 'my_root' is the rtl_433_tests install root, which follows this layout:
#   <rtl_433_tests install-root>/bin/run_test.py
#   <rtl_433_tests install-root>/tests/*
#
# 'opt' holds the parsed command-line options.
# Both are assigned in main().
#
my_root = None
opt = None
def no_colour():
    """Switch coloured output off.

    Undoes colorama's initialisation (when colorama was loaded) and blanks
    every escape sequence held by 'colour'.
    """
    if have_colorama:
        colorama.deinit()
    for attr in ("GREEN", "RED", "YELLOW", "RESET"):
        setattr(colour, attr, "")
def trace(level, s):
    """Print 's' when the verbosity in 'opt.verbose' is at least 'level'.

    The message is prefixed with the source file name and line number of
    the caller.
    """
    if opt.verbose < level:
        return
    caller = sys._getframe(1)
    source_name = os.path.basename(inspect.getsourcefile(caller))
    print("%s%s(%d):%s %s" % (colour.GREEN, source_name, caller.f_lineno, colour.RESET, s))
#
# rtl_433 is looked up in the directories listed in $PATH
# unless it is specified with the '-c' option.
#
def find_rtl_433():
    """Search $PATH for the rtl_433 executable.

    Returns:
        A tuple (program, found). When a file named 'rtl_433' (plus the
        platform's executable suffix) exists in a $PATH directory, 'program'
        is its full path and 'found' is True. Otherwise 'program' is the
        bare program name and 'found' is False.
    """
    prog = "rtl_433" + exe
    # os.getenv() returns None when PATH is unset; there is nothing to
    # search in that case, so report "not found" instead of crashing.
    path = os.getenv('PATH')
    directories = [] if path is None else path.split(os.pathsep)
    for directory in directories:
        candidate = os.path.join(directory, prog)
        # isfile() rather than exists(): a directory that happens to be
        # called 'rtl_433' is not a runnable program.
        if os.path.isfile(candidate):
            return candidate, True
    return prog, False
def run_rtl433(input_fn, samplerate=None, protocol=None):
    """Run rtl_433 on one sample file and collect what it prints.

    Args:
        input_fn: sample file handed to rtl_433 with '-r'.
        samplerate: passed with '-s' when truthy.
        protocol: passed with '-R' when truthy.

    Returns:
        A tuple (stdout bytes, stderr bytes, exit code). In dry-run mode the
        command is only printed and (b"", 0, 0) is returned.
    """
    args = ["-c", dev_null, "-M", "newmodel"]
    if protocol:
        args += ['-R', str(protocol)]
    if samplerate:
        args += ['-s', str(samplerate)]
    args += ['-F', 'json', '-r', input_fn]
    cmd = [opt.rtl_433_cmd, *args]

    if opt.dry_run:
        shown_args = ' '.join(args)
        print ("%s%s %s%s%s" % (colour.GREEN, opt.rtl_433_cmd, colour.YELLOW, shown_args, colour.RESET))
        return (b"", 0, 0)

    trace(1, " ".join(cmd))
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = proc.communicate()

    # Pass warning messages through
    stderr_lines = err.decode("utf-8").split("\n")
    for warning in (text for text in stderr_lines if "WARNING:" in text):
        print_warn(warning, False)
    return (out, err, proc.returncode)
def find_json():
    """Collect the reference '*.json' files below '<my_root>/tests'.

    A file is selected when either its file name or its directory (relative
    to the tests directory, with "/*" appended) matches the pattern in
    'opt.spec'.

    Returns:
        List of full paths of the selected files.
    """
    tests_dir = os.path.join(my_root, 'tests')
    skip = len(tests_dir) + 1      # length of the tests-dir prefix plus one separator
    selected = []
    for folder, _subdirs, names in os.walk(tests_dir):
        for name in fnmatch.filter(names, '*.json'):
            path = os.path.join(folder, name)
            dir_spec = os.path.dirname(path)[skip:] + "/*"
            trace(2, "dir_spec: %s" % dir_spec)
            wanted = (fnmatch.fnmatch(os.path.basename(name), opt.spec)
                      or fnmatch.fnmatch(dir_spec, opt.spec))
            if not wanted:
                trace(2, "Ignoring file: %s" % path)
                continue
            trace(1, "Adding file: %s" % path)
            selected.append(path)
    return selected
def remove_fields(data, fields):
    """Strip the ignored fields from every record, in place.

    Args:
        data: iterable of records (dicts) to clean.
        fields: names of the keys to delete where present.

    Returns:
        The same 'data' object that was passed in.
    """
    for record in data:
        for name in fields:
            if name not in record:
                continue
            del record[name]
    return data
def print_warn(s, warn_prefix=True):
    """Print a warning in red.

    With 'warn_prefix' only the leading "WARNING" tag is coloured and 's'
    follows it; without it the whole of 's' is coloured.
    """
    if warn_prefix:
        text = f"{colour.RED}WARNING{colour.RESET}: {s}"
    else:
        text = f"{colour.RED}{s}{colour.RESET}"
    print(text)
def examples() -> str:
    """Return the usage examples that main() passes to argparse as the help epilog."""
    return '''
Some examples:
run_test.py ford*
run_test.py acurite/Acurite_00275rm/*
'''
def list_files (files):
    """Print the given reference files grouped under their directory.

    Each file name is annotated when its '.cu8' sample is missing or, unless
    'opt.no_ignore' is set, when its directory contains an 'ignore' file.
    """
    last_dir = ""
    for path in files:
        folder, leaf = os.path.split(path)
        if folder != last_dir:
            # New directory: print its name with forward slashes as a header
            print ("\n%s:" % folder.replace("\\", "/"))
            last_dir = folder

        sample = os.path.splitext(path)[0] + ".cu8"
        if not os.path.isfile(sample):
            note = " Missing %s%s%s" % (colour.RED, os.path.basename(sample), colour.RESET)
        elif not opt.no_ignore and os.path.isfile("%s/ignore" % folder):
            note = " %sIgnoring%s" % (colour.GREEN, colour.RESET)
        else:
            note = ""
        print (" %s%s" % (leaf, note))
def _build_arg_parser():
    """Create the command-line parser of the test runner."""
    parser = argparse.ArgumentParser(description="Test rtl_433",
                                     epilog=examples(),
                                     formatter_class=argparse.RawDescriptionHelpFormatter)
    rtl433_prog, rtl433_found = find_rtl_433()
    parser.add_argument("-c", dest="rtl_433_cmd", default=rtl433_prog,
                        help='rtl_433 program to use (default: %s, %sfound)' %
                             (rtl433_prog, "" if rtl433_found else "not "))
    parser.add_argument("-I", dest="ignore_fields", default=[], action="append",
                        help="Field to ignore in JSON data")
    parser.add_argument("-f", "--first-line", dest="first_line", default=False, action="store_true",
                        help="Only compare the first outputed line of rtl433 "
                             "with first line of reference json")
    parser.add_argument("-l", "--list", dest="list", default=False, action="store_true",
                        help="print a list of input files matching <spec>")
    parser.add_argument("-n", "--dry-run", dest="dry_run", default=False, action="store_true",
                        help="just list what input files would have been tested")
    parser.add_argument("-N", "--no-ignore", dest="no_ignore", default=False, action="store_true",
                        help="Do not ignore tests with a 'ignore' file in it's directory")
    parser.add_argument("-v", "--verbose", dest="verbose", action="count", default=0,
                        help="increase verbose mode")
    parser.add_argument("--no-colors", dest="no_colours", action="store_true",
                        help="Do not print in colors (if 'colorama' was found)")
    parser.add_argument("spec", nargs=argparse.REMAINDER,
                        help="test only input-files or sub-directories matching <spec>\n"
                             "E.g. 'ford*' or 'emont/01/*'")
    return parser


def _load_expected(output_fn):
    """Parse a reference file that holds one JSON document per line.

    Returns the list of decoded documents (blank lines are skipped), or
    None after printing an error when the file contains invalid JSON.
    """
    expected = []
    with open(output_fn, "r") as output_file:
        try:
            for json_line in output_file:
                if json_line.strip():
                    expected.append(json.loads(json_line))
        except ValueError:
            print("%sERROR: invalid json:%s '%s'" % (colour.RED, colour.RESET, output_fn))
            return None
    return expected


def _print_diff(diff):
    """Print every DeepDiff category with its affected paths and values."""
    for error, details in diff.items():
        print(" %s" % error)
        # Some categories map path -> change, others are a plain collection
        # of paths; only the mappings carry values that can be shown.
        has_values = isinstance(details, dict)
        for detail in details:
            print(" * %s:" % detail)
            if not has_values:
                continue
            change = details[detail]
            if isinstance(change, dict) and "new_value" in change and "old_value" in change:
                # '+' is what rtl_433 produced, '-' is what the reference expects.
                print(" +%s%s%s" % (colour.GREEN, change["new_value"], colour.RESET))
                print(" -%s%s%s" % (colour.RED, change["old_value"], colour.RESET))
            elif error.endswith("_added"):
                print(" +%s%s%s" % (colour.GREEN, change, colour.RESET))
            else:
                print(" -%s%s%s" % (colour.RED, change, colour.RESET))


def main():
    """Check all reference json files vs actual output.

    For every reference '*.json' file selected by <spec>, rtl_433 is run on
    the '.cu8' sample of the same name and its JSON output lines are compared
    with the reference using DeepDiff. Optional files next to the reference
    tune the run: 'ignore' skips the test, 'samplerate' and 'protocol' supply
    the '-s' and '-R' arguments.

    Returns:
        The number of failures, or None when only listing files ('--list').
    """
    global opt, my_root
    opt = _build_arg_parser().parse_args()

    # Honour --no-colors before the first message is printed.
    if opt.no_colours:
        no_colour()

    # The script lives in <install-root>/bin. When it is started without a
    # directory component (e.g. 'python run_test.py') the directory part of
    # argv[0] is empty and stands for the current directory, not for the
    # filesystem root.
    script_dir = os.path.dirname(sys.argv[0]) or os.curdir
    my_root = os.path.normpath(os.path.join(script_dir, os.pardir))
    trace(1, "my_root: %s" % my_root)

    opt.spec = opt.spec[0] if opt.spec else '*'
    trace(1, "spec: %s" % opt.spec)

    expected_json = find_json()
    if opt.list:
        # NOTE(review): with no_ignore forced on, list_files() never prints
        # its "Ignoring" annotation -- confirm this is intended.
        opt.no_ignore = True
        list_files (expected_json)
        return

    nb_ok = 0
    nb_fail = 0
    nb_ignored = 0
    false_positives = dict()
    for output_fn in expected_json:
        input_fn = os.path.splitext(output_fn)[0] + ".cu8"
        if not os.path.isfile(input_fn):
            print_warn("Missing '%s'" % input_fn)
            continue
        input_fn = os.path.normpath (input_fn)
        test_dir = os.path.dirname(output_fn)

        ignore_fn = os.path.join(test_dir, "ignore")
        if not opt.no_ignore and os.path.isfile(ignore_fn):
            print_warn("Ignoring '%s'" % input_fn)
            nb_ignored += 1
            continue

        samplerate = 250000
        samplerate_fn = os.path.join(test_dir, "samplerate")
        if os.path.isfile(samplerate_fn):
            with open(samplerate_fn, "r") as samplerate_file:
                samplerate = int(samplerate_file.readline())

        protocol = None
        protocol_fn = os.path.join(test_dir, "protocol")
        if os.path.isfile(protocol_fn):
            with open(protocol_fn, "r") as protocol_file:
                protocol = protocol_file.readline().strip()

        # Open expected data
        expected_data = _load_expected(output_fn)
        if expected_data is None:
            continue
        expected_data = remove_fields(expected_data, opt.ignore_fields)

        # The model of the first reference line decides which output lines
        # count as false positives of other decoders. An empty reference, or
        # one without a "model" field, disables that filtering instead of
        # raising IndexError/KeyError.
        expected_model = None
        if expected_data and isinstance(expected_data[0], dict):
            expected_model = expected_data[0].get("model")

        # Run rtl_433
        rtl433out, _err, exitcode = run_rtl433(input_fn, samplerate, protocol)
        if exitcode:
            print("ERROR: Exited with %d '%s'" % (exitcode, input_fn))

        # get JSON results
        rtl433out = rtl433out.decode('utf8').strip()
        results = []
        for json_line in rtl433out.split("\n"):
            if not json_line.strip():
                continue
            try:
                data = json.loads(json_line)
            except ValueError:
                nb_fail += 1
                # TODO: factorise error print
                print("%s## Failed%s: %s: invalid json output" % (colour.RED, colour.RESET, input_fn))
                print("%s" % json_line)
                continue

            if expected_model is not None and isinstance(data, dict) and "model" in data:
                actual_model = data["model"]
                if actual_model != expected_model:
                    # Output of a different decoder: record it and keep it
                    # out of the comparison.
                    entry = false_positives.setdefault(actual_model, {"count": 0, "models": set()})
                    entry["count"] += 1
                    entry["models"].add(expected_model)
                    continue
            results.append(data)
        results = remove_fields(results, opt.ignore_fields)

        if opt.first_line:
            # Compare only the first record of each side; a missing record
            # is represented by an empty dict.
            expected_data = expected_data[0] if expected_data else {}
            results = results[0] if results else {}

        # Compute the diff
        diff = DeepDiff(expected_data, results)
        if diff and not opt.dry_run:
            nb_fail += 1
            print("%s## Failed%s: %s" % (colour.RED, colour.RESET, input_fn))
            _print_diff(diff)
        else:
            nb_ok += 1
            print("%s## Okay%s: %s" % (colour.GREEN, colour.RESET, input_fn))

    for model, values in false_positives.items():
        count = values["count"]
        models = values["models"]
        print_warn(f"{model} generated {count} false positive(s) in other decoders: {models}")

    # print some summary
    if opt.dry_run:
        print("%d records tested, %d ignored." % (nb_ok, nb_ignored))
    else:
        print("%d records tested, %d have failed, %d ignored." % (nb_ok+nb_fail, nb_fail, nb_ignored))
    return nb_fail
if __name__ == '__main__':
    try:
        rc = main()
    except KeyboardInterrupt:
        print ("^C")
        rc = 0
    # main() returns the number of failures (None when only listing files).
    # Exit statuses are truncated to 8 bits on POSIX, so an unclamped count
    # of 256 would be reported as success; cap it at 255.
    sys.exit(min(rc or 0, 255))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment