analyze file content using regular expressions
# requirements: python-magic pdfminer.six
import json
import re
from pdfminer.high_level import extract_text as pdf_extract_text
import traceback
import os
from hashlib import md5
from multiprocessing import Pool
import sqlite3
import magic
import argparse
import string
CONN = None
INSERT = None
NORMALIZED_HASH = 'normalized_hash'
CONTENT_TYPE = 'content_type'
FILENAME = 'filename'
USERNAME = 'username'
FILE_HASH = 'file_hash'
PASSWORD = 'password'
FAILED = 'failed'
SSH = 'ssh'
IP = 'ip'
RE_USERNAME = rb'''\s+username'''
RE_PASSWORD = rb'''\s+password'''
RE_SSH = rb'''\s+ssh\s'''
RE_IP = rb'''\s\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}'''
# patterns are bytes because the normalized content they are searched
# against is bytes (see normalize_text below)
REGEXS = {
    USERNAME: re.compile(RE_USERNAME, re.IGNORECASE),
    PASSWORD: re.compile(RE_PASSWORD, re.IGNORECASE),
    SSH: re.compile(RE_SSH, re.IGNORECASE),
    IP: re.compile(RE_IP, re.IGNORECASE),
}
NUM_PROCS = 30
DB_NAME = 'data_analysis.db'
FILES_BASE = 'downloaded_files'
TABLE_NAME = 'regex_results'
REGEX_JSON = 'test.json'
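# The -regex argument expects a JSON file mapping result column names to
# regex pattern strings (compiled case-insensitively in __main__ below).
# A hypothetical test.json might look like:
# {
#     "username": "\\s+username",
#     "ip": "\\s\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}"
# }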
parser = argparse.ArgumentParser(description='process files in a directory.')
parser.add_argument('-regex', type=str, default=REGEX_JSON,
                    help='JSON file mapping result columns to regex patterns')
parser.add_argument('-db_name', type=str, default=DB_NAME,
                    help='sqlite db to store results in')
parser.add_argument('-files_base', type=str, default=FILES_BASE,
                    help='directory containing the files to analyze')
parser.add_argument('-table_name', type=str, default=TABLE_NAME,
                    help='table name for the results')
parser.add_argument('-resume', action='store_true', default=False,
                    help='skip files already recorded in the results table')
TEXT = [i.lower() for i in ['ASCII', 'HTML', 'UTF', 'XML']]
PDF = ['pdf']
ISO_8859 = ['iso-8859']
def normalize_text(content, keep=None, remove=None, replace=None):
    keep = [':', '.'] if keep is None else keep
    remove = "".join([i for i in string.punctuation if i not in keep]) if remove is None else remove
    replace = ' ' if replace is None else replace
    # work on bytes throughout: lowercase, swap unwanted punctuation for the
    # replacement character, then collapse runs of whitespace
    if isinstance(content, str):
        content = content.encode('utf8')
    content = content.lower()
    replace = replace.encode('utf8')
    for c in remove:
        # iterating a str yields 1-char strings; iterating bytes yields ints
        c = bytes([c]) if isinstance(c, int) else c.encode('utf8')
        content = content.replace(c, replace)
    return b' '.join(content.split())
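# For reference, a sanity check traced against the function above (the example
# input is illustrative, not from the original gist):
#   normalize_text(b'User: Bob!!  pass-word')  ->  b'user: bob pass word'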
def perform_content_analysis(normalized, regexs):
    # default every regex column to False, then record which patterns matched
    results = {name: False for name in regexs}
    results[NORMALIZED_HASH] = md5(normalized).hexdigest()
    for name, regex in regexs.items():
        results[name] = search_content(normalized, regex)
    return results
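# e.g. with the default REGEXS the returned dict looks something like:
#   {'normalized_hash': '9e10...', 'ip': True, 'password': False,
#    'ssh': False, 'username': True}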
def analyze(info):
    # unpack the tuple built in __main__ for Pool.imap_unordered
    filename, remove, keep, replace, regexs = info
    return analyze_file(filename, remove, keep, replace, regexs)
def analyze_file(filename, remove=None, keep=None, replace=None, regexs=None):
    regexs = regexs if isinstance(regexs, dict) else REGEXS
    mg = magic.from_file(filename).lower()
    fh = md5(open(filename, 'rb').read()).hexdigest()
    first = mg.split()[0]
    results = None
    if first in PDF:
        results = analyze_pdf(filename, remove=remove, keep=keep, replace=replace, regexs=regexs)
    elif any(first.startswith(t) for t in TEXT):
        # magic reports e.g. 'utf-8', so match on prefix rather than equality
        results = analyze_text(filename, remove=remove, keep=keep, replace=replace, regexs=regexs)
    elif any(first.startswith(t) for t in ISO_8859):
        results = analyze_iso8859(filename, remove=remove, keep=keep, replace=replace, regexs=regexs)
    if results is not None:
        results[CONTENT_TYPE] = first
        results[FILE_HASH] = fh
        return results
    # unknown type: record a row marking the file as unparsed
    results = {name: False for name in regexs}
    results[CONTENT_TYPE] = first
    results[FILE_HASH] = fh
    results[NORMALIZED_HASH] = fh
    results[FILENAME] = filename
    results[FAILED] = "Unable to parse file type"
    return results
def analyze_iso8859(filename, remove=None, keep=None, replace=None, regexs=None):
    regexs = regexs if isinstance(regexs, dict) else REGEXS
    # read raw bytes and transcode iso-8859-1 to utf-8 before normalizing
    raw_content = open(filename, 'rb').read().decode('iso-8859-1').encode('utf8')
    normalized = normalize_text(raw_content, keep=keep, remove=remove, replace=replace)
    results = {FILENAME: filename, FAILED: None}
    results.update(perform_content_analysis(normalized, regexs))
    return results
def analyze_text(filename, remove=None, keep=None, replace=None, regexs=None):
    regexs = regexs if isinstance(regexs, dict) else REGEXS
    raw_content = open(filename, 'rb').read()
    normalized = normalize_text(raw_content, keep=keep, remove=remove, replace=replace)
    results = {FILENAME: filename, FAILED: None}
    results.update(perform_content_analysis(normalized, regexs))
    return results
def analyze_pdf(filename, remove=None, keep=None, replace=None, regexs=None):
    regexs = regexs if isinstance(regexs, dict) else REGEXS
    failed = None
    try:
        raw_content = pdf_extract_text(filename)
    except Exception:
        raw_content = ''
        failed = "{} failed: {}".format(filename, traceback.format_exc())
        print(failed)
    normalized = normalize_text(raw_content, keep=keep, remove=remove, replace=replace)
    results = {FILENAME: filename, FAILED: failed}
    results.update(perform_content_analysis(normalized, regexs))
    return results
def search_content(normalized_content, regex):
    return regex.search(normalized_content) is not None
def handle_completed_result(result, total, cnt, columns):
    filename = result[FILENAME]
    content_type = result[CONTENT_TYPE]
    values = [result[i] for i in columns]
    print("Handling results for [{} of {}]: {} ({})".format(cnt, total, filename, content_type))
    try:
        cur = CONN.cursor()
        cur.execute(INSERT, values)
        cur.close()
        CONN.commit()
    except Exception:
        print(traceback.format_exc())
if __name__ == "__main__":
    args = parser.parse_args()
    CONN = sqlite3.connect(args.db_name)
    try:
        regexes = json.load(open(args.regex))
        REGEXS = {k: re.compile(v.encode('utf8'), re.IGNORECASE) for k, v in regexes.items()}
    except Exception:
        print("Failed to load {}, falling back to the built-in regexes".format(args.regex))
    table_name = args.table_name
    sorted_columns = sorted(REGEXS.keys())
    all_columns = [NORMALIZED_HASH, FILE_HASH, FILENAME, CONTENT_TYPE, FAILED] + sorted_columns
    create_table_fmt = "CREATE TABLE {table_name} (id INTEGER PRIMARY KEY AUTOINCREMENT, normalized_hash TEXT, file_hash TEXT, filename TEXT, content_type TEXT, failed TEXT, {columns});"
    columns = ['{} TEXT'.format(k) for k in sorted_columns]
    create_stmt = create_table_fmt.format(**{'columns': ", ".join(columns), 'table_name': table_name})
    try:
        CONN.execute(create_stmt)
        CONN.commit()
    except Exception:
        # most likely the table already exists (e.g. on -resume)
        print(traceback.format_exc())
    insert_table_fmt = "INSERT INTO {table_name}({columns}) VALUES({values})"
    insert_stmt = insert_table_fmt.format(**{
        'table_name': table_name,
        'columns': ", ".join(all_columns),
        'values': ', '.join(['?' for _ in all_columns])
    })
    INSERT = insert_stmt
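    # With the default REGEXS this produces a statement along the lines of:
    #   INSERT INTO regex_results(normalized_hash, file_hash, filename,
    #       content_type, failed, ip, password, ssh, username)
    #       VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)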
    base_dir = args.files_base
    files = set([os.path.join(base_dir, i) for i in os.listdir(base_dir)])
    if args.resume:
        cur = CONN.cursor()
        print("Resuming, checking for previously analyzed files")
        done = set([i[0] for i in cur.execute('SELECT filename FROM {}'.format(table_name))])
        print("Removing previously analyzed files")
        files = set([i for i in files if i not in done])
    infos = [(i, None, [':', '.'], " ", REGEXS) for i in files]
    total = len(infos)
    cnt = 0
    with Pool(processes=NUM_PROCS) as pool:
        futures = pool.imap_unordered(analyze, infos)
        for result in futures:
            cnt += 1
            handle_completed_result(result, total, cnt, all_columns)
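# Example invocation (the script name is illustrative):
#   python analyze_files.py -regex test.json -db_name data_analysis.db \
#       -files_base downloaded_files -table_name regex_results -resume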