Skip to content

Instantly share code, notes, and snippets.

@nickcjohnston
Last active November 10, 2019 13:10
Show Gist options
  • Save nickcjohnston/8d8a08b23a7ac21c4198bf4aee1c6444 to your computer and use it in GitHub Desktop.
Quick shot at doing some high-level static analysis on a large collection of ELFs
import sys
import os
import hashlib
import magic
import multiprocessing
from time import sleep
import json
import r2pipe
from multiprocessing import Queue
# Upper bound on the number of concurrent worker processes.
MAX_WORKERS = 10
# Shared work queue: main() puts sample dicts on it, workers consume them.
queue = Queue()
def main():
    """Poll a directory for new ELF samples and farm them out to workers.

    Takes exactly one command-line argument: a directory of ELF samples.
    Runs forever, re-scanning the directory every 30 seconds and keeping
    a pool of MAX_WORKERS analysis processes alive.
    """
    processes = []    # collection of live worker processes
    samples = dict()  # samples already seen, keyed by filename

    # Program should take exactly 1 arg: a directory full of ELFs.
    if len(sys.argv) != 2:
        print_usage_and_exit()

    # Get the arg and check that it is actually a directory.
    directory = sys.argv[1]
    if not os.path.isdir(directory):
        print_usage_and_exit()

    while True:
        # Regularly poll the directory to see if new ELFs have been added.
        for root, dirs, files in os.walk(directory):
            for fname in files:
                if fname not in samples:
                    new_sample = {
                        'filename': fname,
                        'filepath': os.path.join(root, fname),
                    }
                    samples[fname] = new_sample
                    # Hand the sample to the workers for processing.
                    queue.put(new_sample)

        # Drop dead workers, then top the pool back up to MAX_WORKERS.
        # (The original deleted from `processes` while iterating it with a
        # manual counter, which skips the element after each deletion;
        # rebuilding the list avoids that bug.)
        processes = [p for p in processes if p.is_alive()]
        while len(processes) < MAX_WORKERS:
            # Create a new worker process that runs scan_samples() forever.
            worker = multiprocessing.Process(target=scan_samples)
            processes.append(worker)
            worker.start()

        # Progress report goes to stderr so stdout stays pure JSON output.
        print(f"{queue.qsize()} of {len(samples)} remaining.", file=sys.stderr)
        # Sleep for a bit before we check the directory and workers again.
        sleep(30)
def print_usage_and_exit():
    """Print a usage message and terminate the process with status 1.

    Raises SystemExit; never returns.
    """
    # Usage errors belong on stderr, and a nonzero exit status lets shell
    # scripts detect the failure (the original used exit() -> status 0).
    print(f"Usage: {sys.argv[0]} directory_of_elf_malware_samples", file=sys.stderr)
    sys.exit(1)
# Record the sample's on-disk size, in bytes, on the sample dict.
def get_filesize(sample):
    """Store the byte size of sample['filepath'] under sample['size']."""
    path = sample['filepath']
    sample['size'] = os.stat(path).st_size
# Compute several digests of the sample in a single pass over the file.
def get_hashes(sample):
    """Populate sample['md5'/'sha1'/'sha256'/'sha512'] with hex digests."""
    block_size = 65535
    digests = {
        'md5': hashlib.md5(),
        'sha1': hashlib.sha1(),
        'sha256': hashlib.sha256(),
        'sha512': hashlib.sha512(),
    }
    # Stream the file in fixed-size blocks in case it is massive.
    with open(sample['filepath'], 'rb') as f:
        for chunk in iter(lambda: f.read(block_size), b''):
            for digest in digests.values():
                digest.update(chunk)
    for name, digest in digests.items():
        sample[name] = digest.hexdigest()
# Use the "magic" library to determine the file's type
def get_filetype(sample):
sample['filetype'] = magic.from_file(sample['filepath'])
# Helper - runs an r2 command that returns JSON, guarding against failures.
# Note: r2pipe itself may still print errors to stderr; this only guards
# the Python-level exception.
def r2pipe_command(open_r2_pipe, command):
    """Run `command` through the open r2 pipe and return the parsed JSON.

    Returns the string "Error" when the command fails, so callers can
    store a sentinel value instead of crashing mid-scan.
    """
    try:
        return open_r2_pipe.cmdj(command)
    # Was a bare `except:`, which also swallows SystemExit and
    # KeyboardInterrupt; Exception is the widest safe net here.
    except Exception:
        return "Error"
# Call a bunch of functions on the sample from radare2
def do_radare2_stuff(sample):
    """Attach radare2-derived static-analysis data to the sample dict.

    Mutates `sample` in place, adding one 'radare2_*' key per query.
    Each value is either the parsed JSON result or the "Error" sentinel
    from r2pipe_command().
    """
    # Could expand this to force architecture and bits but this is PoC
    r2 = r2pipe.open(filename=sample['filepath'], flags=["-2"])
    try:
        r2.cmd("aaaa")
        # General info like cpu arch, stripped or not, etc.
        # NOTE(review): "iJj" looks unusual — radare2's JSON info command is
        # typically "ij"; confirm this command against the r2 version in use.
        sample['radare2_info'] = r2pipe_command(r2, "iJj")
        # List of functions
        sample['radare2_function_list'] = r2pipe_command(r2, "aflj")
        # Get the strings from JUST the data section
        # Commented out for now, redundant with all strings below
        #sample['radare2_data_strings'] = r2pipe_command(r2, "izj")
        # All strings in the binary
        sample['radare2_all_strings'] = r2pipe_command(r2, "izzj")
        # Get a list of sections (data, text, bss, etc)
        sample['radare2_sections'] = r2pipe_command(r2, "iSj")
        # Get the symbol table (if there is one)
        sample['radare2_symbols'] = r2pipe_command(r2, "isj")
        # Get linked libraries (if any)
        sample['radare2_linked_libraries'] = r2pipe_command(r2, "ilj")
        # Get list of imports (if any)
        sample['radare2_imports'] = r2pipe_command(r2, "iij")
        # Get the program's entry point (typically a main function)
        sample['radare2_entry_point'] = r2pipe_command(r2, "iej")
    finally:
        # Always close the pipe, even if a call raises — the original
        # leaked the r2 subprocess on any exception before r2.quit().
        r2.quit()
# This is the function each worker process runs, forever.
def scan_samples():
    """Pull samples off the shared queue and analyze them in a loop."""
    while True:
        # Nothing queued yet: back off for 15 seconds, then re-check.
        if queue.empty():
            sleep(15)
            continue
        # Get a sample from the queue.
        sample = queue.get()
        # Run every analysis pass; each one mutates `sample` in place.
        get_filetype(sample)
        get_hashes(sample)
        get_filesize(sample)
        do_radare2_stuff(sample)
        # At this point `sample` is a plain dict, so emit it as one JSON
        # object (trailing comma so the stream can be wrapped in a list).
        #output = json.dumps(sample, indent=4, sort_keys=True)
        print(f"{json.dumps(sample)},")
# Call main only when run as a script, not when imported as a module.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment