Last active
February 18, 2022 08:01
-
-
Save RKX1209/3cb60b0fa0ba92da6575716680f32aa0 to your computer and use it in GitHub Desktop.
Different path tracer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import networkx | |
import angr | |
import tracer | |
import sys | |
import os | |
import logging | |
import pickle | |
import argparse | |
from collections import Counter | |
from bingraphvis import * | |
from bingraphvis.angr import * | |
from bingraphvis.angr.x86 import * | |
CFG_CACHE_DIR = "cache" | |
OUTPUT_DIR = "outd" | |
CFG_CACHE = CFG_CACHE_DIR + "/" + "cfg_cache_%s_%s.pickle" | |
OUTPUT = OUTPUT_DIR + "/" + "input_trace_%s_%s" | |
colors = ["red", "blue"] | |
plot_all_cfg = False | |
class AngrRemainDiffs(Transformer): | |
def __init__(self, traces): | |
self.traces = traces | |
def trace_idx(self, addr): | |
idx = () | |
if len(self.traces) < 2: | |
return () | |
for i, t in enumerate(self.traces): | |
if addr in t: | |
idx = idx + (i, ) | |
return idx | |
def transform(self, graph): | |
remove = [] | |
for n in graph.nodes: | |
all_idx = self.trace_idx(n.obj.addr) | |
if len(all_idx) == 0: | |
remove.append(n) | |
if len(all_idx) == len(self.traces): # exist in all traces or empty | |
if plot_all_cfg == True: | |
n.color = "black" | |
else: | |
remove.append(n) | |
if len(all_idx) == 1: # exist in an only one trace | |
idx = all_idx[0] | |
n.color = colors[idx] | |
# print(hex(n.obj.addr), ' ', idx) | |
# print([hex(r.obj.addr) for r in remove]) | |
for r in remove: | |
graph.remove_node(r) | |
print("Graph len= %d" % len(graph.nodes)) | |
def check_env(): | |
with open("/proc/sys/kernel/randomize_va_space",'r') as rfd: | |
b = rfd.read(1) | |
if int(b) != 0: | |
print("ASLR is not disabled. Run: echo 0 | sudo tee /proc/sys/kernel/randomize_va_space") | |
exit(0) | |
def get_full_cfg(cache_path, start_addr, start_state=None): | |
if os.path.isfile(cache_path): | |
print("[+] CFG Cache found ") | |
cfg_full = proj.analyses.CFGEmulated(fail_fast=True, starts=[start_addr], initial_state=start_state, no_construct = True) | |
with open(cache_path, 'rb') as fp: | |
cfg_full._graph = pickle.load(fp) # XXX: Dirty hack (due to derivied class serialization problem) | |
else: | |
print("[+] Creating CFG ....") | |
cfg_full = proj.analyses.CFGEmulated(fail_fast=True, starts=[start_addr], initial_state=start_state, show_progressbar=True) | |
with open(cache_path, 'wb') as fp: | |
pickle.dump(cfg_full.graph, fp) | |
return cfg_full | |
def plot_cfg_with_trace(cfg, fname, traces, format="raw"): | |
vis = AngrVisFactory().default_cfg_pipeline(cfg, asminst=True, vexinst=False) | |
vis.add_transformer(AngrRemainDiffs(traces)) | |
vis.set_output(DotOutput(fname, format=format)) | |
vis.process(cfg.graph) | |
def get_traces(proj, bin_path, input_paths): | |
_traces = [] | |
for ipath in input_paths: | |
print("[+] Tracing .... %s" % (ipath)) | |
runner = tracer.QEMURunner(project=proj, input=b'', trace_log_limit=2**34, argv=[bin_path, ipath]) | |
#print([hex(r) for r in runner.trace]) | |
_traces.append(Counter(runner.trace)) | |
print("Size: %d" % (len(runner.trace))) | |
return _traces | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser(description='Input Tracer option') | |
parser.add_argument('-b','--binary', help='Binary path',required=True) | |
parser.add_argument('-i','--inputs', help='Inputs aginst binaries',required=True) | |
parser.add_argument('-f','--func', help='Dump per functions', action='store_true', default=False) | |
parser.add_argument('-v','--verbose', help='Plot entire CFG', action='store_true', default=False) | |
args = parser.parse_args() | |
bin_path = args.binary | |
plot_all_cfg = args.verbose | |
input_paths = [p for p in args.inputs.split(',')] | |
bin_name = bin_path.split('/')[-1] | |
# Check Environment | |
check_env() | |
logging.getLogger('angr').setLevel('ERROR') | |
# Crate Directories for Input and CFG Cache | |
try: | |
os.mkdir(CFG_CACHE_DIR) | |
os.mkdir(OUTPUT_DIR) | |
except OSError: | |
pass | |
print("[+] Opening binary %s" % (bin_path)) | |
proj = angr.Project(bin_path,load_options={'auto_load_libs': False}) | |
if args.func: | |
print("[+] Searching for all the functions (using CFGFast)") | |
fast_cfg = proj.analyses.CFGFast(show_progressbar=True) | |
# we want to only keep functions defined in the main binary | |
functions = [] | |
for addr,func in fast_cfg.functions.items(): | |
if proj.loader.main_object.contains_addr(addr) and addr not in proj.loader.main_object.reverse_plt: | |
functions.append((addr,func)) | |
nb_functions = len(functions) | |
print(" ==> %d functions to process." % nb_functions) | |
traces = get_traces(proj, bin_path, input_paths) | |
print ("[+] CFG processing ....") | |
i = 0 | |
for addr,func in functions: | |
cache = CFG_CACHE % (bin_name, func.name) | |
output = OUTPUT % (bin_name, func.name) | |
print("[+] (%d/%d) Computing Accurate CFG for function %s (0x%x)" % (i, nb_functions, func.name,addr)) | |
cfg_func = get_full_cfg(cache, addr) | |
plot_cfg_with_trace(cfg_func, output, traces) | |
print ("[+] Complete! Saved to %s.png" % (output)) | |
i += 1 | |
else: | |
cache = CFG_CACHE % (bin_path.split('/')[-1], "entire") | |
output = OUTPUT % (bin_name, "entire") | |
main = proj.loader.main_object.get_symbol("main") | |
start_state = proj.factory.blank_state(addr=main.rebased_addr) | |
cfg_full = get_full_cfg(cache, main.rebased_addr, start_state) | |
print ("CFG size = %d" % (len(cfg_full.graph))) | |
traces = get_traces(proj, bin_path, input_paths) | |
print ("[+] CFG processing ....") | |
plot_cfg_with_trace(cfg_full, output, traces) | |
print ("[+] Complete! Saved to %s.png" % (output)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment