Created
June 27, 2025 17:02
-
-
Save ttunguz/fb5f5812a9eab0e8fa7255c1213d0075 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
FAST Single-Model Email Auto Reply Script | |
This ultra-optimized script: | |
1. Uses a single model for ALL tasks (eliminates model switching overhead) | |
2. Combines multiple steps into single prompts where possible | |
3. Streamlines processing for minimal latency | |
4. Maintains quality while maximizing speed | |
Usage: python3 auto_reply_single_fast.py [--guidance "your guidance here"] [--intro] [--ultra-fast] | |
""" | |
import os | |
import sys | |
# Suppress HuggingFace tokenizers parallelism warning | |
os.environ["TOKENIZERS_PARALLELISM"] = "false" | |
import email | |
import email.utils | |
import email.header | |
import re | |
import argparse | |
import socket | |
import json | |
import time | |
import requests | |
# --- Configuration ---
# Address and display name of the mailbox owner. Replace both with your own
# values; the address below is only a well-formed placeholder.
YOUR_EMAIL = "you@example.com"
YOUR_NAME = "Your Name"
# Model name sent in the Ollama /api/generate request body.
UNIFIED_MODEL = "gemma3:12b"  # Or any other model you prefer
# Word budget for conversation history.
MAX_HISTORY_WORDS = 40000
# Unix domain socket on which the similar-email lookup daemon is expected.
SOCKET_PATH = "/tmp/lancedb_daemon.sock"
def decode_header(header):
    """Decode an RFC 2047 encoded email header into plain text.

    Args:
        header: Raw header value (``str`` or ``email.header.Header``), or
            ``None`` when the header is absent.

    Returns:
        The decoded header as ``str``. ``None`` yields an empty string.
        Fragments with an unknown or mislabelled charset are decoded as
        UTF-8 with replacement characters instead of raising.
    """
    if header is None:
        return ""

    pieces = []
    previous_was_encoded = False
    for part, encoding in email.header.decode_header(header):
        if isinstance(part, bytes):
            try:
                text = part.decode(encoding or 'utf-8')
            except (UnicodeDecodeError, LookupError):
                # LookupError: the charset label is not known to Python
                # (e.g. "unknown-8bit"); fall back rather than crash.
                text = part.decode('utf-8', errors='replace')
        else:
            text = str(part)
        if not text:
            continue

        is_encoded = bool(encoding)
        if pieces and is_encoded != previous_was_encoded:
            # Plain fragments normally keep their own surrounding whitespace,
            # so only add a separator when the boundary has none. Adjacent
            # encoded words are concatenated directly, as RFC 2047 requires.
            if not (pieces[-1][-1:].isspace() or text[:1].isspace()):
                pieces.append(' ')
        pieces.append(text)
        previous_was_encoded = is_encoded

    return ''.join(pieces)
def extract_name_email(from_header):
    """Extract the display name and address from a From header.

    Args:
        from_header: Raw (possibly RFC 2047 encoded) From header value.

    Returns:
        A ``(name, email_address)`` tuple. If parsing fails entirely, the
        decoded header is returned as the name with an empty address.
    """
    decoded = decode_header(from_header)
    try:
        name, email_addr = email.utils.parseaddr(decoded)

        if '@' not in email_addr and isinstance(from_header, str):
            # Decoding first can expose characters such as a comma inside the
            # display name, which makes parseaddr split in the wrong place.
            # Retry on the raw header and decode only the name afterwards.
            raw_name, raw_addr = email.utils.parseaddr(from_header)
            if '@' in raw_addr:
                return decode_header(raw_name), raw_addr

        if email_addr or '@' not in name:
            return name, email_addr

        # parseaddr found no address, but the "name" contains one.
        match = re.search(r'<([^>]+)>', name)
        if match:
            email_addr = match.group(1)
            remainder = name.replace(f"<{email_addr}>", "").strip()
            # With no display name left, use the local part of the address.
            name = remainder or email_addr.split('@')[0]
        elif ' ' not in name:
            # The whole value is a bare address.
            email_addr = name
            name = email_addr.split('@')[0]
        return name, email_addr
    except Exception:
        # Best effort: a malformed header must never abort reply generation.
        return decoded, ""
def get_similar_emails_quick(query_text: str, debug: bool = False):
    """Fetch one similar email from the local lookup daemon, failing fast.

    Sends a length-prefixed JSON query over the Unix socket at SOCKET_PATH
    and reads the reply until it parses as JSON, the peer closes the
    connection, or the time budget runs out.

    Args:
        query_text: Text to search for. Empty or ``None`` skips the lookup.
        debug: When true, lookup errors are reported on stderr.

    Returns:
        A short ``"Similar: <sender>: <text>..."`` string, or ``""`` when the
        daemon is unavailable, slow, or returns nothing usable.
    """
    if not query_text:
        return ""
    if not os.path.exists(SOCKET_PATH):
        return ""  # No daemon, skip context

    budget_seconds = 2  # Very short budget for the whole exchange
    response = None
    try:
        # The context manager closes the socket on every exit path.
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            sock.settimeout(budget_seconds)
            sock.connect(SOCKET_PATH)

            # Only 1 result is requested to keep the exchange small.
            message = json.dumps({"query": query_text, "limit": 1}).encode('utf-8')
            sock.sendall(len(message).to_bytes(4, 'big') + message)

            received = bytearray()
            deadline = time.monotonic() + budget_seconds
            while True:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                # Shrink the per-call timeout so the overall budget holds.
                sock.settimeout(remaining)
                chunk = sock.recv(4096)
                if not chunk:
                    break
                received += chunk
                try:
                    response = json.loads(received.decode('utf-8'))
                except ValueError:
                    # Incomplete JSON or a split multi-byte character
                    # (both error types subclass ValueError): keep reading.
                    continue
                break

        if not isinstance(response, dict) or response.get('status') != 'success':
            return ""
        emails = response.get('emails', [])
        if not emails:
            return ""

        # Just one example for context
        top_match = emails[0]
        return f"Similar: {top_match['sender']}: {top_match['text'][:150]}..."
    except Exception as exc:
        # Context is optional; any failure here must not block the reply.
        if debug:
            print(f"Similar-email lookup failed: {exc}", file=sys.stderr)
        return ""
def quick_email_classification(subject, body):
    """Classify an email as 'intro', 'list' or 'normal' without an LLM.

    The body is searched case-insensitively for fixed substrings. Intro
    signals are checked before list signals. ``subject`` is accepted but not
    consulted. An empty or missing body is 'normal'.
    """
    if not body:
        return 'normal'

    lowered = body.lower()
    # Order matters: the first category with a matching substring wins.
    signal_table = (
        ('intro', ('introduce', 'meet', 'connect', 'founder', 'pleasure to')),
        ('list', ('first', 'second', 'third', '1.', '2.', '3.', 'questions:', 'points:')),
    )
    for label, signals in signal_table:
        for signal in signals:
            if signal in lowered:
                return label
    return 'normal'
# Headers whose presence marks the input as a real email, not pasted text.
_KNOWN_EMAIL_HEADERS = ('From', 'To', 'Subject', 'Date', 'Return-Path', 'Message-ID')
# Transfer encodings whose payload must be byte-decoded before use.
_ENCODED_TRANSFER_ENCODINGS = frozenset(
    {'base64', 'quoted-printable', 'x-uuencode', 'uuencode', 'uue', 'x-uue'}
)


def _decode_text_part(part):
    """Return the text of one text/plain part, or None if it has no text."""
    transfer_encoding = str(part.get('Content-Transfer-Encoding', '')).strip().lower()
    if transfer_encoding in _ENCODED_TRANSFER_ENCODINGS:
        payload = part.get_payload(decode=True)
        if payload is None:
            return None
        charset = part.get_content_charset() or 'utf-8'
        try:
            return payload.decode(charset, errors='replace')
        except LookupError:
            # Unknown charset label: fall back instead of dropping the body.
            return payload.decode('utf-8', errors='replace')
    # 7bit/8bit/binary bodies were already decoded to text when stdin was
    # read. Forcing them through get_payload(decode=True) would mangle
    # non-ASCII characters, so the string payload is used as-is.
    payload = part.get_payload()
    return payload if isinstance(payload, str) else None


def _extract_plain_text_body(msg):
    """Return the first non-attachment text/plain body of msg, or ''."""
    # walk() yields the message itself when it is not multipart.
    for part in msg.walk():
        if part.get_content_type() != 'text/plain':
            continue
        if part.get_content_disposition() == 'attachment':
            continue
        text = _decode_text_part(part)
        if text is not None:
            return text
    return ""


def parse_email_from_stdin():
    """Read stdin and parse it as an email, falling back to plain text.

    The input counts as an email when its header block contains at least one
    well-known header (From, To, Subject, Date, Return-Path, Message-ID).
    Anything else is used verbatim as the body.

    Returns:
        A dict with keys 'subject', 'body', 'sender_name', 'sender_email'
        and 'email_type'.
    """
    stdin_content = sys.stdin.read()

    # Leading blank lines would make the parser see an empty header block.
    msg = email.message_from_string(stdin_content.lstrip())

    if not any(header in msg for header in _KNOWN_EMAIL_HEADERS):
        # Plain text input: no headers to parse.
        return {
            'subject': '(No subject)',
            'body': stdin_content.strip(),
            'sender_name': '',
            'sender_email': '',
            'email_type': quick_email_classification('', stdin_content)
        }

    subject = decode_header(msg.get('Subject', '(No subject)'))
    sender_name, sender_email = extract_name_email(msg.get('From', ''))
    body = _extract_plain_text_body(msg)

    return {
        'subject': subject,
        'body': body,
        'sender_name': sender_name,
        'sender_email': sender_email,
        'email_type': quick_email_classification(subject, body)
    }
def add_smart_paragraph_breaks(text):
    """Insert one paragraph break near the middle of a long, unbroken reply.

    Text is returned unchanged when it is empty, already contains a blank
    line, has fewer than three sentence-ending marks, is 100 characters or
    shorter, or has fewer than three whitespace-separated sentences.

    Args:
        text: Reply text, or ``None``.

    Returns:
        The text with the whitespace after the middle sentence replaced by a
        blank line. All other spacing is preserved.
    """
    if not text or '\n\n' in text:
        return text  # Nothing to do, or paragraph breaks already present

    # Rough sentence count
    sentence_endings = sum(text.count(mark) for mark in '.!?')
    if sentence_endings < 3 or len(text) <= 100:
        return text

    # Whitespace runs that follow sentence punctuation. The lookbehind keeps
    # the punctuation and the spacing in place; trailing whitespace at the
    # very end of the text is not a boundary between two sentences.
    boundaries = [
        gap for gap in re.finditer(r'(?<=[.!?])\s+', text)
        if gap.end() < len(text)
    ]
    if len(boundaries) < 2:  # Fewer than three sentences
        return text

    # Break after the middle sentence; an odd count puts the extra sentence
    # in the first paragraph.
    sentence_count = len(boundaries) + 1
    break_gap = boundaries[(sentence_count + 1) // 2 - 1]
    return text[:break_gap.start()] + '\n\n' + text[break_gap.end():]
# Prompt used when the caller supplies a draft ("guidance") to polish.
_POLISH_PROMPT_TEMPLATE = """You are {name}'s writing assistant. Polish this draft reply:
{context}DRAFT: {draft}
Instructions:
1. Fix spelling/grammar but keep the casual tone
2. Keep it brief but readable - break into 2-3 short paragraphs if needed
3. Use contractions like "I've", "I'll", "can't"
4. Use simple words: "sounds good", "great", "thanks"
5. Add paragraph breaks between different thoughts/topics
6. NO formal endings like "Best regards"
Polished reply:"""

# Prompt used when a reply is written from scratch.
_REPLY_PROMPT_TEMPLATE = """You are {name}'s email assistant. Write a brief, casual reply.
{context}EMAIL: {body}
Instructions:
1. Very casual tone - like texting a smart friend
2. Brief: usually 1-3 sentences, max 2 short paragraphs
3. Use contractions: "I've", "I'll", "that's", "can't"
4. Simple words: "sounds good", "great", "thanks", "got it"
5. {format_instruction}
6. NO formal closings (no "Best", "Regards", etc.)
7. Add paragraph breaks if reply is longer than 2 sentences
Reply:"""

# A prompt label echoed back by the model at the very start of its output.
_LEADING_LABEL_RE = re.compile(r'^\s*(?:polished reply|reply)\s*:\s*', re.IGNORECASE)
# A line that consists of nothing but a sign-off phrase.
_SIGN_OFF_RE = re.compile(
    r'^(?:best|best regards|best wishes|kind regards|warm regards|regards|'
    r'sincerely|sincerely yours|yours sincerely|thanks|many thanks)\s*[,.!]*$',
    re.IGNORECASE,
)


def _build_reply_prompt(email_obj, guidance, context):
    """Return the prompt for polishing a draft or writing a fresh reply."""
    if guidance:
        return _POLISH_PROMPT_TEMPLATE.format(
            name=YOUR_NAME, context=context, draft=guidance
        )
    if email_obj.get('email_type', 'normal') == 'list':
        format_instruction = "If you mention multiple points, use a numbered list format."
    else:
        format_instruction = "Keep it conversational and flowing."
    return _REPLY_PROMPT_TEMPLATE.format(
        name=YOUR_NAME,
        context=context,
        body=email_obj.get('body') or '',
        format_instruction=format_instruction,
    )


def _clean_model_reply(reply_text):
    """Strip an echoed prompt label and a trailing stand-alone sign-off."""
    # Only a label at the start is an artifact; the same words later in the
    # reply are real content and are kept.
    reply_text = _LEADING_LABEL_RE.sub('', reply_text, count=1).strip()
    lines = reply_text.split('\n')
    # Drop the last line only when it is purely a short sign-off AND other
    # content remains, so a one-line "Sounds good, thanks!" is never erased.
    if len(lines) > 1:
        last_line = lines[-1].strip()
        if len(last_line) < 30 and _SIGN_OFF_RE.match(last_line):
            lines = lines[:-1]
    return '\n'.join(lines).strip()


def generate_unified_reply(email_obj, guidance="", intro_mode=False, ultra_fast=False, debug=False):
    """Generate a reply with a single model call.

    One request covers reply generation (or draft polishing), list
    formatting and tone; light post-processing removes echoed labels and
    sign-offs and adds a paragraph break when needed.

    Args:
        email_obj: Parsed email dict; 'body' and 'email_type' are read.
        guidance: Draft text to polish. When empty, a reply to the email
            body is written from scratch.
        intro_mode: Accepted for interface compatibility; currently unused.
        ultra_fast: Skip the similar-email context lookup.
        debug: Print diagnostics to stderr.

    Returns:
        The reply text, or ``None`` if the model request failed.
    """
    # Get minimal context if available
    context = ""
    body = email_obj.get('body')
    if not ultra_fast and body:
        similar = get_similar_emails_quick(body, debug)
        if similar:
            context = f"Context: {similar}\n\n"

    task_prompt = _build_reply_prompt(email_obj, guidance, context)

    # SINGLE MODEL CALL - handles everything
    try:
        response = requests.post(
            "http://localhost:11434/api/generate",
            json={
                "model": UNIFIED_MODEL,
                "prompt": task_prompt,
                "stream": False
            },
            timeout=25
        )
        response.raise_for_status()
        reply_text = response.json().get("response", "").strip()

        final_reply = _clean_model_reply(reply_text)
        # Add paragraph breaks if missing and text is long enough
        final_reply = add_smart_paragraph_breaks(final_reply)

        if debug:
            print(f"Generated {len(final_reply)} chars using {UNIFIED_MODEL}", file=sys.stderr)
        return final_reply
    except requests.exceptions.RequestException as e:
        if debug:
            print(f"Error generating reply: {e}", file=sys.stderr)
        return None
    except Exception as e:
        # Malformed JSON or an unexpected response shape: report, don't crash.
        if debug:
            print(f"Error during generation: {str(e)}", file=sys.stderr)
        return None
def _prompt_for_guidance(prompt):
    """Ask the user for guidance without consuming the piped email.

    When stdin is a terminal the normal input() prompt is used. When the
    email is piped in on stdin, the answer is read from the controlling
    terminal instead, and the prompt goes to stderr so it cannot end up in
    the reply written to stdout.

    Raises:
        OSError: if stdin is piped and no controlling terminal can be opened.
        EOFError, KeyboardInterrupt: if the user aborts the prompt.
    """
    if sys.stdin.isatty():
        return input(prompt)
    with open("/dev/tty", "r") as tty:
        print(prompt, end="", file=sys.stderr, flush=True)
        return tty.readline().rstrip("\r\n")


def main():
    """Command-line entry point: read an email on stdin, print a reply."""
    parser = argparse.ArgumentParser(description='Fast single-model email reply generator')
    parser.add_argument('--guidance', '-g', type=str, default='',
                        help='Custom guidance for how to reply to the email')
    parser.add_argument('--intro', '-i', action='store_true',
                        help='Force using the introduction reply template')
    parser.add_argument('--ultra-fast', '-u', action='store_true',
                        help='Skip all context lookup for maximum speed')
    parser.add_argument('--debug', '-d', action='store_true',
                        help='Enable debug output')
    args = parser.parse_args()

    # If no guidance provided, prompt for it
    if not args.guidance and not args.intro:
        try:
            args.guidance = _prompt_for_guidance("Enter guidance for fast generation: ")
        except (EOFError, KeyboardInterrupt, OSError):
            # No terminal or the user aborted: continue without guidance.
            args.guidance = ""

    start_time = time.time()

    # Parse email
    email_obj = parse_email_from_stdin()
    if args.debug:
        print(f"Email type: {email_obj.get('email_type')}", file=sys.stderr)
        print(f"Subject: {email_obj.get('subject')}", file=sys.stderr)

    # Generate reply with unified model
    reply_text = generate_unified_reply(
        email_obj,
        guidance=args.guidance,
        intro_mode=args.intro,
        ultra_fast=args.ultra_fast,
        debug=args.debug
    )

    if reply_text:
        elapsed = time.time() - start_time
        # The final reply text goes to stdout (and into the editor)
        print(reply_text)
        # The status message goes to stderr (and should appear in the mutt status bar)
        print(f"🚀 Fast AI reply generated in {elapsed:.1f}s using {UNIFIED_MODEL}", file=sys.stderr)
    else:
        print("Could not generate a reply. Please write your response manually.")
        print("❌ AI reply generation failed.", file=sys.stderr)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment