Last active
August 22, 2025 07:52
-
-
Save winstonma/7033a86cd52bc19c6c9f0dba2b68bea0 to your computer and use it in GitHub Desktop.
A Python script that fetches and converts web articles into playable audio using parallel processing.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import edge_tts | |
import newspaper | |
import argparse | |
import os | |
import subprocess | |
import logging | |
import shutil | |
import re | |
import functools | |
import sys | |
import asyncio | |
import urllib.parse | |
from pathlib import Path | |
from typing import Optional, Tuple | |
import time | |
import signal | |
import datetime | |
# --- Runtime configuration ---------------------------------------------------
# Minimum severity emitted by the root logger configured below.
LOG_LEVEL = logging.INFO
# Voice identifier that main() forwards to the TTS pipeline.
TTS_VOICE = "en-US-EmmaMultilingualNeural"
# Executable used for playback; it is looked up on PATH and fed audio on stdin.
AUDIO_PLAYER = "mpv"
# Number of attempts made when downloading/parsing an article.
MAX_RETRIES = 3

# Configure the root logger once, at import time, with a compact timestamp.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
    level=LOG_LEVEL,
)
def check_dependencies():
    """Verify required Python packages and executables, exiting if any is absent.

    Probes the ``newspaper`` and ``edge_tts`` packages by importing them and
    looks up the audio player and ``ffmpeg`` on PATH. When something is
    missing, every missing item is logged with an install hint and the
    process exits with status 1. Returns ``None`` when everything is present.
    """
    install_hints = {
        "newspaper": "pip install newspaper4k",
        "edge_tts": "pip install edge-tts",
        "ffmpeg": "system package manager or https://ffmpeg.org/download.html",
    }
    problems = []

    # (importable module name, name shown to the user)
    python_packages = (("newspaper", "newspaper4k"), ("edge_tts", "edge_tts"))
    for module_name, display_name in python_packages:
        try:
            __import__(module_name)
        except ImportError:
            problems.append((display_name, install_hints[module_name]))

    if shutil.which(AUDIO_PLAYER) is None:
        problems.append((AUDIO_PLAYER, f"Install {AUDIO_PLAYER} and add to PATH"))

    if shutil.which("ffmpeg") is None:
        ffmpeg_hint = install_hints["ffmpeg"] + " (required by mpv for demuxing)"
        problems.append(("ffmpeg", ffmpeg_hint))

    if not problems:
        return

    logging.error("Missing dependencies:")
    for dep, install_cmd in problems:
        logging.error(f" {dep}: {install_cmd}")
    sys.exit(1)
check_dependencies() | |
def _join_lines_as_sentences(text: str) -> str:
    """Flatten multi-line article text into a single line of sentences.

    Each run of line breaks becomes a sentence boundary. A period is only
    inserted when the preceding line does not already end in punctuation
    (optionally followed by a closing quote or bracket), so headings become
    sentences without producing artefacts such as ``".."`` or ``"!."``.
    """
    # Trim whitespace hugging each run of line breaks so the punctuation
    # check below sees the real last character of every line.
    text = re.sub(r"[^\S\n]*\n\s*", "\n", text.strip())
    # Lines that already end a clause only need a separating space.
    text = re.sub(
        r"([.!?:;,\u2026][\"'\u201d\u2019)\]]*)\n",
        r"\1 ",
        text,
    )
    # Remaining breaks follow unpunctuated lines (titles, captions, ...).
    text = text.replace("\n", ". ")
    return re.sub(r"\s+", " ", text).strip()


@functools.lru_cache(maxsize=128)
def extract_text_from_url(url: str) -> Tuple[str, Optional[str], Optional[str]]:
    """Download an article and return its text, title and publish date.

    The download/parse step is attempted up to ``MAX_RETRIES`` times with an
    exponential back-off (1s, 2s, 4s, ...) between attempts. An article with
    no extractable text counts as a failed attempt. Successful results are
    memoised per URL by ``lru_cache``.

    Args:
        url: Address of the article to fetch.

    Returns:
        A ``(content, title, publish_date)`` tuple. ``content`` is the article
        body flattened to one line; ``title`` falls back to the URL's network
        location when the article has none; ``publish_date`` is formatted like
        ``"Monday, January 01, 2024"`` or is ``None`` when unavailable.

    Raises:
        Exception: Whatever the final attempt raised, after all retries
            have been exhausted.
    """
    for attempt in range(MAX_RETRIES):
        try:
            article = newspaper.Article(url)
            article.download()
            article.parse()
            if not article.text:
                raise ValueError("No text extracted from URL")

            title = article.title or urllib.parse.urlparse(url).netloc

            publish_date = None
            if isinstance(article.publish_date, datetime.datetime):
                publish_date = article.publish_date.strftime("%A, %B %d, %Y")

            content = _join_lines_as_sentences(article.text)
            word_count = len(content.split())
            logging.info(f"Extracted {word_count} words from URL")
            return content, title, publish_date
        except Exception as e:
            logging.warning(
                f"Text extraction failed (attempt {attempt + 1}/{MAX_RETRIES}): {e}"
            )
            if attempt < MAX_RETRIES - 1:
                logging.info(f"Retrying in {2**attempt} seconds...")
                time.sleep(2**attempt)
            else:
                logging.error(f"Failed to extract text after {MAX_RETRIES} attempts.")
                raise
def create_safe_filename(text: str) -> str:
    """Turn arbitrary text (e.g. an article title) into a safe file stem.

    Whitespace runs (including line breaks) collapse to one space, characters
    that are illegal in Windows/Unix file names and control characters become
    ``_``, and the result is limited to 100 characters with no leading or
    trailing spaces or dots.

    Args:
        text: The text to sanitise.

    Returns:
        A non-empty file-name stem; ``"article"`` when nothing usable remains.
    """
    safe_text = re.sub(r"\s+", " ", text)
    safe_text = re.sub(r'[\\/:*?"<>|\x00-\x1f\x7f]', "_", safe_text)
    # Strip again after truncating: the cut may expose a trailing dot/space.
    safe_text = safe_text.strip(" .")[:100].strip(" .")
    # An empty stem would yield a hidden, extension-only file name.
    return safe_text or "article"
def parse_arguments(argv: Optional[list] = None) -> argparse.Namespace:
    """Parse the command-line arguments of the script.

    Args:
        argv: Argument strings to parse, without the program name. When
            ``None`` (the default) argparse reads ``sys.argv[1:]``.

    Returns:
        A namespace with ``input_url`` (str) and ``save`` (bool).
    """
    parser = argparse.ArgumentParser(
        description="Convert web articles to speech using parallel TTS processing"
    )
    parser.add_argument("input_url", help="URL to retrieve and convert to speech")
    parser.add_argument(
        "--save",
        action="store_true",
        # The pipeline writes "<title>.mp3", so advertise MP3 here.
        help="Save audio to the user's home directory as an MP3 file.",
    )
    return parser.parse_args(argv)
def setup_signal_handlers():
    """Install process-level SIGINT/SIGTERM handlers on Windows only.

    On other platforms this function returns immediately without doing
    anything. On Windows the installed handler logs the interruption and
    cancels every task returned by ``asyncio.all_tasks()``.
    """
    # Fallback for Windows, where asyncio loop signal handlers are not supported.
    if sys.platform != "win32":
        return

    def _cancel_everything(signum, frame):
        logging.info("Received interrupt signal, initiating graceful shutdown...")
        for pending in asyncio.all_tasks():
            pending.cancel()

    try:
        signal.signal(signal.SIGINT, _cancel_everything)
        signal.signal(signal.SIGTERM, _cancel_everything)
    except (ImportError, AttributeError):
        logging.debug("Signal handling not available on this platform")
async def fetch_and_process_article(
    url: str,
) -> Tuple[str, Optional[str], Optional[str]]:
    """Fetch an article and build the text that will be spoken.

    The blocking download/parse (which may sleep between retries) runs in a
    worker thread so the event loop stays responsive, e.g. to cancellation
    requested by a signal handler.

    Args:
        url: Address of the article to fetch.

    Returns:
        ``(full_text, title, publish_date)`` where ``full_text`` is the
        article body prefixed with the title and, when known, the publish
        date.

    Raises:
        Exception: Re-raises whatever the extraction raised, after logging it.
    """
    try:
        text, title, publish_date = await asyncio.to_thread(
            extract_text_from_url, url
        )
    except Exception as e:
        logging.error(f"Could not extract text from URL: {e}")
        raise

    if not title:
        return text, title, publish_date

    log_message = f"Prepending article title: '{title}'"
    full_text = f"{title}. "
    if publish_date:
        full_text += f"Was published on {publish_date}. "
        log_message += f" and publish date: '{publish_date}'"
    logging.info(f"{log_message} to content.")
    full_text += text
    return full_text, title, publish_date
async def _shutdown_player(process) -> None:
    """Ensure the audio player subprocess has exited (terminate, then kill)."""
    if process is None or process.returncode is not None:
        return
    # Refresh the process state once; if it has already exited, skip termination.
    try:
        await asyncio.wait_for(process.wait(), timeout=0)
    except asyncio.TimeoutError:
        pass
    if process.returncode is not None:
        return
    try:
        process.terminate()
        await asyncio.wait_for(process.wait(), timeout=5)
    except ProcessLookupError:
        # The player exited between the returncode check and terminate().
        pass
    except asyncio.TimeoutError:
        logging.warning("Audio player did not terminate in time; killing process...")
        try:
            process.kill()
        except ProcessLookupError:
            return
        await process.wait()


async def run_audio_pipeline(
    full_text: str,
    save_audio: bool,
    title: Optional[str],
    tts_voice: str,
):
    """Stream synthesized speech to the audio player and optionally to disk.

    Audio chunks produced by ``edge_tts`` are written to the player's stdin as
    they arrive. With ``save_audio`` the same bytes are also written to
    ``~/<sanitised title>.mp3``. If the player exits early, streaming stops
    unless a file is being saved, in which case the file is still completed.

    Args:
        full_text: Text to speak; blank text is rejected.
        save_audio: Whether to keep a copy of the audio in the home directory.
        title: Used for the saved file's name; ``"article"`` when falsy.
        tts_voice: Voice name passed to ``edge_tts.Communicate``.

    Returns:
        ``0`` on success, ``1`` when there is no text, the player cannot be
        started, or streaming/playback fails.

    Note:
        Cancellation is not swallowed: ``asyncio.CancelledError`` propagates
        to the caller after the player and the output file were cleaned up.
    """
    if not full_text.strip():
        logging.error("No text to convert to speech.")
        return 1

    logging.info("Starting TTS generation.")

    output_file_path = None
    if save_audio:
        safe_title = create_safe_filename(title if title else "article")
        output_file_path = Path.home() / f"{safe_title}.mp3"
        logging.info(f"Saving audio to: {output_file_path}")

    mpv_process = None
    output_file = None
    finished = False
    try:
        mpv_command = [AUDIO_PLAYER, "-"]
        # Scope the "not found" diagnosis to the spawn itself so that a
        # missing output directory is not reported as a missing player.
        try:
            mpv_process = await asyncio.create_subprocess_exec(
                *mpv_command,
                stdin=subprocess.PIPE,
                stdout=sys.stdout,
                stderr=sys.stderr,
            )
        except FileNotFoundError:
            logging.error(f"{AUDIO_PLAYER} not found. Please install it to play audio.")
            return 1
        logging.info(f"Started audio player: {' '.join(mpv_command)}")

        if output_file_path is not None:
            output_file = await asyncio.to_thread(open, output_file_path, "wb")

        player_stdin = mpv_process.stdin
        communicate = edge_tts.Communicate(full_text, tts_voice)
        # Cancellation surfaces as CancelledError at the awaits below, so no
        # explicit "am I cancelled" polling is needed inside the loop.
        async for chunk in communicate.stream():
            if chunk["type"] != "audio":
                continue
            audio = chunk["data"]
            if player_stdin is not None and not player_stdin.is_closing():
                # Deliberately not awaiting drain(): the transport buffers the
                # bytes, so the TTS download is not throttled to playback speed.
                player_stdin.write(audio)
            elif output_file is None:
                # Nobody is listening and nothing is being saved.
                logging.info("Audio player exited; stopping TTS streaming.")
                break
            if output_file is not None:
                await asyncio.to_thread(output_file.write, audio)

        print()
        logging.info("TTS streaming finished.")

        # Closing stdin signals end-of-stream; then wait for playback to end.
        if player_stdin is not None and not player_stdin.is_closing():
            player_stdin.close()
        if mpv_process.returncode is None:
            await mpv_process.wait()
        finished = True
    except Exception as e:
        logging.error(f"Unexpected error during audio streaming or playback: {e}")
        return 1
    finally:
        await _shutdown_player(mpv_process)
        if output_file is not None:
            try:
                await asyncio.to_thread(output_file.close)
            except OSError as e:
                logging.warning(f"Could not close {output_file_path}: {e}")
            if finished:
                logging.info(f"Audio saved to {output_file_path}")
            else:
                logging.warning(
                    f"Audio generation did not complete; {output_file_path} is partial."
                )
    return 0
async def main():
    """Run the whole workflow: parse args, fetch the article, speak it.

    Returns:
        The process exit code: ``1`` when the article cannot be fetched,
        otherwise whatever ``run_audio_pipeline`` returns.
    """
    args = parse_arguments()
    setup_signal_handlers()

    if sys.platform != "win32":
        loop = asyncio.get_running_loop()

        def _cancel_all_tasks():
            # Runs as a plain loop callback (outside any task), so every
            # task of this loop, including main(), is cancelled.
            logging.info("Received interrupt signal, initiating graceful shutdown...")
            for task in asyncio.all_tasks(loop):
                task.cancel()

        for sig in (signal.SIGINT, signal.SIGTERM):
            try:
                loop.add_signal_handler(sig, _cancel_all_tasks)
            except Exception as e:
                # Best effort: some event loops do not implement this. Keep
                # going with the default signal behaviour, but say so.
                logging.warning(f"Could not install handler for signal {sig}: {e}")

    logging.info(f"Processing URL: {args.input_url}")
    try:
        full_text, title, _ = await fetch_and_process_article(args.input_url)
    except Exception:
        # The failure has already been logged by fetch_and_process_article.
        return 1

    return await run_audio_pipeline(full_text, args.save, title, TTS_VOICE)
if __name__ == "__main__":
    # Script entry point: run the async workflow and use its result as the
    # process exit status; an interrupt or cancellation exits with status 1.
    try:
        sys.exit(asyncio.run(main()))
    except (KeyboardInterrupt, asyncio.CancelledError):
        logging.info("Operation interrupted or cancelled, exiting...")
        sys.exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment