A collection of tools
#!/usr/bin/python3
import sys

if sys.version_info < (3, 6):
    print("Must be using Python 3.6 or higher")
    exit(1)

import logging
import re
import smtplib
import csv
import warnings
from time import sleep
from threading import Thread
from subprocess import check_output, PIPE, CalledProcessError
from argparse import ArgumentParser, RawDescriptionHelpFormatter
from os.path import exists, splitext, dirname
from os import makedirs
from urllib.parse import quote
from typing import Callable, List, Tuple, Union

try:
    import dns.resolver as dns_resolver
except ImportError:
    dns_resolver = None

# Address used for the SMTP MAIL FROM command
fromAddress = '[email protected]'

def check_email(addressToVerify):
    # Simple regex for syntax checking
    regex = r'^[_a-z0-9-]+(\.[_a-z0-9-]+)*@[a-z0-9-]+(\.[a-z0-9-]+)*(\.[a-z]{2,})$'

    # Email address to verify
    # inputAddress = input('Please enter the emailAddress to verify:')
    # addressToVerify = str(inputAddress)

    # Syntax check
    match = re.match(regex, addressToVerify)
    if match is None:
        return {
            'email': addressToVerify,
            'success': False,
            'code': 500,
            'message': 'Bad Syntax'
        }

    # Get domain for DNS lookup
    splitAddress = addressToVerify.split('@')
    domain = str(splitAddress[1])
    # print('Domain:', domain)
    try:
        # MX record lookup (note: dnspython 2.x renames query() to resolve())
        records = dns_resolver.query(domain, 'MX')
        mxRecord = str(records[0].exchange)

        # SMTP lib setup (use debug level for full output)
        server = smtplib.SMTP()
        server.set_debuglevel(0)

        # SMTP conversation
        server.connect(mxRecord)
        server.helo(server.local_hostname)  # local_hostname is the local server's hostname
        server.mail(fromAddress)
        code, message = server.rcpt(addressToVerify)
        server.quit()
        return {
            'email': addressToVerify,
            'success': code == 250,
            'code': code,
            'message': message.decode('utf8')
                              .replace('\n', ' ').replace(',', '')
        }
    except Exception as e:
        # logging.exception('check_email fail:')
        return {
            'email': addressToVerify,
            'success': False,
            'code': 500,
            'message': str(e)
        }
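
# A minimal usage sketch (the address below is hypothetical; a 250 reply to
# RCPT is a hint, not proof of deliverability, since many servers accept
# RCPT for any mailbox or greylist unknown senders):
#
#     result = check_email('someone@example.com')
#     # -> {'email': 'someone@example.com', 'success': False,
#     #     'code': 550, 'message': '5.1.1 ... user unknown'}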

def verify_email(*data_source, out_file='email_status.csv', num_thread=20,
                 primary_key='email', sub_key='url', overwrite=False, **kw) -> Tuple[bool, str]:
    '''
    Check that an email is valid by pinging its SMTP mail server.
    The command accepts inline text or an input file'''
    if dns_resolver is None:
        raise ImportError("Can't import dnspython module.\nPlease run: pip install dnspython")
    data, IS_CSV, col_email, col_url = handle_input(data_source, primary_key, sub_key)
    response = []
    errs = []

    def fun(data, break_func):
        if IS_CSV:
            mail = data[col_email]
        else:
            mail = data
        res = check_email(mail)
        if not res['success']:
            domain = mail.split('@')[1]
            if domain not in ('gmail.com', 'yahoo.com', 'outlook.com'):
                # Fall back to common role-based mailboxes on the same domain
                rel_name = [
                    'info', 'customer', 'contact', 'contato', 'service',
                    'hi', 'hello', 'sales', 'vendas', 'sac', 'business',
                    'shop', 'ask', 'admin', 'marketing'
                ]
                for i in rel_name:
                    res = check_email(f'{i}@{domain}')
                    if res['success']:
                        break
        if not res['success']:
            errs.append(0)
        if IS_CSV:
            response.append({'url': data[col_url], **res})
        else:
            response.append(res)

    breaking = run_multi_thread(fun, data, errs, num_thread, 'Verifying:')
    out_file = new_file(out_file, overwrite)
    with open(out_file, 'w') as f:
        f.write(','.join(response[0].keys()))
        f.write('\n')
        for r in response:
            f.write(','.join(str(d) for k, d in r.items()))
            f.write('\n')
    return breaking, out_file
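
# Usage sketch (file names are hypothetical): input can be inline addresses,
# a plain-text file with one address per line, or a .csv file with an
# 'email' column (and a 'url' column echoed into the output):
#
#     broke, path = verify_email('a@example.com', 'b@example.com')
#     broke, path = verify_email('leads.csv', primary_key='email',
#                                sub_key='url', out_file='checked.csv')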

# ---------------
# These features are only for Unix

def search_url(*data_source, out_file='urls.txt', loop_time=3, num_thread=2,
               primary_key='name', sub_key='company', overwrite=False, **kw) -> Tuple[bool, str]:
    '''
    Find URLs by keyword from Google search results (powered by Google).
    The default number of results is 30 (loop_time=3 pages of 10 results)'''
    check_command('lynx')
    data, IS_CSV, primary_col, _ = handle_input(data_source, primary_key, sub_key)
    cm_shell = (
        'lynx -listonly -dump "https://www.google.com/search?q=%22{}%22&start={}" | '
        'grep "https:\\/\\/[.a-z0-9_-]*\\/" -o'
    )
    new_data = []
    for d in data:
        if IS_CSV:
            f_text = quote(d[primary_col])
        else:
            f_text = quote(d)
        for i in range(loop_time):
            new_data.append((f_text, i * 10))
    response = set()
    errs = []

    def fun(data, break_func):
        try:
            res = check_output(
                cm_shell.format(*data),
                shell=True
            ).decode('utf8')
            for r in res.split('\n'):
                response.add(r)
        except Exception:
            # logging.exception(f'Error when scan {url}:')
            errs.append(', '.join(str(d) for d in data))
            if len(errs) > 10:
                break_func()
        sleep(0.5)

    breaking = run_multi_thread(fun, new_data, errs, num_thread, 'Url searching:')
    response.discard('')
    out_file = new_file(out_file, overwrite)
    with open(out_file, 'w') as f:
        f.write('\n'.join(response))
    if len(errs) > 0:
        save_errors(errs)
    return breaking, out_file
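
# For the keyword "glass", the first page expands roughly to this pipeline
# (a sketch, not captured output): lynx dumps the link list of the Google
# results page and grep keeps only the https:// host prefixes:
#
#     lynx -listonly -dump \
#         "https://www.google.com/search?q=%22glass%22&start=0" \
#         | grep "https:\/\/[.a-z0-9_-]*\/" -o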

def scan_email(*data_source, out_file='db_emails.csv', num_thread=20,
               primary_key='url', sub_key='email', overwrite=False, **kw):
    '''
    Find email contacts in the HTML content of a URL.
    The default deep-search level is 5'''
    check_command('ack', 'wget')
    cm_shell = (
        'wget -l 5 -T10 --tries=3 -qO- {} | '
        "ack '(?!\\S*(?:jpg|png|gif|bmp))[a-zA-Z0-9.-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z0-9.-]+' | "
        "grep -E -o '[_a-z0-9-]+(\\.[_a-z0-9-]+)*@[a-z0-9-]+(\\.[a-z0-9-]+)*(\\.[a-z]{{2,}})' | "
        'sort | uniq'
    )
    # Matches ending with any of these are treated as noise and skipped
    ignore_list = [
        '.jpg', '.jpeg', '.png', '.gif', '.bmp', '.js',
        '@email.com', '@example.com', '@domain.com',  # '@mail.com',
        '@business.com', '@sentry.io', '@sentry.wixpress.com',
        '@mysamsung.asia', '@sentrylabs.indeed.com', '@sentry.gopomo.com',
        '@address.com', '@domain.de', '@sentry.firmoo.com', 'license@',
        '@example.fr', '@layout.theme.css'
    ]

    def f(data, list_results):
        for res in data:
            if res:
                for i in ignore_list:
                    if res.endswith(i):
                        break
                else:
                    list_results.append(res)

    return multi_scan(
        'email', cm_shell, data_source, f, out_file,
        num_thread, primary_key, sub_key, 'Scan email:',
        overwrite
    )
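
# Output sketch: db_emails.csv pairs each scanned URL with every address
# that survives ignore_list, one row per match (values are hypothetical):
#
#     url,email
#     https://example.com/,info@example.com
#     https://example.com/,sales@example.com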

def scan_facebook(*data_source, out_file='db_facebook.csv', num_thread=20,
                  primary_key='url', sub_key='email', overwrite=False, **kw):
    '''Find Facebook group URLs in the HTML content of a URL'''
    check_command('ack', 'wget')
    cm_shell = (
        'wget -l 5 -T10 --tries=3 -qO- {} | '
        "ack -o '(?:(?:http|https):\\/\\/)?(?:www\\.)?facebook\\.com\\/[a-zA-Z0-9.\\-\\/]*' | "
        'sort | uniq'
    )

    def f(data, list_results):
        for res in data:
            if res:
                list_results.append(res)

    return multi_scan(
        'facebook', cm_shell, data_source, f, out_file,
        num_thread, primary_key, sub_key, 'Scan facebook:',
        overwrite
    )

def search_email(*data_source, out_file='google-emails.csv', num_thread=2,
                 primary_key='url', sub_key='email', overwrite=False, **kw):
    '''
    Search for the email of a keyword via Google search (powered by Google).
    For example: search for an email in the results for the keyword "Shopee email"'''
    check_command('lynx')

    def f(data, list_results):
        # hostname = urlparse(url).hostname.replace('www.','')
        res = []
        i = 0
        start = 0
        content = []
        # Group the lynx dump into blocks separated by blank lines
        while i < len(data):
            if data[i] == '':
                if start != 0:
                    content.append(' '.join(t.lstrip() for t in data[start:i]))
                    content.append(''.join(t.lstrip() for t in data[start:i]))
                i += 1
                start = i
            i += 1
        for t in content:
            r = re.search(
                r'[_a-z0-9-]+(\.[_a-z0-9-]+)*@'
                r'[a-z0-9-]+(\.[a-z0-9-]+)*(\.[a-z]{2,})', t
            )
            if r:
                r = r.group()
                # Trim the debris that Google's snippet text glues onto addresses
                if r.endswith('.https'):
                    r = r[:-6]
                elif r.endswith('https'):
                    r = r[:-5]
                elif r.endswith('.'):
                    r = r[:-1]
                if all(i not in r for i in res):
                    res.append(r)
                # if hostname in r:
                #     list_results.append(r)
        list_results.extend(res)
        sleep(0.5)

    cm_shell = (
        'lynx -nolist -width=100 -dump -cookie_save_file=./google-emails.cookie '
        '"https://www.google.com/search?q={}+email&start=0"'
    )
    return multi_scan(
        'email', cm_shell, data_source, f, out_file,
        num_thread, primary_key, sub_key, 'Google email:',
        overwrite
    )
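
# Sketch of the lynx -dump text that f() parses (snippet is hypothetical):
# blocks are split on blank lines, each block is joined with and without
# spaces, and the first address-shaped match per block is kept:
#
#     Contact us at support@example.com. for details ...
#
#     Shopee Help Centre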

# ---------------
# Common functions

def check_command(*args: str):
    for i in args:
        try:
            check_output(f'{i} --help', shell=True)
        except CalledProcessError:
            raise RuntimeError(
                f"Can't find command {i}\n"
                f"Make sure you have all of these installed: {', '.join(args)}"
            )

def new_file(file: str, overwrite=False) -> str:
    if not overwrite:
        directory = dirname(file)
        if directory and not exists(directory):
            makedirs(directory)
        i = 1
        name, ext = splitext(file)
        while exists(file):
            file = f'{name}-({i}){ext}'
            i += 1
    return file

def save_errors(errs: List[str]):
    file_e = new_file('error-list.txt')
    print(f'Got {len(errs)} errors.\nThey will be saved in {file_e}')
    with open(file_e, 'w') as f:
        f.write('\n'.join(errs))

def handle_input(data_source, primary_key='email', sub_key='url'):
    IS_CSV = False
    primary_col = sub_col = 0
    data: Union[List[str], List[List[str]]]
    if len(data_source) == 1 and exists(data_source[0]):
        IS_CSV = data_source[0].endswith('.csv')
        print(f'Read data from: {data_source[0]}')
        with open(data_source[0], 'r') as f:
            if IS_CSV:
                print(f'With primary key: {primary_key}')
                csv_r = csv.reader(f)
                title = next(csv_r)
                try:
                    primary_col = title.index(primary_key)
                    sub_col = title.index(sub_key)
                except ValueError as e:
                    k = str(e).replace(' is not in list', '')
                    raise KeyError(f"Can't find column {k} in the .csv file")
                data = [row for row in csv_r]
            else:
                data = [i.strip() for i in f.readlines()]
    else:
        data = data_source
    return data, IS_CSV, primary_col, sub_col
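
# Input sketch for handle_input (hypothetical file contents): a single
# existing-path argument is read as a file; .csv sources must carry a
# header row naming primary_key and sub_key, e.g.
#
#     email,url
#     info@example.com,https://example.com/
#
# while any other file is read as one bare value per line, and multiple
# inline values ('a@example.com', 'b@example.com') bypass file handling.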

def multi_scan(title: str, cm_shell: str, data_source: tuple,
               data_handler: Callable, out_file: str = 'data.csv', num_thread=20,
               primary_key='url', sub_key='email', prefix='Finding:', overwrite=False):
    data, IS_CSV, primary_col, sub_col = handle_input(data_source, primary_key, sub_key)
    if IS_CSV:
        data = [d[primary_col] for d in data]
    results = {}
    err_urls = []

    def fun(url, break_func):
        try:
            list_results = []
            output = check_output([cm_shell.format(url)], shell=True)
            data_handler(output.decode('utf8').split('\n'), list_results)
            results[url] = list_results
        except Exception:
            # logging.exception(f'Error when scan {url}:')
            err_urls.append(url)
            if len(err_urls) > 15:
                break_func()

    breaking = run_multi_thread(fun, data, err_urls, num_thread, prefix)
    out_file = new_file(out_file, overwrite)
    with open(out_file, 'w') as db:
        db.write(f'{primary_key},{title}\n')
        for url, result in results.items():
            for res in result:
                db.write(f'{url},{res}\n')
    if len(err_urls) > 0:
        save_errors(err_urls)
    # else:
    #     print('All done')
    return breaking, out_file

def run_multi_thread(fun: Callable[[object, Callable], None], data: List,
                     err_list: List, num_thread=20, prefix=''):
    thr = []
    num = len(data)
    i = 0
    break_now = False

    def f():
        # Callback handed to workers: flips the flag that stops the pool
        nonlocal break_now
        break_now = True

    completed = 0
    try:
        while True:
            if i < num and num_thread > len(thr) and not break_now:
                t = Thread(target=fun, args=(data[i], f))
                t.start()
                thr.append(t)
                i += 1
            elif (i == num or break_now) and len(thr) == 0:
                print_progress(completed, num, prefix=prefix,
                               suffix=f'{completed}/{num} errors: {len(err_list)}')
                break
            elif not all(t.is_alive() for t in thr):
                thr_dead = tuple(t for t in thr if not t.is_alive())
                for t in thr_dead:
                    thr.remove(t)
                    completed += 1
                del thr_dead
            else:
                print_progress(completed, num, prefix=prefix,
                               suffix=f'{completed}/{num} errors: {len(err_list)}')
                sleep(.1)
    except KeyboardInterrupt:
        print('\nPlease wait while the current data is saved ...')
        while any(t.is_alive() for t in thr):
            sleep(.1)
        return True
    if break_now:
        print('\nToo many errors; the program will stop')
    return break_now
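
# A minimal sketch of driving run_multi_thread directly (worker, items and
# do_work are hypothetical): each worker gets one item plus a stop callback
# that, once called, lets in-flight threads finish and starts no new ones.
#
#     errs = []
#     def worker(item, stop):
#         if not do_work(item):
#             errs.append(item)
#             if len(errs) > 5:
#                 stop()
#     run_multi_thread(worker, items, errs, num_thread=8, prefix='Work:')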

def print_progress(iteration: int, total: int, prefix='', suffix='',
                   decimals=1, bar_length=40, fill='█'):
    """
    Call in a loop to create a terminal progress bar
    @params:
        iteration  - Required : current iteration (Int)
        total      - Required : total iterations (Int)
        prefix     - Optional : prefix string (Str)
        suffix     - Optional : suffix string (Str)
        decimals   - Optional : positive number of decimals in percent complete (Int)
        bar_length - Optional : character length of bar (Int)
    """
    str_format = "{0:." + str(decimals) + "f}"
    percents = str_format.format(100 * (iteration / float(total)))
    filled_length = int(round(bar_length * iteration / float(total)))
    bar = fill * filled_length + '-' * (bar_length - filled_length)
    sys.stdout.write('\r%s |%s| %s%s %s' % (prefix, bar, percents, '%', suffix))
    if iteration == total:
        sys.stdout.write('\n')
    sys.stdout.flush()

def main(argv: List[str]):
    parser = ArgumentParser(
        formatter_class=RawDescriptionHelpFormatter,
        description=(
            'A collection of scanning tools\n\n'
            'You can run multiple actions at the same time.\n'
            'For example, this command will search URLs for the keyword "glass", \n'
            'then scan for emails and Facebook pages in the result file "urls.txt" \n'
            'and verify the emails in "db_emails.csv" from the scan-email results\n\n'
            'python3 ./scan-bot.py search-url -d glass scan-email '
            'scan-facebook search-email verify-email -d db_emails.csv'
        ),
    )
    subparsers = parser.add_subparsers(
        title="actions",
        description='enter one of these actions'
    )
    list_func = (search_url, scan_email, scan_facebook, search_email, verify_email)
    name_parser = [f.__qualname__.replace('_', '-') for f in list_func]
    sub_parser = {}
    for i, func in enumerate(list_func):
        sub_parser[name_parser[i]] = subparsers.add_parser(
            name_parser[i], parents=[parser],
            formatter_class=RawDescriptionHelpFormatter,
            add_help=False,
            description=list_func[i].__doc__
        )
        sub_parser[name_parser[i]].add_argument(
            '-d', "--data-source",
            metavar='S', nargs='+', type=str, required=True,
            help="input data (supports plain text files read line by line and .csv files)"
        )
        sub_parser[name_parser[i]].add_argument(
            '-t', "--num-thread",
            metavar='I', type=int,
            help=("number of threads running at the same time"
                  f" (default: {func.__kwdefaults__['num_thread']})")
        )
        sub_parser[name_parser[i]].add_argument(
            '-p', "--primary-key",
            metavar='S', type=str,
            help=("primary column to handle (only for .csv files)"
                  f" (default: {func.__kwdefaults__['primary_key']})")
        )
        sub_parser[name_parser[i]].add_argument(
            '-s', "--sub-key",
            metavar='S', type=str,
            help=("sub column to be attached to the output (only for .csv files)"
                  f" (default: {func.__kwdefaults__['sub_key']})")
        )
        sub_parser[name_parser[i]].add_argument(
            '-o', "--out-file",
            type=str, metavar='S',
            # default=func.__kwdefaults__['out_file'],
            help=f"set output file (default: {func.__kwdefaults__['out_file']})"
        )
        sub_parser[name_parser[i]].add_argument(
            '-O', "--overwrite",
            action='store_true',
            help="overwrite the output file"
        )
        sub_parser[name_parser[i]].set_defaults(
            func=func,
            **func.__kwdefaults__
        )
    sub_parser['search-url'].add_argument(
        '-l', "--loop-time",
        type=int, metavar='I',
        help=f"set the loop time of searching (default: {search_url.__kwdefaults__['loop_time']})"
    )

    # Split argv into per-action argument lists and run each action in order
    s = None
    args = None
    breaking = False
    cache = {}
    for i in range(len(argv)):
        if i == len(argv) - 1:
            if s is None:
                s = 1
            args = argv[s:]
        elif argv[i + 1] in name_parser:
            if s:
                args = argv[s:i + 1]
            s = i + 1
        else:
            continue
        if breaking:
            break
        elif args:
            # print(args)
            # Reuse the search-url output as input when the next action
            # carries no explicit data source
            if cache.get('search-url') and \
                    args[0] in ('scan-email', 'scan-facebook', 'search-email') and \
                    all(t not in args for t in ('-d', '--data-source')):
                args.insert(1, cache['search-url'])
                args.insert(1, '-d')
            kwargs = vars(parser.parse_args(args))
            func = kwargs.pop('func')
            breaking, out_file = func(*kwargs.pop('data_source'), **kwargs)
            cache[args[0]] = out_file
            args = None

if __name__ == '__main__':
    main(sys.argv)
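
# CLI sketch (keywords and files are placeholders): when scan-email,
# scan-facebook, or search-email follow search-url without their own -d,
# main() reuses the search-url output file as their data source:
#
#     python3 ./scan-bot.py search-url -d "coffee shop" -l 2 scan-email
#     python3 ./scan-bot.py verify-email -d a@example.com b@example.com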
  