Skip to content

Instantly share code, notes, and snippets.

@RKX1209
Last active February 18, 2022 08:01
Show Gist options
  • Save RKX1209/3cb60b0fa0ba92da6575716680f32aa0 to your computer and use it in GitHub Desktop.
Save RKX1209/3cb60b0fa0ba92da6575716680f32aa0 to your computer and use it in GitHub Desktop.
Different path tracer
import networkx
import angr
import tracer
import sys
import os
import logging
import pickle
import argparse
from collections import Counter
from bingraphvis import *
from bingraphvis.angr import *
from bingraphvis.angr.x86 import *
CFG_CACHE_DIR = "cache"
OUTPUT_DIR = "outd"
CFG_CACHE = CFG_CACHE_DIR + "/" + "cfg_cache_%s_%s.pickle"
OUTPUT = OUTPUT_DIR + "/" + "input_trace_%s_%s"
colors = ["red", "blue"]
plot_all_cfg = False
class AngrRemainDiffs(Transformer):
def __init__(self, traces):
self.traces = traces
def trace_idx(self, addr):
idx = ()
if len(self.traces) < 2:
return ()
for i, t in enumerate(self.traces):
if addr in t:
idx = idx + (i, )
return idx
def transform(self, graph):
remove = []
for n in graph.nodes:
all_idx = self.trace_idx(n.obj.addr)
if len(all_idx) == 0:
remove.append(n)
if len(all_idx) == len(self.traces): # exist in all traces or empty
if plot_all_cfg == True:
n.color = "black"
else:
remove.append(n)
if len(all_idx) == 1: # exist in an only one trace
idx = all_idx[0]
n.color = colors[idx]
# print(hex(n.obj.addr), ' ', idx)
# print([hex(r.obj.addr) for r in remove])
for r in remove:
graph.remove_node(r)
print("Graph len= %d" % len(graph.nodes))
def check_env():
with open("/proc/sys/kernel/randomize_va_space",'r') as rfd:
b = rfd.read(1)
if int(b) != 0:
print("ASLR is not disabled. Run: echo 0 | sudo tee /proc/sys/kernel/randomize_va_space")
exit(0)
def get_full_cfg(cache_path, start_addr, start_state=None):
if os.path.isfile(cache_path):
print("[+] CFG Cache found ")
cfg_full = proj.analyses.CFGEmulated(fail_fast=True, starts=[start_addr], initial_state=start_state, no_construct = True)
with open(cache_path, 'rb') as fp:
cfg_full._graph = pickle.load(fp) # XXX: Dirty hack (due to derivied class serialization problem)
else:
print("[+] Creating CFG ....")
cfg_full = proj.analyses.CFGEmulated(fail_fast=True, starts=[start_addr], initial_state=start_state, show_progressbar=True)
with open(cache_path, 'wb') as fp:
pickle.dump(cfg_full.graph, fp)
return cfg_full
def plot_cfg_with_trace(cfg, fname, traces, format="raw"):
vis = AngrVisFactory().default_cfg_pipeline(cfg, asminst=True, vexinst=False)
vis.add_transformer(AngrRemainDiffs(traces))
vis.set_output(DotOutput(fname, format=format))
vis.process(cfg.graph)
def get_traces(proj, bin_path, input_paths):
_traces = []
for ipath in input_paths:
print("[+] Tracing .... %s" % (ipath))
runner = tracer.QEMURunner(project=proj, input=b'', trace_log_limit=2**34, argv=[bin_path, ipath])
#print([hex(r) for r in runner.trace])
_traces.append(Counter(runner.trace))
print("Size: %d" % (len(runner.trace)))
return _traces
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Input Tracer option')
parser.add_argument('-b','--binary', help='Binary path',required=True)
parser.add_argument('-i','--inputs', help='Inputs aginst binaries',required=True)
parser.add_argument('-f','--func', help='Dump per functions', action='store_true', default=False)
parser.add_argument('-v','--verbose', help='Plot entire CFG', action='store_true', default=False)
args = parser.parse_args()
bin_path = args.binary
plot_all_cfg = args.verbose
input_paths = [p for p in args.inputs.split(',')]
bin_name = bin_path.split('/')[-1]
# Check Environment
check_env()
logging.getLogger('angr').setLevel('ERROR')
# Crate Directories for Input and CFG Cache
try:
os.mkdir(CFG_CACHE_DIR)
os.mkdir(OUTPUT_DIR)
except OSError:
pass
print("[+] Opening binary %s" % (bin_path))
proj = angr.Project(bin_path,load_options={'auto_load_libs': False})
if args.func:
print("[+] Searching for all the functions (using CFGFast)")
fast_cfg = proj.analyses.CFGFast(show_progressbar=True)
# we want to only keep functions defined in the main binary
functions = []
for addr,func in fast_cfg.functions.items():
if proj.loader.main_object.contains_addr(addr) and addr not in proj.loader.main_object.reverse_plt:
functions.append((addr,func))
nb_functions = len(functions)
print(" ==> %d functions to process." % nb_functions)
traces = get_traces(proj, bin_path, input_paths)
print ("[+] CFG processing ....")
i = 0
for addr,func in functions:
cache = CFG_CACHE % (bin_name, func.name)
output = OUTPUT % (bin_name, func.name)
print("[+] (%d/%d) Computing Accurate CFG for function %s (0x%x)" % (i, nb_functions, func.name,addr))
cfg_func = get_full_cfg(cache, addr)
plot_cfg_with_trace(cfg_func, output, traces)
print ("[+] Complete! Saved to %s.png" % (output))
i += 1
else:
cache = CFG_CACHE % (bin_path.split('/')[-1], "entire")
output = OUTPUT % (bin_name, "entire")
main = proj.loader.main_object.get_symbol("main")
start_state = proj.factory.blank_state(addr=main.rebased_addr)
cfg_full = get_full_cfg(cache, main.rebased_addr, start_state)
print ("CFG size = %d" % (len(cfg_full.graph)))
traces = get_traces(proj, bin_path, input_paths)
print ("[+] CFG processing ....")
plot_cfg_with_trace(cfg_full, output, traces)
print ("[+] Complete! Saved to %s.png" % (output))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment