Skip to content

Instantly share code, notes, and snippets.

@winstonma
Last active August 22, 2025 07:52
Show Gist options
  • Save winstonma/7033a86cd52bc19c6c9f0dba2b68bea0 to your computer and use it in GitHub Desktop.
Save winstonma/7033a86cd52bc19c6c9f0dba2b68bea0 to your computer and use it in GitHub Desktop.
A Python script that fetches and converts web articles into playable audio using parallel processing.
#!/usr/bin/env python3
import edge_tts
import newspaper
import argparse
import os
import subprocess
import logging
import shutil
import re
import functools
import sys
import asyncio
import urllib.parse
from pathlib import Path
from typing import Optional, Tuple
import time
import signal
import datetime
# --- Script-wide configuration ---
# Minimum severity passed to logging.basicConfig below.
LOG_LEVEL = logging.INFO
# Voice name handed to edge_tts.Communicate by the audio pipeline.
TTS_VOICE = "en-US-EmmaMultilingualNeural"
# Executable that is launched with "-" and receives the audio stream on stdin.
AUDIO_PLAYER = "mpv"
# Number of download/parse attempts made before text extraction gives up.
MAX_RETRIES = 3
# Configure the root logger once at import time; timestamps show time of day only.
logging.basicConfig(
level=LOG_LEVEL,
format="%(asctime)s - %(levelname)s: %(message)s",
datefmt="%H:%M:%S",
)
def check_dependencies():
    """Exit with status 1 if a required package or executable is unavailable.

    Checks that the ``newspaper`` and ``edge_tts`` modules can be imported and
    that the configured audio player and ``ffmpeg`` are found on PATH. Every
    missing item is logged together with an installation hint before exiting.

    NOTE: this file also imports ``edge_tts`` and ``newspaper`` at module
    level, so a missing package may fail that top-level import before this
    function gets the chance to report it.
    """
    dependencies = {
        "newspaper": "pip install newspaper4k",
        "edge_tts": "pip install edge-tts",
        "ffmpeg": "system package manager or https://ffmpeg.org/download.html",
    }
    missing = []

    # (importable module name, name shown to the user)
    python_packages = (
        ("newspaper", "newspaper4k"),
        ("edge_tts", "edge_tts"),
    )
    for module_name, reported_name in python_packages:
        try:
            __import__(module_name)
        except ImportError:
            missing.append((reported_name, dependencies[module_name]))

    # External executables are looked up on PATH.
    if shutil.which(AUDIO_PLAYER) is None:
        missing.append((AUDIO_PLAYER, f"Install {AUDIO_PLAYER} and add to PATH"))
    if shutil.which("ffmpeg") is None:
        ffmpeg_hint = dependencies["ffmpeg"] + " (required by mpv for demuxing)"
        missing.append(("ffmpeg", ffmpeg_hint))

    if not missing:
        return

    logging.error("Missing dependencies:")
    for dep, install_cmd in missing:
        logging.error(f" {dep}: {install_cmd}")
    sys.exit(1)
# Runs at import time: logs any missing package/executable and exits the
# process with status 1 before any other work is attempted.
check_dependencies()
@functools.lru_cache(maxsize=128)
def extract_text_from_url(url: str) -> Tuple[str, Optional[str], Optional[str]]:
    """Download an article and return its text prepared for speech synthesis.

    The article is fetched and parsed with ``newspaper.Article``. Up to
    ``MAX_RETRIES`` attempts are made, sleeping ``2**attempt`` seconds between
    them. Successful results are memoized per URL by ``lru_cache``.

    Args:
        url: Address of the article to download.

    Returns:
        A ``(content, title, publish_date)`` tuple. ``content`` is the article
        text flattened to a single line, ``title`` falls back to the URL's
        network location when the article has none, and ``publish_date`` is a
        string such as ``"Monday, January 01, 2024"`` or ``None`` when the
        parsed date is not a ``datetime``.

    Raises:
        Exception: Whatever the final attempt raised (``ValueError`` when no
            text could be extracted).
    """
    for attempt in range(MAX_RETRIES):
        try:
            article = newspaper.Article(url)
            article.download()
            article.parse()

            # Whitespace-only text is as useless as no text at all.
            raw_text = (article.text or "").strip()
            if not raw_text:
                raise ValueError("No text extracted from URL")

            title = article.title or urllib.parse.urlparse(url).netloc

            publish_date = None
            if isinstance(article.publish_date, datetime.datetime):
                publish_date = article.publish_date.strftime("%A, %B %d, %Y")

            # Turn line breaks into sentence breaks. A period is only inserted
            # when the line does not already end in terminal punctuation, so
            # "end.\nNext" becomes "end. Next" rather than "end.. Next".
            content = re.sub(
                r"([.!?])?[^\S\n]*\n\s*",
                lambda match: f"{match.group(1) or '.'} ",
                raw_text,
            )
            content = re.sub(r"\s+", " ", content)
            # Collapse stray ". ." sequences that were already in the text.
            content = re.sub(r"\. \.", ".", content)
            content = content.strip()

            word_count = len(content.split())
            logging.info(f"Extracted {word_count} words from URL")
            return content, title, publish_date
        except Exception as e:
            logging.warning(
                f"Text extraction failed (attempt {attempt + 1}/{MAX_RETRIES}): {e}"
            )
            if attempt < MAX_RETRIES - 1:
                logging.info(f"Retrying in {2**attempt} seconds...")
                time.sleep(2**attempt)
            else:
                logging.error(f"Failed to extract text after {MAX_RETRIES} attempts.")
                raise
def create_safe_filename(text: str) -> str:
    """Return *text* reduced to a string usable as a file name stem.

    Characters that are reserved in file names (``\\ / : * ? " < > |``) are
    replaced with underscores, surrounding whitespace and dots are removed,
    and the result is limited to 100 characters.

    Args:
        text: Arbitrary text, typically an article title.

    Returns:
        The sanitized name, or ``"article"`` when nothing usable remains
        (for example for ``""`` or ``"..."``), so callers never end up with
        an empty stem.
    """
    trim_chars = " .\t\r\n\f\v"
    cleaned = re.sub(r'[\\/:*?"<>|]', "_", text).strip(trim_chars)
    # Trim again after truncating: the cut may expose a trailing dot or space.
    cleaned = cleaned[:100].strip(trim_chars)
    return cleaned or "article"
def parse_arguments():
    """Parse the command line.

    Returns:
        An ``argparse.Namespace`` with ``input_url`` (the article URL) and
        ``save`` (``True`` when ``--save`` was given).
    """
    parser = argparse.ArgumentParser(
        description="Convert web articles to speech using parallel TTS processing"
    )
    parser.add_argument("input_url", help="URL to retrieve and convert to speech")
    parser.add_argument(
        "--save",
        action="store_true",
        # The audio pipeline writes "<title>.mp3", so the help must say MP3.
        help="Save audio to the user's home directory as an MP3 file.",
    )
    return parser.parse_args()
def setup_signal_handlers():
    """Install a task-cancelling SIGINT/SIGTERM handler on Windows.

    On every other platform this function does nothing. On Windows the
    installed handler logs the interruption and cancels every task returned
    by ``asyncio.all_tasks()``.
    """
    # Fallback for Windows where asyncio loop signal handlers are not supported
    if sys.platform != "win32":
        return

    def cancel_all_tasks(signum, frame):
        logging.info("Received interrupt signal, initiating graceful shutdown...")
        for pending in asyncio.all_tasks():
            pending.cancel()

    try:
        signal.signal(signal.SIGINT, cancel_all_tasks)
        signal.signal(signal.SIGTERM, cancel_all_tasks)
    except (ImportError, AttributeError):
        logging.debug("Signal handling not available on this platform")
async def fetch_and_process_article(
    url: str,
) -> Tuple[str, Optional[str], Optional[str]]:
    """Fetch an article and build the text that will be spoken.

    The title, and the publish date when known, are placed in front of the
    article body.

    Args:
        url: Address of the article to fetch.

    Returns:
        A ``(full_text, title, publish_date)`` tuple, where ``full_text`` is
        the body prefixed with the title/date introduction when a title exists.

    Raises:
        Exception: Re-raises whatever ``extract_text_from_url`` raised, after
            logging it.
    """
    try:
        # extract_text_from_url blocks (network download plus time.sleep
        # between retries); run it in a worker thread so the event loop is
        # not stalled while it works.
        text, title, publish_date = await asyncio.to_thread(
            extract_text_from_url, url
        )
    except Exception as e:
        logging.error(f"Could not extract text from URL: {e}")
        raise

    if not title:
        return text, title, publish_date

    log_message = f"Prepending article title: '{title}'"
    full_text = f"{title}. "
    if publish_date:
        full_text += f"Was published on {publish_date}. "
        log_message += f" and publish date: '{publish_date}'"
    logging.info(f"{log_message} to content.")
    full_text += text
    return full_text, title, publish_date
async def run_audio_pipeline(
    full_text: str,
    save_audio: bool,
    title: Optional[str],
    tts_voice: str,
):
    """Synthesize *full_text* and stream the audio to the player as it arrives.

    Audio chunks produced by ``edge_tts.Communicate(...).stream()`` are written
    to the stdin of the ``AUDIO_PLAYER`` process and, when requested, also
    appended to ``<home>/<safe title>.mp3``.

    Args:
        full_text: Text to speak; blank text is rejected.
        save_audio: When true, also write the audio to a file in the user's
            home directory.
        title: Used to derive the file name; ``"article"`` is used if missing.
        tts_voice: Voice name passed to ``edge_tts.Communicate``.

    Returns:
        ``0`` on success; ``1`` when there is no text, a file or executable
        could not be found, or any other error occurred while streaming.
    """
    if not full_text.strip():
        logging.error("No text to convert to speech.")
        return 1

    logging.info("Starting TTS generation.")
    mpv_process = None
    output_file_path = None
    output_file = None

    if save_audio:
        safe_title = create_safe_filename(title if title else "article")
        output_file_path = Path.home() / f"{safe_title}.mp3"
        logging.info(f"Saving audio to: {output_file_path}")

    try:
        mpv_command = [
            AUDIO_PLAYER,
            "-",
        ]
        mpv_process = await asyncio.create_subprocess_exec(
            *mpv_command,
            stdin=subprocess.PIPE,
            stdout=sys.stdout,
            stderr=sys.stderr,
        )
        logging.info(f"Started audio player: {' '.join(mpv_command)}")

        if output_file_path is not None:
            output_file = await asyncio.to_thread(open, output_file_path, "wb")

        # Cancellation reaches this coroutine as CancelledError at an await
        # point, so no explicit "was I cancelled?" polling is needed here.
        player_open = True
        communicate = edge_tts.Communicate(full_text, tts_voice)
        async for chunk in communicate.stream():
            if chunk["type"] != "audio":
                continue

            # Stop feeding the player once it has exited or closed its input
            # (e.g. the user quit it) instead of writing into a dead pipe.
            if player_open and (
                mpv_process.returncode is not None
                or mpv_process.stdin.is_closing()
            ):
                player_open = False
                logging.info("Audio player exited; stopping playback.")

            if player_open:
                mpv_process.stdin.write(chunk["data"])
            elif output_file is None:
                # Nobody is consuming the audio any more.
                break

            if output_file is not None:
                await asyncio.to_thread(output_file.write, chunk["data"])

        print()
        logging.info("TTS streaming finished.")

        # Closing stdin signals end-of-stream; then wait for playback to end.
        if mpv_process.stdin and not mpv_process.stdin.is_closing():
            mpv_process.stdin.close()
        if mpv_process.returncode is None:
            await mpv_process.wait()
    except FileNotFoundError:
        logging.error(f"{AUDIO_PLAYER} not found. Please install it to play audio.")
        return 1
    except Exception as e:
        logging.error(f"Unexpected error during audio streaming or playback: {e}")
        return 1
    finally:
        if mpv_process:
            # Refresh process state; if it's already exited, skip termination
            if mpv_process.returncode is None:
                try:
                    await asyncio.wait_for(mpv_process.wait(), timeout=0)
                except asyncio.TimeoutError:
                    pass
            if mpv_process.returncode is None:
                try:
                    mpv_process.terminate()
                    await asyncio.wait_for(mpv_process.wait(), timeout=5)
                except asyncio.TimeoutError:
                    logging.warning(
                        "Audio player did not terminate in time; killing process..."
                    )
                    mpv_process.kill()
                    await mpv_process.wait()
        if output_file is not None:
            try:
                await asyncio.to_thread(output_file.close)
            except Exception as close_error:
                # A failed close can mean the last bytes were never flushed.
                logging.warning(f"Could not close {output_file_path}: {close_error}")

    # output_file_path is only ever set when save_audio is true.
    if output_file_path and output_file_path.exists() and save_audio:
        logging.info(f"Audio saved to {output_file_path}")
    return 0
async def main():
    """Run the whole program: parse arguments, fetch the article, play it.

    Returns:
        The process exit code: ``1`` when the article could not be fetched,
        otherwise whatever ``run_audio_pipeline`` returned.
    """
    args = parse_arguments()
    setup_signal_handlers()

    def cancel_all_tasks():
        # Invoked by the event loop as a plain callback rather than inside a
        # task, so there is no "current task" to exclude: cancel everything.
        logging.info("Received interrupt signal, initiating graceful shutdown...")
        for task in asyncio.all_tasks():
            task.cancel()

    try:
        if sys.platform != "win32":
            loop = asyncio.get_running_loop()
            for sig in (signal.SIGINT, signal.SIGTERM):
                try:
                    loop.add_signal_handler(sig, cancel_all_tasks)
                except NotImplementedError:
                    # Some event loops may not implement this; ignore and rely on fallback
                    pass
    except Exception as exc:
        # Be resilient if signal setup fails for any reason, but leave a trace.
        logging.debug(f"Could not install signal handlers: {exc}")

    logging.info(f"Processing URL: {args.input_url}")
    try:
        full_text, title, _ = await fetch_and_process_article(args.input_url)
    except Exception:
        # The failure has already been logged by fetch_and_process_article.
        return 1

    return await run_audio_pipeline(full_text, args.save, title, TTS_VOICE)
if __name__ == "__main__":
    # Script entry point: the value returned by main() becomes the process
    # exit status; an interrupt or cancellation exits with status 1.
    try:
        sys.exit(asyncio.run(main()))
    except (KeyboardInterrupt, asyncio.CancelledError):
        logging.info("Operation interrupted or cancelled, exiting...")
        sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment