analyze file content using regular expressions
# requirements (likely pip package names; the `magic` and `pdfminer.high_level`
# imports below come from python-magic and pdfminer.six respectively):
#   pip install python-magic pdfminer.six
import argparse
import json
import os
import re
import sqlite3
import string
import traceback
from hashlib import md5
from multiprocessing import Pool

import magic
from pdfminer.high_level import extract_text as pdf_extract_text
CONN = None
INSERT = None
NORMALIZED_HASH = 'normalized_hash'
CONTENT_TYPE = 'content_type'
FILENAME = 'filename'
USERNAME = 'username'
FILE_HASH = 'file_hash'
PASSWORD = 'password'
FAILED = 'failed'
SSH = 'ssh'
IP = 'ip'
# Patterns are compiled as bytes because normalize_text() returns bytes.
RE_USERNAME = rb'''\s+username'''
RE_PASSWORD = rb'''\s+password'''
RE_SSH = rb'''\s+ssh\s'''
RE_IP = rb'''\s\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}'''
REGEXS = {
    USERNAME: re.compile(RE_USERNAME, re.IGNORECASE),
    PASSWORD: re.compile(RE_PASSWORD, re.IGNORECASE),
    SSH: re.compile(RE_SSH, re.IGNORECASE),
    IP: re.compile(RE_IP, re.IGNORECASE),
}
NUM_PROCS = 30
DB_NAME = 'data_analysis.db'
FILES_BASE = 'downloaded_files'
TABLE_NAME = 'regex_results'
REGEX = 'regex'
REGEX_JSON = 'test.json'
RESUME = True
parser = argparse.ArgumentParser(description='process files in directory.')
parser.add_argument('-regex', type=str, default=REGEX_JSON,
                    help='JSON file mapping result column names to regex patterns')
parser.add_argument('-db_name', type=str, default=DB_NAME,
                    help='sqlite db to store results in')
parser.add_argument('-files_base', type=str, default=FILES_BASE,
                    help='place to store downloaded files')
parser.add_argument('-table_name', type=str, default=TABLE_NAME,
                    help='table name for the results')
parser.add_argument('-resume', action='store_true', default=False,
                    help='resume analysis, skipping files already in the database')
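
# The file passed with -regex is expected to be a flat JSON object mapping
# result column names to pattern strings (see the json.load in the main block,
# which compiles each value with re.IGNORECASE). A minimal, hypothetical
# test.json might look like:
#   {"username": "\\s+username", "ip": "\\s\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}"}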
TEXT = [i.lower() for i in ['ASCII', 'HTML', 'UTF', 'XML']]
PDF = ['pdf']
ISO_8859 = ['iso-8859']
def normalize_text(content, keep=None, remove=None, replace=None):
    keep = [':', '.'] if keep is None else keep
    remove = "".join([i for i in string.punctuation if i not in keep]) if remove is None else remove
    replace = ' ' if replace is None else replace
    # work on lower-cased utf-8 bytes throughout
    if isinstance(content, str):
        content = content.encode('utf8')
    content = content.lower()
    replace = replace.encode('utf8')
    # replace every character in `remove` (all punctuation except `keep` by default)
    for c in remove:
        if isinstance(c, int):
            # iterating over a bytes value yields ints
            c = bytes([c])
        elif isinstance(c, str):
            c = c.encode('utf8')
        content = content.replace(c, replace)
    # collapse whitespace runs into single spaces
    return b' '.join(content.split())
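
# Illustrative example (not from the gist): with the defaults, ':' and '.' are
# kept, other punctuation becomes a space, and whitespace runs collapse:
#   normalize_text(b'User:  Admin!')  ->  b'user: admin'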
def perform_content_analysis(normalized, regexs):
    results = {name: False for name in regexs}
    results[NORMALIZED_HASH] = md5(normalized).hexdigest()
    for name, regex in regexs.items():
        results[name] = search_content(normalized, regex)
    return results
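
# The returned dict holds one boolean per regex name plus the md5 of the
# normalized content, e.g. with the default REGEXS:
#   {'normalized_hash': '<md5 hex>', 'ip': True, 'password': False,
#    'ssh': False, 'username': True}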
def analyze(info):
    filename, remove, keep, replace, regexs = info
    return analyze_file(filename, remove, keep, replace, regexs)
def analyze_file(filename, remove=None, keep=None, replace=None, regexs=None):
    regexs = regexs if isinstance(regexs, dict) else REGEXS
    mg = magic.from_file(filename).lower()
    fh = md5(open(filename, 'rb').read()).hexdigest()
    first = mg.split()[0]
    results = None
    if first in PDF:
        results = analyze_pdf(filename, remove=remove, keep=keep, replace=replace, regexs=regexs)
    elif first in TEXT:
        results = analyze_text(filename, remove=remove, keep=keep, replace=replace, regexs=regexs)
    elif first in ISO_8859:
        results = analyze_iso8859(filename, remove=remove, keep=keep, replace=replace, regexs=regexs)
    if results is not None:
        results[CONTENT_TYPE] = first
        results[FILE_HASH] = fh
        return results
    # unsupported content type: record a row with every regex column set to False
    results = {name: False for name in regexs}
    results[CONTENT_TYPE] = first
    results[FILE_HASH] = fh
    results[NORMALIZED_HASH] = fh
    results[FILENAME] = filename
    results[FAILED] = "Unable to parse file type"
    return results
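
# Note: magic.from_file() typically returns descriptions such as
# 'PDF document, version 1.4', 'ASCII text', or 'ISO-8859 text'; only the
# first (lower-cased) token is used to pick a handler, so anything else gets
# the "Unable to parse file type" row above.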
def analyze_iso8859(filename, remove=None, keep=None, replace=None, regexs=None):
    regexs = regexs if isinstance(regexs, dict) else REGEXS
    # read raw bytes and transcode iso-8859-1 to utf-8 before normalizing
    raw_content = open(filename, 'rb').read().decode('iso-8859-1').encode('utf8')
    normalized = normalize_text(raw_content, keep=keep, remove=remove, replace=replace)
    failed = None
    results = {FILENAME: filename, FAILED: failed}
    results.update(perform_content_analysis(normalized, regexs))
    return results
def analyze_text(filename, remove=None, keep=None, replace=None, regexs=None):
    regexs = regexs if isinstance(regexs, dict) else REGEXS
    raw_content = open(filename, 'rb').read()
    normalized = normalize_text(raw_content, keep=keep, remove=remove, replace=replace)
    failed = None
    results = {FILENAME: filename, FAILED: failed}
    results.update(perform_content_analysis(normalized, regexs))
    return results
def analyze_pdf(filename, remove=None, keep=None, replace=None, regexs=None):
    regexs = regexs if isinstance(regexs, dict) else REGEXS
    failed = None
    try:
        raw_content = pdf_extract_text(filename)
    except Exception:
        raw_content = ''
        failed = "{} failed: {}".format(filename, traceback.format_exc())
        print(failed)
    normalized = normalize_text(raw_content, keep=keep, remove=remove, replace=replace)
    results = {FILENAME: filename, FAILED: failed}
    results.update(perform_content_analysis(normalized, regexs))
    return results
def search_content(normalized_content, regex):
    return regex.search(normalized_content) is not None
def handle_completed_result(result, total, cnt, sorted_columns):
    filename = result[FILENAME]
    content_type = result[CONTENT_TYPE]
    values = [result[i] for i in sorted_columns]
    print("Handling results for [{} of {}]: {} ({})".format(cnt, total, filename, content_type))
    try:
        cur = CONN.cursor()
        cur.execute(INSERT, values)
        cur.close()
        CONN.commit()
    except Exception:
        print(traceback.format_exc())
if __name__ == "__main__":
    args = parser.parse_args()
    CONN = sqlite3.connect(args.db_name)
    try:
        # optionally override the default regexes from a JSON file
        regexes = json.load(open(args.regex))
        REGEXS = {k: re.compile(v.encode('utf8'), re.IGNORECASE) for k, v in regexes.items()}
    except Exception:
        print("Unable to load {}, using the built-in default regexes".format(args.regex))
    table_name = args.table_name
    sorted_columns = sorted(REGEXS.keys())
    all_columns = [NORMALIZED_HASH, FILE_HASH, FILENAME, CONTENT_TYPE, FAILED] + sorted_columns
    create_table_fmt = ("CREATE TABLE {table_name} (id INTEGER PRIMARY KEY AUTOINCREMENT, "
                        "normalized_hash TEXT, file_hash TEXT, filename TEXT, "
                        "content_type TEXT, failed TEXT, {columns});")
    columns = ['{} TEXT'.format(k) for k in sorted_columns]
    create_stmt = create_table_fmt.format(columns=", ".join(columns), table_name=table_name)
    try:
        CONN.execute(create_stmt)
        CONN.commit()
    except Exception:
        # the table may already exist (e.g. on a resumed run)
        print(traceback.format_exc())
    insert_table_fmt = "INSERT INTO {table_name}({columns}) VALUES({values})"
    insert_stmt = insert_table_fmt.format(
        table_name=table_name,
        columns=", ".join(all_columns),
        values=', '.join(['?' for _ in all_columns]))
    INSERT = insert_stmt
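
    # With the default REGEXS (sorted columns: ip, password, ssh, username) the
    # two statements render roughly as:
    #   CREATE TABLE regex_results (id INTEGER PRIMARY KEY AUTOINCREMENT,
    #       normalized_hash TEXT, file_hash TEXT, filename TEXT, content_type TEXT,
    #       failed TEXT, ip TEXT, password TEXT, ssh TEXT, username TEXT);
    #   INSERT INTO regex_results(normalized_hash, file_hash, filename, content_type,
    #       failed, ip, password, ssh, username) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)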
    base_dir = args.files_base
    files = set(os.path.join(base_dir, i) for i in os.listdir(base_dir))
    if args.resume:
        cur = CONN.cursor()
        print("Resuming, checking for previously analyzed files")
        done = set(i[0] for i in cur.execute('SELECT filename FROM {}'.format(table_name)))
        print("Removing previously analyzed files")
        files = files - done
    infos = [(i, None, [':', '.'], " ", REGEXS) for i in files]
    total = len(infos)
    cnt = 0
    with Pool(processes=NUM_PROCS) as pool:
        futures = pool.imap_unordered(analyze, infos)
        for result in futures:
            cnt += 1
            handle_completed_result(result, total, cnt, all_columns)
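
# Example invocation (the script filename here is hypothetical; adjust to your copy):
#   python analyze_files.py -regex test.json -db_name data_analysis.db \
#       -files_base downloaded_files -table_name regex_results -resume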