A collection of tools

#!/usr/bin/python3
import sys

if sys.version_info < (3, 6):
    print("Must be using Python 3.6 or higher")
    exit(1)

import logging
import re
import smtplib
import csv
import warnings
from time import sleep
from threading import Thread
from subprocess import check_output, PIPE, CalledProcessError
from argparse import ArgumentParser, RawDescriptionHelpFormatter
from os.path import exists, splitext, dirname
from os import makedirs
from urllib.parse import quote
from typing import Callable, List, Tuple, Union

try:
    import dns.resolver as dns_resolver
except ImportError:
    dns_resolver = None
# Address used for the SMTP MAIL FROM command
fromAddress = '[email protected]'


def check_email(addressToVerify):
    # Simple regex for syntax checking
    regex = r'^[_a-z0-9-]+(\.[_a-z0-9-]+)*@[a-z0-9-]+(\.[a-z0-9-]+)*(\.[a-z]{2,})$'

    # Email address to verify
    # inputAddress = input('Please enter the emailAddress to verify:')
    # addressToVerify = str(inputAddress)

    # Syntax check
    match = re.match(regex, addressToVerify)
    if match is None:
        return {
            'email': addressToVerify,
            'success': False,
            'code': 500,
            'message': 'Bad Syntax'
        }

    # Get domain for DNS lookup
    splitAddress = addressToVerify.split('@')
    domain = str(splitAddress[1])
    # print('Domain:', domain)
    try:
        # MX record lookup
        records = dns_resolver.query(domain, 'MX')
        mxRecord = str(records[0].exchange)

        # SMTP lib setup (raise the debug level for full output)
        server = smtplib.SMTP()
        server.set_debuglevel(0)

        # SMTP conversation: HELO, MAIL FROM, then RCPT TO; a 250 reply to
        # RCPT TO means the server accepts mail for this address
        server.connect(mxRecord)
        server.helo(server.local_hostname)  # local_hostname is the FQDN of the local host
        server.mail(fromAddress)
        code, message = server.rcpt(addressToVerify)
        server.quit()
        return {
            'email': addressToVerify,
            'success': code == 250,
            'code': code,
            'message': message.decode('utf8')
                       .replace('\n', ' ').replace(',', '')
        }
    except Exception as e:
        # logging.exception('check_email fail:')
        return {
            'email': addressToVerify,
            'success': False,
            'code': 500,
            'message': str(e)
        }
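
# Illustrative return values of check_email (hypothetical addresses; the
# exact SMTP code/message depend entirely on the remote mail server):
#
#   check_email('not-an-email')
#   # -> {'email': 'not-an-email', 'success': False,
#   #     'code': 500, 'message': 'Bad Syntax'}
#   check_email('someone@example.com')
#   # -> {'email': 'someone@example.com', 'success': False,
#   #     'code': 550, 'message': 'No such user'}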
def verify_email(*data_source, out_file='email_status.csv', num_thread=20,
                 primary_key='email', sub_key='url', overwrite=False, **kw) -> Tuple[bool, str]:
    '''
    Check whether an email address is valid by pinging its SMTP mail server.
    The command accepts inline text or an input file'''
    if dns_resolver is None:
        raise ImportError("Can't import the dnspython module.\nPlease run: pip install dnspython")
    data, IS_CSV, col_email, col_url = handle_input(data_source, primary_key, sub_key)
    response = []
    errs = []

    def fun(data, break_func):
        if IS_CSV:
            mail = data[col_email]
        else:
            mail = data
        res = check_email(mail)
        if not res['success']:
            domain = mail.split('@')[1]
            # For non-webmail domains, fall back to common role addresses
            if domain not in ('gmail.com', 'yahoo.com', 'outlook.com'):
                rel_name = [
                    'info', 'customer', 'contact', 'contato', 'service',
                    'hi', 'hello', 'sales', 'vendas', 'sac', 'business',
                    'shop', 'ask', 'admin', 'marketing'
                ]
                for i in rel_name:
                    res = check_email(f'{i}@{domain}')
                    if res['success']:
                        break
        if not res['success']:
            errs.append(0)
        if IS_CSV:
            response.append({'url': data[col_url], **res})
        else:
            response.append(res)

    breaking = run_multi_thread(fun, data, errs, num_thread, 'Verifying:')
    out_file = new_file(out_file, overwrite)
    with open(out_file, 'w') as f:
        f.write(','.join(response[0].keys()))
        f.write('\n')
        for r in response:
            f.write(','.join(str(d) for k, d in r.items()))
            f.write('\n')
    return breaking, out_file
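
# Sketch of the resulting CSV for plain-text input (values illustrative;
# CSV input adds a leading 'url' column taken from sub_key):
#
#   email,success,code,message
#   someone@example.com,False,550,No such user
#   info@example.com,True,250,OK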
# ---------------
# These features are Unix-only (they shell out to lynx, wget, ack and grep)
def search_url(*data_source, out_file='urls.txt', loop_time=3, num_thread=2,
               primary_key='name', sub_key='company', overwrite=False, **kw) -> Tuple[bool, str]:
    '''
    Find URLs by keyword in Google search results (powered by Google).
    The default number of results is 30'''
    check_command('lynx')
    data, IS_CSV, primary_col, _ = handle_input(data_source, primary_key, sub_key)
    cm_shell = (
        'lynx -listonly -dump "https://www.google.com/search?q=%22{}%22&start={}" | '
        'grep "https:\\/\\/[.a-z0-9_-]*\\/" -o'
    )
    new_data = []
    for d in data:
        if IS_CSV:
            f_text = quote(d[primary_col])
        else:
            f_text = quote(d)
        # One entry per result page: start=0, 10, 20, ...
        for i in range(loop_time):
            new_data.append((f_text, i * 10))
    response = set()
    errs = []

    def fun(data, break_func):
        try:
            res = check_output(
                cm_shell.format(*data),
                shell=True
            ).decode('utf8')
            for r in res.split('\n'):
                response.add(r)
        except Exception:
            # logging.exception(f'Error when scan {url}:')
            errs.append(', '.join(str(d) for d in data))
            if len(errs) > 10:
                break_func()
        sleep(0.5)

    breaking = run_multi_thread(fun, new_data, errs, num_thread, 'Url searching:')
    response.discard('')
    out_file = new_file(out_file, overwrite)
    with open(out_file, 'w') as f:
        f.write('\n'.join(response))
    if len(errs) > 0:
        save_errors(errs)
    return breaking, out_file
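
# For illustration, with the keyword "glass" and the default loop_time=3 the
# pipeline above runs once per result page (start=0, 10, 20), e.g.:
#
#   lynx -listonly -dump "https://www.google.com/search?q=%22glass%22&start=10" | \
#       grep "https:\/\/[.a-z0-9_-]*\/" -o
#
# Three pages of ten links each is where the docstring's "30 results" comes from.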
def scan_email(*data_source, out_file='db_emails.csv', num_thread=20,
               primary_key='url', sub_key='email', overwrite=False, **kw):
    '''
    Find contact emails in the HTML content of each URL.
    The default deep-search level is 5'''
    check_command('ack', 'wget')
    cm_shell = (
        'wget -l 5 -T10 --tries=3 -qO- {} | '
        "ack '(?!\\S*(?:jpg|png|gif|bmp))[a-zA-Z0-9.-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z0-9.-]+' | "
        "grep -E -o '[_a-z0-9-]+(\\.[_a-z0-9-]+)*@[a-z0-9-]+(\\.[a-z0-9-]+)*(\\.[a-z]{{2,}})' | "
        'sort | uniq'
    )
    ignore_list = [
        '.jpg', '.jpeg', '.png', '.gif', '.bmp', '.js',
        '@email.com', '@example.com', '@domain.com',  # '@mail.com',
        '@business.com', '@sentry.io', '@sentry.wixpress.com',
        '@mysamsung.asia', '@sentrylabs.indeed.com', '@sentry.gopomo.com',
        '@address.com', '@domain.de', '@sentry.firmoo.com', 'license@',
        '@example.fr', '@layout.theme.css'
    ]

    def f(data, list_results):
        for res in data:
            if res:
                for i in ignore_list:
                    if res.endswith(i):
                        break
                else:
                    list_results.append(res)

    return multi_scan(
        'email', cm_shell, data_source, f, out_file,
        num_thread, primary_key, sub_key, 'Scan email:',
        overwrite
    )
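
# Note on f() above: it relies on Python's for/else idiom. The else branch
# runs only when the loop finished without hitting break, i.e. when no suffix
# in ignore_list matched, so only addresses that clear every ignore rule are kept.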
def scan_facebook(*data_source, out_file='db_facebook.csv', num_thread=20,
                  primary_key='url', sub_key='email', overwrite=False, **kw):
    '''Find Facebook group URLs in the HTML content of each URL'''
    check_command('ack', 'wget')
    cm_shell = (
        'wget -l 5 -T10 --tries=3 -qO- {} | '
        "ack -o '(?:(?:http|https):\\/\\/)?(?:www.)?facebook.com\\/[a-zA-Z0-9.\\-\\/]*' | "
        'sort | uniq'
    )

    def f(data, list_results):
        for res in data:
            if res:
                list_results.append(res)

    return multi_scan(
        'facebook', cm_shell, data_source, f, out_file,
        num_thread, primary_key, sub_key, 'Scan facebook:',
        overwrite
    )
def search_email(*data_source, out_file='google-emails.csv', num_thread=2,
                 primary_key='url', sub_key='email', overwrite=False, **kw):
    '''
    Search for the email of a keyword via Google search (powered by Google).
    For example: search for emails in the results for the keyword "Shopee email"'''
    check_command('lynx')

    def f(data, list_results):
        # hostname = urlparse(url).hostname.replace('www.','')
        res = []
        i = 0
        start = 0
        content = []
        # Group the lynx dump into blank-line-separated paragraphs; keep each
        # paragraph both space-joined and concatenated so that addresses
        # wrapped across lines are still matched
        while i < len(data):
            if data[i] == '':
                if start != 0:
                    content.append(' '.join(t.lstrip() for t in data[start:i]))
                    content.append(''.join(t.lstrip() for t in data[start:i]))
                i += 1
                start = i
            i += 1
        for t in content:
            r = re.search(
                r'[_a-z0-9-]+(\.[_a-z0-9-]+)*@'
                r'[a-z0-9-]+(\.[a-z0-9-]+)*(\.[a-z]{2,})', t
            )
            if r:
                r = r.group()
                # Trim artifacts that the text dump glues onto the address
                if r.endswith('.https'):
                    r = r[:-6]
                elif r.endswith('https'):
                    r = r[:-5]
                elif r.endswith('.'):
                    r = r[:-1]
                if all(e not in r for e in res):
                    res.append(r)
                # if hostname in r:
                #     list_results.append(r)
        list_results.extend(res)
        sleep(0.5)

    cm_shell = (
        'lynx -nolist -width=100 -dump -cookie_save_file=./google-emails.cookie '
        '"https://www.google.com/search?q={}+email&start=0"'
    )
    return multi_scan(
        'email', cm_shell, data_source, f, out_file,
        num_thread, primary_key, sub_key, 'Google email:',
        overwrite
    )
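
# Rough flow of search_email, for orientation: lynx dumps the rendered Google
# result page as plain text; f() groups the dump into paragraphs,
# regex-extracts the first email-looking token from each, trims artifacts the
# dump glues on ("https", trailing dots), and de-duplicates before saving.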
# ---------------
# Common functions
def check_command(*args: str):
    for i in args:
        try:
            check_output(f'{i} --help', shell=True)
        except CalledProcessError:
            raise RuntimeError(
                f"Can't find command {i}\n"
                f"Make sure all of these are installed: {', '.join(args)}"
            )
def new_file(file: str, overwrite=False) -> str:
    if not overwrite:
        directory = dirname(file)
        if directory and not exists(directory):
            makedirs(directory)
        i = 1
        name, ext = splitext(file)
        # Never clobber an existing file: append -(1), -(2), ... instead
        while exists(file):
            file = f'{name}-({i}){ext}'
            i += 1
    return file
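
# Example behaviour (assuming 'urls.txt' already exists in the working dir):
#
#   new_file('urls.txt')                  # -> 'urls-(1).txt'
#   new_file('urls.txt', overwrite=True)  # -> 'urls.txt' (unchanged)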
def save_errors(errs: List[str]):
    file_e = new_file('error-list.txt')
    print(f'Encountered {len(errs)} errors.\nThey will be saved to {file_e}')
    with open(file_e, 'w') as f:
        f.write('\n'.join(errs))
def handle_input(data_source, primary_key='email', sub_key='url'):
    IS_CSV = False
    primary_col = sub_col = 0
    data: Union[List[str], List[List[str]]]
    if len(data_source) == 1 and exists(data_source[0]):
        IS_CSV = data_source[0].endswith('.csv')
        print(f'Read data from: {data_source[0]}')
        with open(data_source[0], 'r') as f:
            if IS_CSV:
                print(f'With primary key: {primary_key}')
                csv_r = csv.reader(f)
                title = next(csv_r)
                try:
                    primary_col = title.index(primary_key)
                    sub_col = title.index(sub_key)
                except ValueError as e:
                    k = str(e).replace(' is not in list', '')
                    raise KeyError(f"Can't find title {k} in .csv file")
                data = [row for row in csv_r]
            else:
                data = [i.strip() for i in f.readlines()]
    else:
        data = data_source
    return data, IS_CSV, primary_col, sub_col
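
# handle_input accepts three shapes of data_source, e.g.:
#
#   handle_input(('a@x.com', 'b@y.com'))         # inline values
#   handle_input(('emails.txt',))                # one value per line
#   handle_input(('data.csv',), 'email', 'url')  # CSV with named columns
#
# For CSV input it returns the rows plus the indices of the primary/sub
# columns; otherwise the column indices default to 0.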
def multi_scan(title: str, cm_shell: str, data_source: tuple,
               data_handler: Callable, out_file: str = 'data.csv', num_thread=20,
               primary_key='url', sub_key='email', prefix='Finding:', overwrite=False):
    data, IS_CSV, primary_col, sub_col = handle_input(data_source, primary_key, sub_key)
    if IS_CSV:
        data = [d[primary_col] for d in data]
    results = {}
    err_urls = []

    def fun(url, break_func):
        try:
            list_results = []
            output = check_output([cm_shell.format(url)], shell=True)
            data_handler(output.decode('utf8').split('\n'), list_results)
            results[url] = list_results
        except Exception:
            # logging.exception(f'Error when scan {url}:')
            err_urls.append(url)
            if len(err_urls) > 15:
                break_func()

    breaking = run_multi_thread(fun, data, err_urls, num_thread, prefix)
    out_file = new_file(out_file, overwrite)
    with open(out_file, 'w') as db:
        db.write(f'{primary_key},{title}\n')
        for url, result in results.items():
            for res in result:
                db.write(f'{url},{res}\n')
    if len(err_urls) > 0:
        save_errors(err_urls)
    # else:
    #     print('All done')
    return breaking, out_file
def run_multi_thread(fun: Callable[[object, Callable], None], data: List,
                     err_list: List, num_thread=20, prefix=''):
    thr = []
    num = len(data)
    i = 0
    break_now = False

    def f():
        nonlocal break_now
        break_now = True

    completed = 0
    try:
        while True:
            if i < num and num_thread > len(thr) and not break_now:
                t = Thread(target=fun, args=(data[i], f))
                t.start()
                thr.append(t)
                i += 1
            elif (i == num or break_now) and len(thr) == 0:
                print_progress(completed, num, prefix=prefix,
                               suffix=f'{completed}/{num} errors: {len(err_list)}')
                break
            elif not all(t.is_alive() for t in thr):
                thr_dead = tuple(t for t in thr if not t.is_alive())
                for t in thr_dead:
                    thr.remove(t)
                    completed += 1
                del thr_dead
            else:
                print_progress(completed, num, prefix=prefix,
                               suffix=f'{completed}/{num} errors: {len(err_list)}')
                sleep(.1)
    except KeyboardInterrupt:
        print('\nPlease wait while the current data is saved ...')
        while any(t.is_alive() for t in thr):
            sleep(.1)
        return True
    if break_now:
        print('\nToo many errors, the program will stop')
    return break_now
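
# run_multi_thread is a hand-rolled thread pool: it keeps at most num_thread
# workers alive, reaps finished threads to advance the progress bar, and
# polls every 0.1s. Workers receive (item, break_func); calling break_func
# stops new threads from being scheduled, which is how the scanners abort
# after too many errors.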
def print_progress(iteration: int, total: int, prefix='', suffix='', decimals=1, bar_length=40, fill='█'):
    """
    Call in a loop to create a terminal progress bar
    @params:
        iteration   - Required : current iteration (Int)
        total       - Required : total iterations (Int)
        prefix      - Optional : prefix string (Str)
        suffix      - Optional : suffix string (Str)
        decimals    - Optional : positive number of decimals in percent complete (Int)
        bar_length  - Optional : character length of bar (Int)
    """
    str_format = "{0:." + str(decimals) + "f}"
    percents = str_format.format(100 * (iteration / float(total)))
    filled_length = int(round(bar_length * iteration / float(total)))
    bar = fill * filled_length + '-' * (bar_length - filled_length)
    sys.stdout.write('\r%s |%s| %s%s %s' % (prefix, bar, percents, '%', suffix))
    if iteration == total:
        sys.stdout.write('\n')
    sys.stdout.flush()
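
# Minimal standalone usage sketch for print_progress (not from the scanners):
#
#   total = 50
#   for n in range(total + 1):
#       print_progress(n, total, prefix='Demo:', suffix=f'{n}/{total}')
#       sleep(0.05)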
def main(argv: List[str]):
    parser = ArgumentParser(
        formatter_class=RawDescriptionHelpFormatter,
        description=(
            'A collection of scan tools\n\n'
            'You can run multiple actions at the same time.\n'
            'For example, this command will search URLs for the keyword "glass", \n'
            'then scan for emails and Facebook pages in the result file "urls.txt" \n'
            'and verify the emails in "db_emails.csv" produced by scan-email\n\n'
            'python3 ./scan-bot.py search-url -d glass scan-email '
            'scan-facebook search-email verify-email -d db_emails.csv'
        ),
    )
    subparsers = parser.add_subparsers(
        title="actions",
        description=(
            'enter one of these actions'
        )
    )
    list_func = (search_url, scan_email, scan_facebook, search_email, verify_email)
    name_parser = [f.__qualname__.replace('_', '-') for f in list_func]
    sub_parser = {}
    for i, func in enumerate(list_func):
        sub_parser[name_parser[i]] = subparsers.add_parser(
            name_parser[i], parents=[parser],
            formatter_class=RawDescriptionHelpFormatter,
            add_help=False,
            description=list_func[i].__doc__
        )
        sub_parser[name_parser[i]].add_argument(
            '-d', "--data-source",
            metavar='S', nargs='+', type=str, required=True,
            help="input data (supports line-based text files and .csv files)"
        )
        sub_parser[name_parser[i]].add_argument(
            '-t', "--num-thread",
            metavar='I', type=int,
            help=("number of threads running at the same time"
                  f" (default: {func.__kwdefaults__['num_thread']})")
        )
        sub_parser[name_parser[i]].add_argument(
            '-p', "--primary-key",
            metavar='S', type=str,
            help=("primary column to handle (only for .csv files)"
                  f" (default: {func.__kwdefaults__['primary_key']})")
        )
        sub_parser[name_parser[i]].add_argument(
            '-s', "--sub-key",
            metavar='S', type=str,
            help=("sub column to be attached to the output (only for .csv files)"
                  f" (default: {func.__kwdefaults__['sub_key']})")
        )
        sub_parser[name_parser[i]].add_argument(
            '-o', "--out-file",
            type=str, metavar='S',
            # default=func.__kwdefaults__['out_file'],
            help=f"set the output file (default: {func.__kwdefaults__['out_file']})"
        )
        sub_parser[name_parser[i]].add_argument(
            '-O', "--overwrite",
            action='store_true',
            help="overwrite the output file"
        )
        sub_parser[name_parser[i]].set_defaults(
            func=func,
            **func.__kwdefaults__
        )
    sub_parser['search-url'].add_argument(
        '-l', "--loop-time",
        type=int, metavar='I',
        help=f"set the loop time of the search (default: {search_url.__kwdefaults__['loop_time']})"
    )
    s = None
    args = None
    breaking = False
    cache = {}
    # Split argv into per-action segments: each segment starts at an action
    # name and runs until the next action name (or the end of argv)
    for i in range(len(argv)):
        if i == len(argv) - 1:
            if s is None:
                s = 1
            args = argv[s:]
        elif argv[i + 1] in name_parser:
            if s:
                args = argv[s:i + 1]
            s = i + 1
        else:
            continue
        if breaking:
            break
        elif args:
            # print(args)
            # Chain actions: feed the search-url output file into the
            # scanners when they were given no explicit -d/--data-source
            if cache.get('search-url') and \
                    args[0] in ('scan-email', 'scan-facebook', 'search-email') and \
                    all(t not in args for t in ('-d', '--data-source')):
                args.insert(1, cache['search-url'])
                args.insert(1, '-d')
            kwargs = vars(parser.parse_args(args))
            func = kwargs.pop('func')
            breaking, out_file = func(*kwargs.pop('data_source'), **kwargs)
            cache[args[0]] = out_file
            args = None


if __name__ == '__main__':
    main(sys.argv)