#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = []
# ///
# Cloud-hosted transcript summarization via OpenAI-compatible chat completions.
# Only the transcript-summary models that graded A- or better are allowed.

"""
Process text chunks and generate summaries via cloud APIs.

Provider selection order:
1. Shell environment variables already present in the process
2. `.env` at repo root as a fallback

Current provider priority for the default model:
1. Mistral (`MISTRAL_API_KEY`) -> `magistral-small`
2. Groq (`GROQ_API_KEY`) -> `llama-3.3-70b-versatile`

To add another provider later (see the commented sketch after
`SHELL_FIRST_PROVIDER_KEYS` below):
- add its key name to `SHELL_FIRST_PROVIDER_KEYS`
- add a default model for that provider
- update `ensure_provider_key` and `call_model`
- insert it into the priority order where you want it checked

Usage:
    uv run scripts/process_chunks_cloud.py <folder> <input_prefix> <output_prefix>

Example:
    uv run scripts/process_chunks_cloud.py LiveSessionsPlayback/Course1_Module6_Class transcript summary
"""

import argparse
import json
import os
import re
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path


GROQ_API_URL = "https://api.groq.com/openai/v1/chat/completions"
MISTRAL_API_URL = "https://api.mistral.ai/v1/chat/completions"
DEFAULT_MODEL = "magistral-small"
DEFAULT_GROQ_MODEL = "llama-3.3-70b-versatile"

MODEL_ALIASES = {
    # Approved Mistral option
    "magistral-small": "magistral-small-latest",
    # Approved Groq options
    "llama-70b": "llama-3.3-70b-versatile",
    "llama-4-scout": "meta-llama/llama-4-scout-17b-16e-instruct",
}

APPROVED_MISTRAL_MODELS = {
    "magistral-small-latest",
}

APPROVED_GROQ_MODELS = {
    "meta-llama/llama-4-scout-17b-16e-instruct",
    "llama-3.3-70b-versatile",
}

APPROVED_MODELS = APPROVED_MISTRAL_MODELS | APPROVED_GROQ_MODELS
SHELL_FIRST_PROVIDER_KEYS = ("MISTRAL_API_KEY", "GROQ_API_KEY")
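
# A sketch of the "add another provider later" checklist from the module
# docstring, using a hypothetical provider. Every name below (URL, key name,
# model id) is a placeholder, not a real integration:
#
#     EXAMPLE_API_URL = "https://api.example.com/v1/chat/completions"
#     DEFAULT_EXAMPLE_MODEL = "example-chat-small"
#     APPROVED_EXAMPLE_MODELS = {"example-chat-small"}
#     SHELL_FIRST_PROVIDER_KEYS = ("MISTRAL_API_KEY", "GROQ_API_KEY", "EXAMPLE_API_KEY")
#
# ensure_provider_key and call_model would then need a matching branch, and
# select_default_model a check for EXAMPLE_API_KEY at the desired priority.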


def strip_ansi(text: str) -> str:
    """Remove ANSI escape sequences from text."""
    ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
    return ansi_escape.sub("", text)


def read_dotenv() -> dict[str, str]:
    """Read repo-root .env without overwriting existing shell environment."""
    env_path = Path(__file__).resolve().parent.parent / ".env"
    if not env_path.exists():
        return {}

    try:
        values: dict[str, str] = {}
        for raw in env_path.read_text(encoding="utf-8").splitlines():
            line = raw.strip()
            if not line or line.startswith("#"):
                continue
            if "=" not in line:
                continue
            key, value = line.split("=", 1)
            key = key.strip()
            value = value.strip().strip('"').strip("'")
            if key:
                values[key] = value
        return values
    except Exception:
        # Don't fail hard on malformed .env; the call path will emit a useful error.
        return {}
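
# For reference, read_dotenv accepts simple KEY=value lines: surrounding
# quotes are stripped and "#" comment lines are skipped. A minimal .env
# (placeholder values, not real keys):
#
#     MISTRAL_API_KEY="..."
#     GROQ_API_KEY=...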


def select_default_model(dotenv_values: dict[str, str]) -> str:
    """Choose the default model from available provider keys.

    Order is shell env first, then `.env`, and Mistral before Groq.
    """
    if os.environ.get("MISTRAL_API_KEY"):
        return DEFAULT_MODEL
    if os.environ.get("GROQ_API_KEY"):
        return DEFAULT_GROQ_MODEL
    if dotenv_values.get("MISTRAL_API_KEY"):
        return DEFAULT_MODEL
    if dotenv_values.get("GROQ_API_KEY"):
        return DEFAULT_GROQ_MODEL
    raise RuntimeError(
        "No supported cloud API key found. Set MISTRAL_API_KEY or GROQ_API_KEY "
        "in the shell environment or repo-root .env."
    )
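
# Precedence example: with GROQ_API_KEY exported in the shell and only
# MISTRAL_API_KEY present in .env, this returns DEFAULT_GROQ_MODEL, because
# any shell key outranks every .env key.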


def ensure_provider_key(model: str, dotenv_values: dict[str, str]) -> None:
    """Ensure the provider key for the chosen model is available in the env."""
    if model in APPROVED_MISTRAL_MODELS:
        if os.environ.get("MISTRAL_API_KEY"):
            return
        if dotenv_values.get("MISTRAL_API_KEY"):
            os.environ["MISTRAL_API_KEY"] = dotenv_values["MISTRAL_API_KEY"]
            return
        raise RuntimeError(
            "MISTRAL_API_KEY is required for this model. Set it in the shell or .env."
        )

    if model in APPROVED_GROQ_MODELS:
        if os.environ.get("GROQ_API_KEY"):
            return
        if dotenv_values.get("GROQ_API_KEY"):
            os.environ["GROQ_API_KEY"] = dotenv_values["GROQ_API_KEY"]
            return
        raise RuntimeError(
            "GROQ_API_KEY is required for this model. Set it in the shell or .env."
        )


def strip_thinking_blocks(text: str) -> str:
    """Remove Thinking blocks if present."""
    lines = text.split("\n")
    result = []
    skip_until_done = False

    for line in lines:
        if "Thinking..." in line:
            skip_until_done = True
        elif "...done thinking." in line:
            skip_until_done = False
        elif not skip_until_done:
            result.append(line)

    return "\n".join(result)
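
# Example of the transformation (the markers match what call_mistral emits
# around reasoning content):
#
#     "intro\nThinking...\nreasoning\n...done thinking.\nsummary"
#         -> "intro\nsummary"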


def call_groq(prompt: str, model: str) -> str:
    api_key = os.environ.get("GROQ_API_KEY")
    if not api_key:
        raise RuntimeError("GROQ_API_KEY environment variable is not set")

    body = {
        "model": model,
        "messages": [
            {
                "role": "user",
                "content": prompt,
            }
        ],
        "stream": False,
    }

    req = urllib.request.Request(
        GROQ_API_URL,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
            "Accept": "application/json",
            "User-Agent": "process_chunks_cloud/1.0",
        },
        method="POST",
    )

    try:
        with urllib.request.urlopen(req, timeout=120) as resp:
            payload = json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        detail = e.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"Groq API error {e.code}: {detail}") from e
    except urllib.error.URLError as e:
        raise RuntimeError(f"Could not reach Groq API: {e}") from e

    try:
        return payload["choices"][0]["message"]["content"]
    except Exception as e:
        raise RuntimeError(f"Unexpected Groq API response format: {payload}") from e
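
# The success path above assumes the usual OpenAI-compatible response shape,
# roughly (abridged):
#
#     {"choices": [{"message": {"role": "assistant", "content": "..."}}]}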


def call_mistral(prompt: str, model: str) -> str:
    api_key = os.environ.get("MISTRAL_API_KEY")
    if not api_key:
        raise RuntimeError("MISTRAL_API_KEY environment variable is not set")

    body = {
        "model": model,
        "messages": [
            {
                "role": "user",
                "content": prompt,
            }
        ],
        "stream": False,
    }

    req = urllib.request.Request(
        MISTRAL_API_URL,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
            "Accept": "application/json",
            "User-Agent": "process_chunks_cloud/1.0",
        },
        method="POST",
    )

    try:
        with urllib.request.urlopen(req, timeout=120) as resp:
            payload = json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        detail = e.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"Mistral API error {e.code}: {detail}") from e
    except urllib.error.URLError as e:
        raise RuntimeError(f"Could not reach Mistral API: {e}") from e

    try:
        content = payload["choices"][0]["message"]["content"]
    except Exception as e:
        raise RuntimeError(f"Unexpected Mistral API response format: {payload}") from e

    if isinstance(content, str):
        return content

    if isinstance(content, list):
        parts = []
        for item in content:
            if not isinstance(item, dict):
                continue
            if item.get("type") == "text":
                text = item.get("text", "")
                if text:
                    parts.append(text)
            elif item.get("type") == "thinking":
                thinking_parts = []
                for block in item.get("thinking", []):
                    if isinstance(block, dict) and block.get("type") == "text":
                        text = block.get("text", "")
                        if text:
                            thinking_parts.append(text)
                if thinking_parts:
                    parts.append(
                        "Thinking...\n" + "\n".join(thinking_parts) + "\n...done thinking."
                    )
        return "\n\n".join(parts)

    raise RuntimeError(f"Unexpected Mistral content format: {content!r}")
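
# The list branch above handles content returned as typed parts. A minimal
# sketch of the shape it expects (simplified; exact fields may vary by
# model):
#
#     [
#         {"type": "thinking", "thinking": [{"type": "text", "text": "..."}]},
#         {"type": "text", "text": "Final summary."},
#     ]
#
# Thinking parts are re-wrapped in "Thinking..." / "...done thinking."
# markers so strip_thinking_blocks can drop them from the clean output.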


def resolve_model(model: str) -> str:
    resolved = MODEL_ALIASES.get(model, model)
    if resolved not in APPROVED_MODELS:
        allowed = ", ".join(sorted([*MODEL_ALIASES.keys(), *APPROVED_MODELS]))
        raise RuntimeError(
            "Unsupported model for transcript summarisation. "
            f"Allowed models: {allowed}"
        )
    return resolved
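
# Example behaviour:
#
#     resolve_model("llama-70b")               -> "llama-3.3-70b-versatile"
#     resolve_model("magistral-small-latest")  -> "magistral-small-latest"
#     resolve_model("gpt-4o")                  -> RuntimeError (not approved)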


def call_model(prompt: str, model: str) -> str:
    resolved = resolve_model(model)
    if resolved in APPROVED_MISTRAL_MODELS:
        return call_mistral(prompt, resolved)
    return call_groq(prompt, resolved)
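
# call_model is the single dispatch point: the alias is resolved first, then
# the request is routed by provider ownership of the resolved model, e.g.
# call_model(prompt, "llama-70b") goes to Groq.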


def process_chunk_file(chunk_file: Path, output_file: Path, thinking_file: Path, model: str, prompt_prefix: str) -> None:
    print(f"📄 Processing: {chunk_file.name} -> {output_file.name}")
    started_at = time.perf_counter()

    file_size = chunk_file.stat().st_size
    with open(chunk_file, "r", encoding="utf-8") as fh:
        file_lines = sum(1 for _ in fh)
    print(f"   Input: {file_size} bytes ({file_lines} lines)")

    chunk_text = chunk_file.read_text(encoding="utf-8")
    prompt = f"{prompt_prefix}\n\n{chunk_text}"

    resolved_model = resolve_model(model)
    print(f"🤖 Running cloud model: {resolved_model}")

    raw_output = call_model(prompt, model)
    clean_output = strip_thinking_blocks(strip_ansi(raw_output))

    with open(thinking_file, "w", encoding="utf-8") as f:
        f.write(raw_output)

    with open(output_file, "w", encoding="utf-8") as f:
        f.write(clean_output)

    clean_size = output_file.stat().st_size
    clean_lines = len(clean_output.split("\n"))
    elapsed = time.perf_counter() - started_at

    print(f"✅ Completed in {elapsed:.2f} seconds")
    print(f"   Output: {clean_size} bytes ({clean_lines} lines)")
    print(f"   Clean: {output_file}")
    print(f"   Raw: {thinking_file}")


def main() -> None:
    dotenv_values = read_dotenv()

    parser = argparse.ArgumentParser(
        description="Process text chunks with cloud APIs and create clean outputs",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument("directory", help="Directory containing chunk files")
    parser.add_argument(
        "input_prefix",
        help='Prefix of input chunk files (e.g. "transcript" for transcript00.md)',
    )
    parser.add_argument(
        "output_prefix",
        help='Prefix for output files (e.g. "summary" creates summary_transcript00.md)',
    )
    parser.add_argument(
        "-m",
        "--model",
        default=None,
        help=(
            "Cloud model to use. If omitted, the script checks shell env first, then .env, "
            "preferring Mistral over Groq. "
            f"Default Mistral model: {DEFAULT_MODEL}. Default Groq model: {DEFAULT_GROQ_MODEL}. "
            "Allowed models only: magistral-small, magistral-small-latest, "
            "llama-70b, llama-3.3-70b-versatile, llama-4-scout, "
            "meta-llama/llama-4-scout-17b-16e-instruct"
        ),
    )
    parser.add_argument(
        "-p",
        "--prompt",
        default="Summarise this file:",
        help='Prompt prefix for the model (default: "Summarise this file:")',
    )

    args = parser.parse_args()
    selected_model = args.model or select_default_model(dotenv_values)
    resolved_model = resolve_model(selected_model)
    ensure_provider_key(resolved_model, dotenv_values)

    folder = Path(args.directory)
    if not folder.is_dir():
        print(f"❌ Error: Directory '{folder}' does not exist")
        sys.exit(1)

    print("🚀 Starting chunk processing")
    print(f"   Directory: {folder}")
    print(f"   Input prefix: {args.input_prefix}")
    print(f"   Output prefix: {args.output_prefix}")
    print(f"   Model: {selected_model}")
    print(f"   Resolved model: {resolved_model}")
    print(f"   Prompt: '{args.prompt}'")
    print("========================================")

    # Find all chunk files, auto-detecting the extension
    chunk_files = []
    for ext in (".md", ".log", ".txt"):
        chunk_files = sorted(folder.glob(f"{args.input_prefix}[0-9][0-9]{ext}"))
        if chunk_files:
            break
    if not chunk_files:
        # Last resort: any extension
        chunk_files = sorted(folder.glob(f"{args.input_prefix}[0-9][0-9].*"))
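
    # The two-digit glob assumes chunker output named like transcript00.md,
    # transcript01.md, ...; prefixes with more than 100 chunks would need a
    # wider pattern.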
    total_files = len(chunk_files)
    if total_files == 0:
        print(f"⚠️  No chunk files found with prefix '{args.input_prefix}'")
        return

    print(f"📄 Found {total_files} chunk files to process")

    for i, chunk_file in enumerate(chunk_files, 1):
        print(f"\nProcessing file {i}/{total_files}")
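        # Filenames are <prefix>NN.<ext>, so the two characters right after
        # the prefix are the zero-padded chunk index (e.g. "07" in
        # transcript07.md); it is reused to pair each output with its input.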
        chunk_num = chunk_file.name[len(args.input_prefix) : len(args.input_prefix) + 2]

        output_file = folder / f"{args.output_prefix}_{args.input_prefix}{chunk_num}.md"
        thinking_file = folder / f"thinking_{args.output_prefix}_{args.input_prefix}{chunk_num}.md"

        process_chunk_file(
            chunk_file,
            output_file,
            thinking_file,
            selected_model,
            args.prompt,
        )

    print("\n🎉 Processing complete!")
    print(f"   Clean files: {args.output_prefix}_{args.input_prefix}00.md to {args.output_prefix}_{args.input_prefix}{(total_files - 1):02d}.md")
    print(f"   Raw files:   thinking_{args.output_prefix}_{args.input_prefix}00.md to thinking_{args.output_prefix}_{args.input_prefix}{(total_files - 1):02d}.md")


if __name__ == "__main__":
    main()