Last active
November 10, 2019 13:10
-
-
Save nickcjohnston/8d8a08b23a7ac21c4198bf4aee1c6444 to your computer and use it in GitHub Desktop.
Quick shot at doing some high-level static analysis on a large collection of ELFs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import os | |
import hashlib | |
import magic | |
import multiprocessing | |
from time import sleep | |
import json | |
import r2pipe | |
from multiprocessing import Queue | |
# Upper bound on the number of concurrent worker processes.
MAX_WORKERS = 10
# Shared work queue: main() enqueues sample dicts, workers dequeue them.
queue = Queue()
def main():
    """Poll a directory of ELF samples and fan work out to worker processes.

    Usage: ``script.py <directory>``. Runs forever: every 30 seconds it walks
    the directory, enqueues any file it has not seen before, and tops the
    worker pool back up to MAX_WORKERS, replacing workers that have died.
    """
    processes = []    # live worker processes
    samples = dict()  # collection of malware samples, keyed by filename
    # Program should take exactly 1 arg, a directory full of ELFs.
    if len(sys.argv) != 2:
        print_usage_and_exit()
    # Get arg. Check that it's a directory.
    directory = sys.argv[1]
    if not os.path.isdir(directory):
        print_usage_and_exit()
    while True:
        # Regularly poll the directory to see if new ELFs have been added.
        for root, dirs, files in os.walk(directory):
            for file in files:
                # NOTE(review): dedup is by basename only, so two files with
                # the same name in different subdirs are treated as one.
                if file not in samples:
                    # Record some basic info for the latest sample.
                    new_sample = {
                        'filename': file,
                        'filepath': os.path.join(root, file),
                    }
                    samples[file] = new_sample
                    # Add the sample to the queue for processing.
                    queue.put(new_sample)
        # Drop dead workers, then top the pool back up to MAX_WORKERS.
        # (The original deleted entries from `processes` while iterating it
        # with a hand-maintained counter, which skips elements and uses stale
        # indices once an item has been removed.)
        processes = [p for p in processes if p.is_alive()]
        while len(processes) < MAX_WORKERS:
            # Create a new worker process; it runs scan_samples() forever.
            worker = multiprocessing.Process(target=scan_samples)
            processes.append(worker)
            worker.start()
        print(f"{queue.qsize()} of {len(samples)} remaining.", file=sys.stderr)
        # Sleep for a bit before we check the directory and workers again.
        sleep(30)
def print_usage_and_exit():
    """Print a usage message to stderr and terminate with a nonzero status.

    Raises:
        SystemExit: always, with exit code 1.
    """
    # Usage errors belong on stderr with a nonzero exit status so shells and
    # scripts can detect the failure (the original printed to stdout and
    # exited with status 0, which reads as success).
    print(f"Usage: {sys.argv[0]} directory_of_elf_malware_samples",
          file=sys.stderr)
    sys.exit(1)
# Record how large the sample is on disk.
def get_filesize(sample):
    """Store the file's size in bytes under ``sample['size']``."""
    sample['size'] = os.stat(sample['filepath']).st_size
# Compute several cryptographic digests of the sample in a single pass.
def get_hashes(sample):
    """Store md5/sha1/sha256/sha512 hex digests on the sample dict."""
    CHUNK = 65535
    digests = {
        'md5': hashlib.md5(),
        'sha1': hashlib.sha1(),
        'sha256': hashlib.sha256(),
        'sha512': hashlib.sha512(),
    }
    # Read the file in chunks in case it's massive; feed every digest
    # from the same buffer so the file is only read once.
    with open(sample['filepath'], 'rb') as fh:
        for chunk in iter(lambda: fh.read(CHUNK), b''):
            for digest in digests.values():
                digest.update(chunk)
    for name, digest in digests.items():
        sample[name] = digest.hexdigest()
# Use the "magic" library to determine the file's type.
def get_filetype(sample):
    """Store a human-readable file-type string under ``sample['filetype']``.

    Uses the third-party python-magic binding's ``from_file`` on the path
    stored at ``sample['filepath']``.
    """
    sample['filetype'] = magic.from_file(sample['filepath'])
# Helper - currently broken as r2pipe is printing the errors anyways.
def r2pipe_command(open_r2_pipe, command):
    """Run `command` through an open r2pipe handle and return parsed JSON.

    Returns the string "Error" on failure so one bad command does not abort
    processing of the whole sample.
    """
    try:
        return open_r2_pipe.cmdj(command)
    except Exception:  # probably r2pipe.cmdj.Error
        # A bare `except:` would also swallow SystemExit/KeyboardInterrupt,
        # making the worker processes impossible to interrupt cleanly.
        return "Error"
# Call a bunch of functions on the sample from radare2.
def do_radare2_stuff(sample):
    """Analyze the sample with radare2 and record each result on the dict.

    Could be expanded to force architecture and bits, but this is a PoC.
    """
    # "-2" tells radare2 to silence its stderr output.
    r2 = r2pipe.open(filename=sample['filepath'], flags=["-2"])
    try:
        r2.cmd("aaaa")  # run radare2's full auto-analysis first
        # Map of output key -> radare2 JSON command.
        commands = {
            # General info like cpu arch, stripped or not, etc.
            'radare2_info': "iJj",
            # List of functions.
            'radare2_function_list': "aflj",
            # Data-section-only strings ("izj") omitted: redundant with izzj.
            # All strings in the binary.
            'radare2_all_strings': "izzj",
            # List of sections (data, text, bss, etc).
            'radare2_sections': "iSj",
            # The symbol table (if there is one).
            'radare2_symbols': "isj",
            # Linked libraries (if any).
            'radare2_linked_libraries': "ilj",
            # List of imports (if any).
            'radare2_imports': "iij",
            # The program's entry point (typically a main function).
            'radare2_entry_point': "iej",
        }
        for key, cmd in commands.items():
            sample[key] = r2pipe_command(r2, cmd)
    finally:
        # Always shut down the radare2 subprocess, even if analysis raised;
        # the original skipped quit() on error and leaked the child process.
        r2.quit()
# This is the loop each worker process runs.
def scan_samples():
    """Run forever: pull samples off the shared queue, analyze, print JSON."""
    # The processing pipeline, applied in order to every sample.
    pipeline = (get_filetype, get_hashes, get_filesize, do_radare2_stuff)
    while True:
        # No work yet: nap for 15 seconds and poll again.
        if queue.empty():
            sleep(15)
            continue
        sample = queue.get()
        for step in pipeline:
            step(sample)
        # `sample` is a plain dict at this point, so the whole thing can be
        # dumped as JSON. The trailing comma lets the combined output of all
        # workers be wrapped into one JSON array later.
        print(json.dumps(sample) + ",")
# Entry-point guard: call main() only when executed as a script, not when
# imported (worker processes re-import this module under multiprocessing).
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment