|
import csv |
|
import re |
|
import dns.resolver |
|
import smtplib |
|
import socket |
|
import time |
|
from urllib.parse import urlparse |
|
|
|
# Configuration

# CSV read by main(); process_row() reads the 'website' column and,
# when present, the 'owner' column of each row.
INPUT_CSV = 'input.csv'

# CSV written by main(): one line per candidate address with its status.
OUTPUT_CSV = 'verified_emails.csv'

# Pause passed to time.sleep() between address checks in process_row().
DELAY_BETWEEN_CHECKS = 1  # Seconds to avoid rate limiting
|
|
|
# https://gist.github.com/irazasyed/02d6530b83c3ba2fe3682caaff4c0222 |
|
def extract_domain(url):
    """Return the bare host name of *url*, or ``None`` if it has none.

    The host is lower-cased, stripped of any ``user:password@`` prefix and
    ``:port`` suffix, and a single leading ``www.`` label is removed.  URLs
    written without a scheme (``example.com/shop``) are accepted as well.

    Args:
        url: Website address as found in the input data.  May be ``None``
            or empty.

    Returns:
        The host name as a string, or ``None`` when *url* is missing,
        blank, malformed, or contains no host.
    """
    if not isinstance(url, str):
        return None

    candidate = url.strip()
    if not candidate:
        return None

    # urlparse() only fills the network location when the string carries a
    # "//" authority marker; without it "example.com/shop" parses as a path.
    if '://' not in candidate and not candidate.startswith('//'):
        candidate = '//' + candidate

    try:
        # .hostname (unlike .netloc) already drops credentials and port
        # and lower-cases the result.
        host = urlparse(candidate).hostname
    except ValueError:
        # Raised for malformed authorities such as an unclosed IPv6 bracket.
        return None

    if not host:
        return None

    # Strip only a leading "www." label; a blanket str.replace() would also
    # mangle hosts such as "awww.example.com".
    if host.startswith('www.'):
        host = host[len('www.'):]

    return host or None
|
|
|
def generate_email_permutations(first_name, last_name, domain):
    """Build the sorted list of candidate addresses to try for one contact.

    Candidates combine the contact's name in common corporate layouts
    (``first.last``, ``flast``, ``first_last`` ...) with a fixed set of
    role mailboxes (``sales``, ``support`` ...).  Patterns that need both
    names are only produced when both names are present, so no address
    with a dangling separator (``john-@...``) is emitted.

    Args:
        first_name: Given name; may be empty or ``None``.
        last_name: Family name; may be empty or ``None``.  Internal
            whitespace is removed, so "van der Berg" becomes "vanderberg".
        domain: Mail domain the addresses are built for.

    Returns:
        A sorted list of unique addresses that pass
        ``validate_email_syntax``; empty when *domain* is missing.
    """
    if not domain:
        return []

    # Lower-case and drop every whitespace character: an address cannot
    # contain spaces, and multi-word names would otherwise be discarded.
    first = ''.join(first_name.lower().split()) if first_name else ''
    last = ''.join(last_name.lower().split()) if last_name else ''

    # Common role mailboxes (e-commerce focused) plus a few
    # Spanish/Portuguese variants.
    role_local_parts = (
        'admin', 'billing', 'sales', 'support', 'contact',
        'me', 'hi', 'hello',
        'ceo', 'founder', 'director', 'md', 'managingdirector',
        'owner', 'manager', 'orders', 'cs', 'customerservice',
        'comercial',  # Spanish
        'ventas',     # Spanish
        'vendas',     # Portuguese
    )

    local_parts = list(role_local_parts)

    if first:
        local_parts += [first, f"{first}-admin", f"team.{first}"]

    if last:
        local_parts.append(last)

    if first and last:
        first_initial, last_initial = first[0], last[0]
        local_parts += [
            f"{first}{last}",
            f"{first}.{last}",
            f"{first}_{last}",
            f"{first}-{last}",
            f"{first_initial}{last}",
            f"{first_initial}.{last}",
            f"{first_initial}_{last}",
            f"{first}{last_initial}",
            f"{last}{first}",
            f"{last}.{first}",
            f"{first_initial}{last_initial}",
        ]

    # A set removes duplicates; anything that is not a plausible address
    # (e.g. names with unsupported characters) is dropped by the validator.
    candidates = {f"{local}@{domain}" for local in local_parts}
    return sorted(c for c in candidates if validate_email_syntax(c))
|
|
|
# Pragmatic address shape (deliberately not the full RFC 5322 grammar):
#   * local part: letters/digits with ".", "_", "+", "-" allowed inside,
#     never at either end, and never two dots in a row;
#   * domain: two or more dot-separated labels, hyphens allowed inside a
#     label but not at its edges.
_EMAIL_PATTERN = re.compile(
    r"(?!.*\.\.)"
    r"[a-z0-9](?:[a-z0-9._+-]*[a-z0-9])?"
    r"@"
    r"(?:\w(?:[\w-]*\w)?\.)+"
    r"\w(?:[\w-]*\w)?"
)


def validate_email_syntax(email):
    """Check that *email* has a plausible ``local@domain`` shape.

    This is a practical filter, not a complete RFC validator.  Matching is
    case-insensitive (the input is lower-cased first) and must cover the
    whole string, so trailing newlines or other junk are rejected.

    Args:
        email: Address to check.

    Returns:
        A truthy ``re.Match`` object when the address is acceptable,
        otherwise ``None`` (also for non-string input).
    """
    if not isinstance(email, str):
        return None
    return _EMAIL_PATTERN.fullmatch(email.lower())
|
|
|
def check_mx_records(domain):
    """Report whether *domain* publishes at least one MX record.

    The lookup is best-effort: any resolver failure (unknown domain, no
    answer, timeout, malformed name ...) is reported as ``False`` rather
    than raised.

    Args:
        domain: Domain name to query; may be empty or ``None``.

    Returns:
        ``True`` if the MX query returned an answer, ``False`` otherwise.
    """
    if not domain:
        # Nothing to look up; avoid a pointless DNS query.
        return False

    try:
        return bool(dns.resolver.resolve(domain, 'MX'))
    except Exception:
        # Deliberately broad, but unlike a bare ``except:`` this still lets
        # KeyboardInterrupt / SystemExit stop a long batch run.
        return False
|
|
|
def is_catch_all_domain(domain, sender='verify@example.com'):
    """Probe whether the mail server for *domain* accepts any recipient.

    Connects to the domain's preferred mail exchanger and issues ``RCPT TO``
    for a mailbox name that is very unlikely to exist.  No message is sent.

    Args:
        domain: Domain to probe; may be empty or ``None``.
        sender: Envelope sender used for ``MAIL FROM``.  It must be a
            syntactically valid address (the previous hard-coded value
            ``'[email protected]'`` was not, which lets a server reject
            ``MAIL FROM`` and skew every result).  Replace the default
            with an address you control for real runs.

    Returns:
        ``True`` if the server answered 250 for the made-up recipient.
        ``False`` if it refused, or if the probe could not be completed
        (DNS or SMTP failure) - the check is best-effort.
    """
    if not domain:
        return False

    # Timestamp-based local part: effectively guaranteed not to be a real
    # mailbox, so acceptance implies a catch-all configuration.
    probe_address = f"invalid{int(time.time())}@{domain}"

    try:
        answers = dns.resolver.resolve(domain, 'MX')
        # DNS answers are unordered; the lowest preference value marks the
        # exchanger that mail is actually delivered to first.
        primary = min(answers, key=lambda record: record.preference)
        mx_host = primary.exchange.to_text()

        with smtplib.SMTP(mx_host, timeout=10) as server:
            server.helo(server.local_hostname)
            server.mail(sender)
            code, _ = server.rcpt(probe_address)
        return code == 250
    except Exception:
        # Best-effort probe: treat any failure as "not catch-all", but do
        # not swallow KeyboardInterrupt / SystemExit like a bare except.
        return False
|
|
|
def verify_email(email, domain, is_catch_all, sender='verify@example.com'):
    """Classify one address without sending any mail (Hunter-style).

    Steps, in order: syntax check, MX lookup for *domain*, catch-all
    short-circuit, then an SMTP ``RCPT TO`` probe against the preferred
    mail exchanger.

    Args:
        email: Address to verify.
        domain: Domain part of *email*, used for the MX lookup.
        is_catch_all: Result of a prior catch-all probe for *domain*; when
            true the SMTP probe is skipped because it would be meaningless.
        sender: Envelope sender for ``MAIL FROM``.  Must be a syntactically
            valid address (the previous hard-coded ``'[email protected]'``
            was not).  Replace the default with an address you control.

    Returns:
        One of:
        ``'invalid'``   - bad syntax, no MX records, or a permanent (5xx)
                          rejection of the recipient;
        ``'catch-all'`` - the domain accepts every recipient;
        ``'valid'``     - the server accepted the recipient (250/251);
        ``'unknown'``   - the probe failed or the server gave a temporary
                          (4xx, e.g. greylisting) or unexpected reply.
    """
    if not validate_email_syntax(email):
        return 'invalid'

    # Resolve once and reuse the answer for the SMTP probe below instead of
    # querying DNS a second time for the same records.
    try:
        answers = dns.resolver.resolve(domain, 'MX')
    except Exception:
        answers = None
    if not answers:
        return 'invalid'

    if is_catch_all:
        return 'catch-all'

    try:
        # Lowest preference value = the exchanger mail is delivered to first.
        primary = min(answers, key=lambda record: record.preference)
        mx_host = primary.exchange.to_text()

        with smtplib.SMTP(mx_host, timeout=10) as server:
            server.helo(server.local_hostname)
            server.mail(sender)
            code, _ = server.rcpt(email)
    except Exception:
        # Connection/protocol trouble says nothing about the mailbox.
        return 'unknown'

    if code in (250, 251):
        return 'valid'
    if 500 <= code < 600:
        return 'invalid'
    # 4xx and anything unexpected: the server did not give a final verdict.
    return 'unknown'
|
|
|
def process_row(row):
    """Generate and verify candidate addresses for one input CSV row.

    Args:
        row: Mapping for one CSV line.  Reads ``'website'`` and the
            optional ``'owner'`` (full name).  Either value may be missing
            or ``None`` - ``csv.DictReader`` yields ``None`` for fields
            absent from a short line.

    Returns:
        A list with one dict per candidate address, each holding the keys
        ``website``, ``domain``, ``owner``, ``email``, ``status``,
        ``catch_all`` and ``mx_valid``; or ``None`` when no domain can be
        extracted from the website or no candidate address is produced.
    """
    website = (row.get('website') or '').strip()
    owner = (row.get('owner') or '').strip()

    domain = extract_domain(website)
    if not domain:
        return None

    # First token is the given name; everything after it is the family name.
    name_parts = owner.split()
    first_name = name_parts[0] if name_parts else ''
    last_name = ' '.join(name_parts[1:])

    emails = generate_email_permutations(first_name, last_name, domain)
    if not emails:
        return None

    # Domain-level checks are done once and shared by every address.
    mx_valid = check_mx_records(domain)
    catch_all = is_catch_all_domain(domain) if mx_valid else False

    results = []
    for email in emails:
        status = 'invalid'
        if mx_valid:
            status = verify_email(email, domain, catch_all)
            # Throttle only when an SMTP probe was actually attempted; for
            # catch-all domains verify_email() returns before connecting.
            if not catch_all:
                time.sleep(DELAY_BETWEEN_CHECKS)

        results.append({
            'website': website,
            'domain': domain,
            'owner': owner,
            'email': email,
            'status': status,
            'catch_all': catch_all,
            'mx_valid': mx_valid,
        })

    return results
|
|
|
def main():
    """Read INPUT_CSV row by row and write the verification results.

    Each input row is handed to ``process_row``; whatever result rows it
    returns are appended to OUTPUT_CSV.  A failure while handling one row
    is printed and the run continues with the next row.
    """
    output_columns = [
        'website', 'domain', 'owner', 'email', 'status', 'catch_all', 'mx_valid'
    ]

    with open(INPUT_CSV, mode='r', encoding='utf-8') as infile, \
            open(OUTPUT_CSV, mode='w', encoding='utf-8', newline='') as outfile:

        writer = csv.DictWriter(outfile, fieldnames=output_columns)
        writer.writeheader()

        # Rows are numbered from 1 for the progress output.
        for idx, row in enumerate(csv.DictReader(infile), start=1):
            print(f"Processing {idx}: {row['website']}")
            try:
                results = process_row(row)
                if results:
                    writer.writerows(results)
            except Exception as e:
                # Keep going: one bad row must not abort the whole batch.
                print(f"Error processing {row['website']}: {str(e)}")


# Run the batch only when executed as a script, not on import.
if __name__ == "__main__":
    main()