"""Guess and verify contact e-mail addresses for websites listed in a CSV file.

For every input row (columns ``website`` and, optionally, ``owner``) the script
derives the mail domain from the website URL, generates candidate addresses
from the owner's name plus a list of common role mailboxes, and classifies
each candidate by asking the domain's mail server whether it would accept the
recipient (``RCPT TO``) without ever sending a message.
"""

import csv
import functools
import re
import smtplib
import socket
import time
import unicodedata
from urllib.parse import urlparse

try:
    import dns.exception
    import dns.resolver
except ImportError:
    # dnspython is only needed for the network checks.  Keeping the import
    # soft lets the pure helpers (URL parsing, address generation, syntax
    # validation) be imported and tested without it; main() fails fast.
    dns = None

# Configuration
INPUT_CSV = 'input.csv'
OUTPUT_CSV = 'verified_emails.csv'
DELAY_BETWEEN_CHECKS = 1  # Seconds to avoid rate limiting
SMTP_TIMEOUT = 10  # Seconds to wait for the mail server
SMTP_SENDER = 'verify@example.com'  # Envelope sender used for the probe
# https://gist.github.com/irazasyed/02d6530b83c3ba2fe3682caaff4c0222

# Role mailboxes tried for every domain, independent of the owner's name.
ROLE_LOCAL_PARTS = (
    # Common roles (e-commerce focused)
    'admin', 'billing', 'sales', 'support', 'contact', 'me', 'hi', 'hello',
    'ceo', 'founder', 'director', 'md', 'managingdirector', 'owner',
    'manager', 'orders', 'cs', 'customerservice',
    # International variants
    'comercial',  # Spanish
    'ventas',  # Spanish
    'vendas',  # Portuguese
)

_URL_SCHEME_RE = re.compile(r'[a-zA-Z][a-zA-Z0-9+.-]*://')
# Alphanumeric runs joined by single '.', '_', '+' or '-' separators.
_LOCAL_PART_RE = re.compile(r'[a-z0-9]+(?:[._+-][a-z0-9]+)*')
# Letters/digits (Unicode allowed) and inner hyphens; no leading/trailing '-'.
_DOMAIN_LABEL_RE = re.compile(r'[^\W_](?:[^\W_]|-)*(?<!-)')

# SMTP reply codes meaning "recipient accepted" (251 = will forward).
_ACCEPTED_CODES = (250, 251)
# Everything a failed probe can raise: protocol errors, socket/timeout errors
# (socket.timeout is an OSError; listed for clarity) and ValueError, which
# smtplib raises for addresses it cannot encode on the wire.
_PROBE_ERRORS = (smtplib.SMTPException, socket.timeout, OSError, ValueError)


def extract_domain(url):
    """Return the bare, lower-cased host name of *url*, or None.

    Accepts URLs with or without a scheme (``example.com/shop`` works as well
    as ``https://example.com/shop``).  Port, credentials and a single leading
    ``www.`` label are removed.

    Args:
        url: Website address as found in the input file; may be empty/None.

    Returns:
        The host name (e.g. ``"example.com"``), or None when none is present.
    """
    if not url:
        return None
    candidate = url.strip()
    if not candidate:
        return None
    # urlparse only fills in the host when it sees "//", so add it for
    # scheme-less input.
    if not _URL_SCHEME_RE.match(candidate) and not candidate.startswith('//'):
        candidate = '//' + candidate
    try:
        host = urlparse(candidate).hostname
    except ValueError:  # e.g. malformed IPv6 literal
        return None
    if not host:
        return None
    host = host.rstrip('.')
    # Strip only a leading "www." label, never a "www." inside the name.
    if host.startswith('www.'):
        host = host[len('www.'):]
    return host or None


def _normalize_name_part(name):
    """Reduce a personal name to characters usable in a mailbox name.

    Accents are transliterated ("José" -> "jose"); spaces, apostrophes and
    other punctuation are dropped ("O'Brien" -> "obrien"); inner hyphens are
    kept.  Returns '' for empty/None input.
    """
    if not name:
        return ''
    decomposed = unicodedata.normalize('NFKD', name)
    ascii_only = decomposed.encode('ascii', 'ignore').decode('ascii').lower()
    return re.sub(r'[^a-z0-9-]', '', ascii_only).strip('-')


def generate_email_permutations(first_name, last_name, domain):
    """Generate candidate e-mail addresses for a person at *domain*.

    Args:
        first_name: Given name; may be empty or None.
        last_name: Family name (may contain several words); may be empty/None.
        domain: Mail domain the addresses belong to.

    Returns:
        Alphabetically sorted list of unique, syntactically valid addresses:
        name-based patterns (when names are given) plus the role mailboxes in
        ``ROLE_LOCAL_PARTS``.
    """
    first = _normalize_name_part(first_name)
    last = _normalize_name_part(last_name)

    local_parts = list(ROLE_LOCAL_PARTS)
    if first:
        local_parts += [first, f"{first}-admin", f"team.{first}"]
    if last:
        local_parts.append(last)
    if first and last:
        first_initial, last_initial = first[0], last[0]
        local_parts += [
            f"{first}{last}",
            f"{first}.{last}",
            f"{first}_{last}",
            f"{first}-{last}",
            f"{first_initial}{last}",
            f"{first_initial}.{last}",
            f"{first_initial}_{last}",
            f"{first}{last_initial}",
            f"{last}{first}",
            f"{last}.{first}",
            f"{first_initial}{last_initial}",
        ]

    candidates = {f"{local}@{domain}" for local in local_parts}
    return sorted(email for email in candidates if validate_email_syntax(email))


def validate_email_syntax(email):
    """Return True if *email* looks like a deliverable address.

    This is a pragmatic check, not a full RFC 5322 parser: the local part must
    be alphanumeric runs separated by single ``. _ + -`` characters, and the
    domain must consist of at least two dot-separated labels made of letters,
    digits and inner hyphens.  Matching is case-insensitive.
    """
    if not isinstance(email, str):
        return False
    local, separator, domain = email.lower().partition('@')
    if not separator or len(local) > 64 or len(domain) > 253:
        return False
    if not _LOCAL_PART_RE.fullmatch(local):
        return False
    labels = domain.split('.')
    return len(labels) >= 2 and all(
        len(label) <= 63 and _DOMAIN_LABEL_RE.fullmatch(label)
        for label in labels
    )


@functools.lru_cache(maxsize=1024)
def _primary_mx_host(domain):
    """Return the most-preferred MX host of *domain*, or None if it has none.

    Results are cached per domain so that checking many addresses of one
    domain costs a single DNS query.

    Raises:
        RuntimeError: dnspython is not installed.
    """
    if dns is None:
        raise RuntimeError('dnspython is required for MX lookups')
    try:
        answers = dns.resolver.resolve(domain, 'MX')
    except dns.exception.DNSException:
        # NXDOMAIN, no answer, timeout, malformed name, ...
        return None
    # The lowest preference value is the server senders must try first.
    best = min(answers, key=lambda record: record.preference, default=None)
    if best is None:
        return None
    # An exchange of "." (empty after stripping) is the "null MX": the domain
    # explicitly accepts no mail.
    return best.exchange.to_text().rstrip('.') or None


def _rcpt_reply_code(mx_host, recipient):
    """Ask *mx_host* whether it accepts mail for *recipient*.

    Performs HELO / MAIL FROM / RCPT TO and disconnects; no message is sent.

    Returns:
        The numeric SMTP reply to RCPT TO, or None if the probe failed
        (connection refused, timeout, protocol error, ...).
    """
    try:
        with smtplib.SMTP(mx_host, timeout=SMTP_TIMEOUT) as server:
            server.helo()
            server.mail(SMTP_SENDER)
            code, _ = server.rcpt(recipient)
            return code
    except _PROBE_ERRORS:
        return None


def check_mx_records(domain):
    """Check if domain has valid MX records"""
    return _primary_mx_host(domain) is not None


def is_catch_all_domain(domain):
    """Check if domain accepts all emails.

    Probes the mail server with an address that should not exist; a server
    that accepts it accepts anything, so per-address results are meaningless.
    Returns False when the domain has no MX host or the probe fails.
    """
    mx_host = _primary_mx_host(domain)
    if mx_host is None:
        return False
    probe_address = f"invalid{int(time.time())}@{domain}"
    return _rcpt_reply_code(mx_host, probe_address) in _ACCEPTED_CODES


def verify_email(email, domain, is_catch_all):
    """Hunter-style verification of a single address.

    Args:
        email: Address to check.
        domain: Mail domain of *email*.
        is_catch_all: Result of ``is_catch_all_domain(domain)``.

    Returns:
        ``'invalid'``   - bad syntax, no MX host, or permanently rejected (5xx);
        ``'catch-all'`` - the domain accepts every recipient;
        ``'valid'``     - the server accepted the recipient;
        ``'unknown'``   - the probe failed or the server answered with a
                          temporary (non-5xx) refusal such as greylisting.
    """
    if not validate_email_syntax(email):
        return 'invalid'
    mx_host = _primary_mx_host(domain)
    if mx_host is None:
        return 'invalid'
    if is_catch_all:
        return 'catch-all'

    code = _rcpt_reply_code(mx_host, email)
    if code is None:
        return 'unknown'
    if code in _ACCEPTED_CODES:
        return 'valid'
    if 500 <= code <= 599:
        return 'invalid'
    return 'unknown'


def process_row(row):
    """Process single CSV row.

    Args:
        row: Mapping with a ``website`` entry and an optional ``owner`` entry
            (full name; first word is the given name, the rest the surname).

    Returns:
        A list with one result dict per candidate address (keys ``website``,
        ``domain``, ``owner``, ``email``, ``status``, ``catch_all``,
        ``mx_valid``), or None when no domain or no candidate is available.
    """
    # csv.DictReader stores None for columns missing from a short row.
    website = row.get('website') or ''
    owner = row.get('owner') or ''

    domain = extract_domain(website)
    if not domain:
        return None

    # Split owner name
    name_parts = owner.split()
    first_name = name_parts[0] if name_parts else ''
    last_name = ' '.join(name_parts[1:])

    # Generate emails
    emails = generate_email_permutations(first_name, last_name, domain)
    if not emails:
        return None

    # Domain checks
    mx_valid = check_mx_records(domain)
    catch_all = mx_valid and is_catch_all_domain(domain)

    results = []
    for email in emails:
        if mx_valid:
            status = verify_email(email, domain, catch_all)
            # Only throttle when the server was actually contacted; catch-all
            # domains are classified without a per-address probe.
            if not catch_all:
                time.sleep(DELAY_BETWEEN_CHECKS)
        else:
            status = 'invalid'
        results.append({
            'website': website,
            'domain': domain,
            'owner': owner,
            'email': email,
            'status': status,
            'catch_all': catch_all,
            'mx_valid': mx_valid,
        })
    return results


def main():
    """Process CSV and save results.

    Reads ``INPUT_CSV``, verifies the candidates of every row and writes one
    line per candidate to ``OUTPUT_CSV``.  A failure in one row is reported
    and does not stop the run.
    """
    if dns is None:
        raise SystemExit('dnspython is required: pip install dnspython')

    with open(INPUT_CSV, mode='r', encoding='utf-8') as infile, \
            open(OUTPUT_CSV, mode='w', encoding='utf-8', newline='') as outfile:
        reader = csv.DictReader(infile)
        writer = csv.DictWriter(outfile, fieldnames=[
            'website', 'domain', 'owner', 'email',
            'status', 'catch_all', 'mx_valid',
        ])
        writer.writeheader()

        for idx, row in enumerate(reader, 1):
            website = row.get('website')
            print(f"Processing {idx}: {website}")
            try:
                results = process_row(row)
                if results:
                    writer.writerows(results)
            except Exception as e:  # top-level boundary: report and continue
                print(f"Error processing {website}: {e}")


if __name__ == "__main__":
    main()