Skip to content

Instantly share code, notes, and snippets.

@ttunguz
Created June 27, 2025 17:02
Show Gist options
  • Save ttunguz/fb5f5812a9eab0e8fa7255c1213d0075 to your computer and use it in GitHub Desktop.
Save ttunguz/fb5f5812a9eab0e8fa7255c1213d0075 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
FAST Single-Model Email Auto Reply Script
This ultra-optimized script:
1. Uses a single model for ALL tasks (eliminates model switching overhead)
2. Combines multiple steps into single prompts where possible
3. Streamlines processing for minimal latency
4. Maintains quality while maximizing speed
Usage: python3 auto_reply_single_fast.py [--guidance "your guidance here"] [--intro] [--ultra-fast] [--debug] < message.eml
"""
import os
import sys
# Suppress HuggingFace tokenizers parallelism warning
os.environ["TOKENIZERS_PARALLELISM"] = "false"
import email
import email.utils
import email.header
import re
import argparse
import socket
import json
import time
import requests
# --- Configuration ---
# Identity of the mailbox owner. YOUR_NAME is interpolated into the LLM prompts;
# YOUR_EMAIL is not referenced anywhere in the visible part of this script.
YOUR_EMAIL = "[email protected]"
YOUR_NAME = "Your Name"
# Model name sent with every generation request.
UNIFIED_MODEL = "gemma3:12b" # Or any other model you prefer
# Not referenced in the visible part of this script.
# NOTE(review): presumably a cap on how much quoted history to include - confirm or remove.
MAX_HISTORY_WORDS = 40000
# Unix-domain socket that get_similar_emails_quick() checks for and connects to.
SOCKET_PATH = "/tmp/lancedb_daemon.sock"
def decode_header(header):
    """Decode an RFC 2047 encoded email header into a plain string.

    Args:
        header: Raw header value as found in the message. ``None`` is
            treated as an empty header.

    Returns:
        The decoded header text. Parts whose declared charset is unknown to
        Python, or whose bytes are invalid for that charset, are decoded as
        UTF-8 with replacement characters instead of raising.
    """
    if header is None:
        return ""
    header_parts = []
    for part, encoding in email.header.decode_header(header):
        if isinstance(part, bytes):
            try:
                part = part.decode(encoding or 'utf-8')
            except (UnicodeDecodeError, LookupError):
                # LookupError: the header names a charset Python does not know.
                part = part.decode('utf-8', errors='replace')
        header_parts.append(str(part))
    # email.header.decode_header() keeps the whitespace that separates encoded
    # and unencoded runs, so the parts are concatenated as-is; joining with a
    # space would double it.
    return ''.join(header_parts)
def extract_name_email(from_header):
    """Extract the display name and address from a From header.

    Args:
        from_header: Raw (possibly RFC 2047 encoded) From header value.

    Returns:
        A ``(name, email_address)`` tuple. If the header cannot be parsed,
        the decoded header text is returned as the name with an empty address.
    """
    from_header = decode_header(from_header)
    try:
        name, email_addr = email.utils.parseaddr(from_header)
    except Exception:
        # Best effort: never let a malformed header abort reply generation.
        return from_header, ""

    # Normal case: parseaddr found an address, or there is nothing
    # address-like left in the name to recover.
    if email_addr or '@' not in name:
        return name, email_addr

    # Fallback: the address ended up inside the name field.
    bracketed = re.search(r'<([^>]+)>', name)
    if bracketed:
        email_addr = bracketed.group(1)
        name_part = name.replace(f"<{email_addr}>", "").strip()
        # With no display name left, fall back to the address's local part.
        name = name_part or email_addr.split('@')[0]
    elif ' ' not in name:
        # The whole name is a bare address.
        email_addr = name
        name = email_addr.split('@')[0]
    return name, email_addr
def get_similar_emails_quick(query_text: str, debug: bool = False):
    """Fetch one similar past email from the local daemon, failing fast.

    Sends a 4-byte big-endian length prefix followed by a JSON request
    (``{"query": ..., "limit": 1}``) over the Unix socket at ``SOCKET_PATH``
    and reads until the accumulated bytes parse as JSON or about two seconds
    have elapsed.

    Args:
        query_text: Text to search for; an empty value short-circuits.
        debug: When true, lookup failures are reported on stderr.

    Returns:
        A one-line ``"Similar: <sender>: <text>..."`` summary, or ``""`` when
        the daemon is missing, slow, returns no match, or any error occurs.
    """
    if not query_text:
        return ""
    if not os.path.exists(SOCKET_PATH):
        return ""  # No daemon, skip context

    response = None
    try:
        # The context manager closes the socket even if connect/recv raises.
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            sock.settimeout(2)  # Very short timeout
            sock.connect(SOCKET_PATH)
            message = json.dumps({"query": query_text, "limit": 1}).encode('utf-8')
            sock.sendall(len(message).to_bytes(4, 'big') + message)

            received = b""
            # Monotonic clock: the deadline is immune to wall-clock adjustments.
            deadline = time.monotonic() + 2
            while time.monotonic() < deadline:
                chunk = sock.recv(4096)
                if not chunk:
                    break
                received += chunk
                try:
                    response = json.loads(received.decode('utf-8'))
                    break
                except (json.JSONDecodeError, UnicodeDecodeError):
                    continue  # Incomplete message so far; keep reading.

        if not isinstance(response, dict) or response.get('status') != 'success':
            return ""
        emails = response.get('emails', [])
        if not emails:
            return ""
        # Just one example for context
        match = emails[0]
        return f"Similar: {match['sender']}: {match['text'][:150]}..."
    except Exception as exc:
        # Context is optional: any failure degrades to "no context".
        if debug:
            print(f"Similar-email lookup failed: {exc}", file=sys.stderr)
        return ""
def quick_email_classification(subject, body):
    """Classify an email as 'intro', 'list' or 'normal' using keyword heuristics only.

    The body is lower-cased and scanned for substrings; intro keywords take
    precedence over list keywords. ``subject`` is accepted but not inspected.
    An empty body is always 'normal'.
    """
    if not body:
        return 'normal'

    lowered = body.lower()
    # Checked in order: the first category with a matching keyword wins.
    keyword_table = (
        ('intro', ('introduce', 'meet', 'connect', 'founder', 'pleasure to')),
        ('list', ('first', 'second', 'third', '1.', '2.', '3.', 'questions:', 'points:')),
    )
    for label, keywords in keyword_table:
        for keyword in keywords:
            if keyword in lowered:
                return label
    return 'normal'
def _decode_text_part(part):
    """Return the text of a text/plain MIME part, or None if it has no usable payload."""
    transfer_encoding = str(part.get('Content-Transfer-Encoding', '')).strip().lower()
    if transfer_encoding in ('', '7bit', '8bit', 'binary'):
        # The message was parsed from already-decoded text, so an identity
        # transfer encoding means the payload is final; round-tripping it
        # through get_payload(decode=True) would mangle non-ASCII characters.
        raw = part.get_payload()
        if isinstance(raw, str):
            return raw
    payload = part.get_payload(decode=True)
    if payload is None:
        return None
    charset = part.get_content_charset() or 'utf-8'
    try:
        return payload.decode(charset, errors='replace')
    except LookupError:
        # Unknown charset label: fall back rather than dropping the body.
        return payload.decode('utf-8', errors='replace')


def parse_email_from_stdin():
    """Read an email (or bare text) from stdin and return its key fields.

    Input that does not start with one of a few well-known header names is
    treated as a plain-text body with no headers. Otherwise it is parsed as
    an RFC 822 message and the first decodable text/plain part is the body.

    Returns:
        A dict with keys 'subject', 'body', 'sender_name', 'sender_email'
        and 'email_type' (see quick_email_classification).
    """
    stdin_content = sys.stdin.read()

    # Quick check for plain text
    if not stdin_content.strip().startswith(('From:', 'To:', 'Subject:', 'Date:', 'Return-Path:')):
        return {
            'subject': '(No subject)',
            'body': stdin_content.strip(),
            'sender_name': '',
            'sender_email': '',
            'email_type': quick_email_classification('', stdin_content)
        }

    # Drop leading whitespace: a blank first line would make the parser treat
    # every header as part of the body.
    msg = email.message_from_string(stdin_content.lstrip())
    subject = decode_header(msg.get('Subject', '(No subject)'))
    sender_name, sender_email = extract_name_email(msg.get('From', ''))

    # Simplified body extraction: first text/plain part that decodes.
    body = ""
    if msg.is_multipart():
        for part in msg.walk():
            if part.get_content_type() != 'text/plain':
                continue
            text = _decode_text_part(part)
            if text is not None:
                body = text
                break
    elif msg.get_content_type() == 'text/plain':
        body = _decode_text_part(msg) or ""

    return {
        'subject': subject,
        'body': body,
        'sender_name': sender_name,
        'sender_email': sender_email,
        'email_type': quick_email_classification(subject, body)
    }
def add_smart_paragraph_breaks(text):
    """Insert one paragraph break into long text that has none.

    Text is returned unchanged when it is empty, already contains a blank
    line, is 100 characters or shorter, has fewer than three sentence-ending
    marks, or has fewer than two sentence boundaries. Otherwise the
    whitespace at the sentence boundary closest to the middle of the text is
    replaced with a blank line; every other character is preserved.

    Args:
        text: Reply text (may be None or empty).

    Returns:
        The text, possibly with a single ``"\\n\\n"`` inserted.
    """
    if not text or '\n\n' in text:
        return text  # Nothing to do, or already has paragraph breaks

    # Rough sentence count
    sentence_endings = sum(text.count(mark) for mark in '.!?')
    if sentence_endings < 3 or len(text) <= 100:
        return text

    # Whitespace runs that follow sentence punctuation and precede more text.
    # Working on match positions (instead of re.split + join) keeps the
    # spacing between all the other sentences intact.
    boundaries = list(re.finditer(r'(?<=[.!?])\s+(?=\S)', text))
    if len(boundaries) < 2:
        return text

    midpoint = len(text) / 2
    split_at = min(boundaries, key=lambda m: abs(m.start() - midpoint))
    return text[:split_at.start()] + '\n\n' + text[split_at.end():]
def _build_reply_prompt(email_obj, guidance, context):
    """Return the prompt: polish `guidance` when given, otherwise reply to the email body."""
    if guidance:
        # Polish guidance
        return f"""You are {YOUR_NAME}'s writing assistant. Polish this draft reply:
{context}DRAFT: {guidance}
Instructions:
1. Fix spelling/grammar but keep the casual tone
2. Keep it brief but readable - break into 2-3 short paragraphs if needed
3. Use contractions like "I've", "I'll", "can't"
4. Use simple words: "sounds good", "great", "thanks"
5. Add paragraph breaks between different thoughts/topics
6. NO formal endings like "Best regards"
Polished reply:"""

    # Generate reply from scratch with smart formatting
    if email_obj.get('email_type', 'normal') == 'list':
        format_instruction = "If you mention multiple points, use a numbered list format."
    else:
        format_instruction = "Keep it conversational and flowing."
    return f"""You are {YOUR_NAME}'s email assistant. Write a brief, casual reply.
{context}EMAIL: {email_obj.get('body', '')}
Instructions:
1. Very casual tone - like texting a smart friend
2. Brief: usually 1-3 sentences, max 2 short paragraphs
3. Use contractions: "I've", "I'll", "that's", "can't"
4. Simple words: "sounds good", "great", "thanks", "got it"
5. {format_instruction}
6. NO formal closings (no "Best", "Regards", etc.)
7. Add paragraph breaks if reply is longer than 2 sentences
Reply:"""


def _strip_reply_artifacts(reply_text):
    """Remove an echoed prompt label and a trailing formal sign-off line from model output."""
    # Only strip the label where the model echoes it (at the very start); a
    # blanket replace would also eat "Reply:" inside the actual message.
    cleaned = re.sub(
        r'^\s*(?:Polished reply|Reply):\s*', '', reply_text, flags=re.IGNORECASE
    ).strip()

    lines = cleaned.split('\n')
    # Require more than one line: a one-line reply such as "Sounds good,
    # thanks!" is the message itself, not a sign-off, and must survive.
    if len(lines) > 1:
        last_line = lines[-1].strip().lower()
        if len(last_line) < 30 and re.search(r'\b(?:best|regards|sincerely|thanks)\b', last_line):
            lines = lines[:-1]
    return '\n'.join(lines).strip()


def generate_unified_reply(email_obj, guidance="", intro_mode=False, ultra_fast=False, debug=False):
    """Generate (or polish) a reply with a single model call.

    One prompt covers reply generation, list formatting, paragraph breaks and
    style polishing. The request is POSTed to the local HTTP endpoint
    ``http://localhost:11434/api/generate`` with a 25 second timeout.

    Args:
        email_obj: Dict as produced by parse_email_from_stdin(); 'body' and
            'email_type' are read.
        guidance: Draft reply to polish. When empty, a reply to the email
            body is written from scratch.
        intro_mode: Accepted for interface compatibility; currently has no
            effect on the prompt.
        ultra_fast: Skip the similar-email context lookup.
        debug: Print diagnostics to stderr.

    Returns:
        The cleaned reply text (possibly empty), or None if the request or
        response handling failed.
    """
    # Get minimal context if available
    context = ""
    body = email_obj.get('body')
    if not ultra_fast and body:
        similar = get_similar_emails_quick(body, debug)
        if similar:
            context = f"Context: {similar}\n\n"

    task_prompt = _build_reply_prompt(email_obj, guidance, context)

    # SINGLE MODEL CALL - handles everything
    try:
        response = requests.post(
            "http://localhost:11434/api/generate",
            json={
                "model": UNIFIED_MODEL,
                "prompt": task_prompt,
                "stream": False
            },
            timeout=25
        )
        response.raise_for_status()
        reply_text = response.json().get("response", "").strip()

        final_reply = _strip_reply_artifacts(reply_text)
        # Add paragraph breaks if missing and text is long enough
        final_reply = add_smart_paragraph_breaks(final_reply)
        if debug:
            print(f"Generated {len(final_reply)} chars using {UNIFIED_MODEL}", file=sys.stderr)
        return final_reply
    except requests.exceptions.RequestException as e:
        if debug:
            print(f"Error generating reply: {e}", file=sys.stderr)
        return None
    except Exception as e:
        # Boundary handler: a malformed response must not crash the caller.
        if debug:
            print(f"Error during generation: {str(e)}", file=sys.stderr)
        return None
def _prompt_for_guidance():
    """Ask the user for guidance without consuming the email waiting on stdin."""
    # The prompt goes to stderr because stdout is reserved for the reply text.
    print("Enter guidance for fast generation: ", end="", file=sys.stderr, flush=True)
    try:
        if sys.stdin.isatty():
            return input()
        # stdin is a pipe carrying the email: reading it here would swallow the
        # first line of the message, so ask on the controlling terminal instead.
        with open('/dev/tty') as tty:
            return tty.readline().rstrip('\n')
    except (EOFError, KeyboardInterrupt, OSError):
        # No terminal available (or the user aborted): continue without guidance.
        return ""


def main():
    """Command-line entry point: read an email from stdin, print a reply to stdout.

    The reply (or a fallback message on failure) is written to stdout; status
    and debug messages are written to stderr.
    """
    parser = argparse.ArgumentParser(description='Fast single-model email reply generator')
    parser.add_argument('--guidance', '-g', type=str, default='',
                        help='Custom guidance for how to reply to the email')
    parser.add_argument('--intro', '-i', action='store_true',
                        help='Force using the introduction reply template')
    parser.add_argument('--ultra-fast', '-u', action='store_true',
                        help='Skip all context lookup for maximum speed')
    parser.add_argument('--debug', '-d', action='store_true',
                        help='Enable debug output')
    args = parser.parse_args()

    # If no guidance provided, prompt for it
    if not args.guidance and not args.intro:
        args.guidance = _prompt_for_guidance()

    start_time = time.time()

    # Parse email
    email_obj = parse_email_from_stdin()
    if args.debug:
        print(f"Email type: {email_obj.get('email_type')}", file=sys.stderr)
        print(f"Subject: {email_obj.get('subject')}", file=sys.stderr)

    # Generate reply with unified model
    reply_text = generate_unified_reply(
        email_obj,
        guidance=args.guidance,
        intro_mode=args.intro,
        ultra_fast=args.ultra_fast,
        debug=args.debug
    )

    if reply_text:
        elapsed = time.time() - start_time
        # The final reply text goes to stdout (and into the editor)
        print(reply_text)
        # The status message goes to stderr (and should appear in the mutt status bar)
        print(f"🚀 Fast AI reply generated in {elapsed:.1f}s using {UNIFIED_MODEL}", file=sys.stderr)
    else:
        print("Could not generate a reply. Please write your response manually.")
        print("❌ AI reply generation failed.", file=sys.stderr)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment