|
#!/usr/bin/env -S uv run --script |
|
# /// script |
|
# requires-python = ">=3.11" |
|
# dependencies = [ |
|
# "python-dotenv", |
|
# ] |
|
# /// |
|
|
|
import argparse |
|
import json |
|
import os |
|
import sys |
|
from pathlib import Path |
|
from datetime import datetime |
|
from typing import Dict, List, Any, Optional |
|
|
|
# Best-effort environment setup: load variables from a local .env file when
# python-dotenv is installed; the hook works fine without it.
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass  # dotenv is optional
|
|
|
|
|
def extract_text_from_content(content: Any) -> str:
    """Pull plain text out of a message-content structure.

    Accepts a bare string, a list of parts (strings or dicts carrying
    'text'/'content' keys), or a single dict; anything else is
    stringified. Nested 'content' values are resolved recursively.
    """
    if isinstance(content, str):
        return content

    if isinstance(content, list):
        collected = []
        for element in content:
            if isinstance(element, dict):
                if 'text' in element:
                    collected.append(element['text'])
                elif 'content' in element:
                    collected.append(extract_text_from_content(element['content']))
            elif isinstance(element, str):
                collected.append(element)
        return '\n'.join(collected)

    if isinstance(content, dict):
        if 'text' in content:
            return content['text']
        if 'content' in content:
            return extract_text_from_content(content['content'])

    # Fallback for scalars and dicts without recognized keys.
    return str(content)
|
|
|
|
|
def format_timestamp(timestamp_str: str) -> str:
    """Convert an ISO-8601 timestamp to 'YYYY-MM-DD HH:MM:SS'.

    Returns the input unchanged when it cannot be parsed (including the
    empty string produced by a missing 'timestamp' field).
    """
    try:
        # Normalize a trailing 'Z' to an explicit UTC offset so that
        # fromisoformat accepts it on every supported Python version.
        dt = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
        return dt.strftime('%Y-%m-%d %H:%M:%S')
    except (ValueError, TypeError, AttributeError):
        # Narrowed from a bare `except:` so that only parse failures
        # (bad format, non-string input) fall back to the raw value;
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        return timestamp_str
|
|
|
|
|
def extract_messages(jsonl_file: Path, include_meta: bool = False) -> List[Dict[str, Any]]:
    """
    Extract messages from a JSONL transcript file.

    Parsing is best-effort: blank lines, malformed JSON, non-object lines,
    and entries without a usable 'message' object are skipped silently.

    Args:
        jsonl_file: Path to the JSONL file
        include_meta: Whether to include meta messages (entries flagged
            with a truthy 'isMeta')

    Returns:
        List of extracted messages, each with 'line', 'type', 'role',
        'text', and 'timestamp' keys; empty when the file is unreadable.
    """
    messages: List[Dict[str, Any]] = []

    try:
        with open(jsonl_file, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                if not line.strip():
                    continue

                try:
                    data = json.loads(line)
                except json.JSONDecodeError:
                    continue  # best-effort: skip malformed lines

                # A line may hold a JSON scalar or array; only objects can
                # carry message data. (Previously this relied on a broad
                # `except Exception: pass` around the whole body.)
                if not isinstance(data, dict):
                    continue

                # Skip meta messages unless requested.
                if data.get('isMeta') and not include_meta:
                    continue

                msg_data = data.get('message')
                if not isinstance(msg_data, dict):
                    continue

                # Determine message type and role; role falls back to the
                # entry-level type when absent.
                msg_type = data.get('type', 'unknown')

                messages.append({
                    'line': line_num,
                    'type': msg_type,
                    'role': msg_data.get('role', msg_type),
                    'text': extract_text_from_content(msg_data.get('content', '')),
                    'timestamp': format_timestamp(data.get('timestamp', '')),
                })
    except OSError:
        # Missing/unreadable file: return whatever was collected so far
        # (narrowed from `except Exception`, which also hid logic bugs).
        pass

    return messages
|
|
|
|
|
|
|
|
|
def log_pre_compact(input_data):
    """Append the raw pre-compact event to logs/pre_compact.json.

    The log file holds a JSON array of every event received. A missing,
    unreadable, or corrupt file is treated as an empty history rather than
    an error.

    Args:
        input_data: The full hook event payload (any JSON-serializable value).
    """
    # Ensure logs directory exists (relative to the current working dir).
    log_dir = Path("logs")
    log_dir.mkdir(parents=True, exist_ok=True)
    log_file = log_dir / 'pre_compact.json'

    # Read existing log data or initialize empty list.
    log_data = []
    if log_file.exists():
        try:
            # encoding added for consistency with the file's other opens.
            with open(log_file, 'r', encoding='utf-8') as f:
                log_data = json.load(f)
        except (json.JSONDecodeError, ValueError, OSError):
            log_data = []

    # A file holding valid JSON that is not a list previously crashed on
    # .append(); reset history in that case too.
    if not isinstance(log_data, list):
        log_data = []

    # Append the entire input data and rewrite the file pretty-printed.
    log_data.append(input_data)
    with open(log_file, 'w', encoding='utf-8') as f:
        json.dump(log_data, f, indent=2)
|
|
|
|
|
|
|
|
|
def extract_and_backup_messages(transcript_path, trigger):
    """Extract messages from transcript and save them as backup before compaction.

    Args:
        transcript_path: Path (string) to the session transcript JSONL file.
        trigger: Compaction trigger label ("manual" or "auto"), embedded in
            the backup filename.

    Returns:
        The backup file path as a string, or None when there is nothing to
        back up or any step fails.
    """
    try:
        # Convert once and use pathlib throughout (was an os.path/pathlib mix).
        transcript = Path(transcript_path)
        if not transcript.exists():
            return None

        # Extract messages from the transcript; nothing to back up if empty.
        messages = extract_messages(transcript, include_meta=False)
        if not messages:
            return None

        # Create backup directory for extracted messages.
        backup_dir = Path(".dev-resources") / "context" / "messages_backup"
        backup_dir.mkdir(parents=True, exist_ok=True)

        # Filename encodes session name, trigger type, and wall-clock time.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_name = f"{transcript.stem}_messages_{trigger}_{timestamp}.json"
        backup_path = backup_dir / backup_name

        # Save extracted messages as pretty-printed JSON.
        with open(backup_path, 'w', encoding='utf-8') as f:
            json.dump(messages, f, indent=2, ensure_ascii=False)

        return str(backup_path)
    except Exception:
        # Deliberate best-effort: any failure means "no backup", never a
        # crash that would disturb the compaction flow.
        return None
|
|
|
|
|
def main():
    """Hook entry point: read the PreCompact event JSON from stdin, log it,
    back up the transcript's messages, then emit a 'block' decision.

    Exit codes: 0 on success (decision printed to stdout), 2 on any failure.
    """
    try:
        # No options are defined; parsing still rejects unexpected arguments.
        parser = argparse.ArgumentParser()
        parser.parse_args()

        # Read JSON input from stdin.
        input_data = json.loads(sys.stdin.read())

        # Extract the fields this hook acts on. Other event fields (e.g.
        # session_id, custom_instructions) are preserved verbatim in the
        # event log written below.
        transcript_path = input_data.get('transcript_path', '')
        trigger = input_data.get('trigger', 'unknown')  # "manual" or "auto"

        # Log the pre-compact event.
        log_pre_compact(input_data)

        # Extract and backup messages (best-effort; None on failure).
        backup_path = None
        if transcript_path:
            backup_path = extract_and_backup_messages(transcript_path, trigger)

        # Decision block: always block compaction. The reason is surfaced
        # to the user, so include where the backup landed.
        # (Dropped the pointless f-prefix on the constant string.)
        reason = "Compaction blocked by precompact hook. Use manual compaction if needed."
        if backup_path:
            reason += f" Messages backed up to: {backup_path}"

        result = {
            "decision": "block",
            "reason": reason
        }
        print(json.dumps(result))
        sys.exit(0)

    except json.JSONDecodeError:
        # Malformed stdin payload: exit 2 signals a hook error.
        sys.exit(2)
    except Exception:
        # Handle any other errors gracefully with the same error exit.
        sys.exit(2)
|
|
|
|
|
# Script entry point guard: run the hook only when executed directly
# (e.g. by the uv shebang), not when imported.
if __name__ == '__main__':
    main()