Last active
April 2, 2024 20:51
-
-
Save thillsd/8c7c794934779c3406f4590542e70054 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
$ pip install piper-tts loguru typed-argparse bs4 ebooklib mutagen | |
$ apt install ffmpeg | |
Assuming both piper and ffmpeg are in your path, this script will convert an epub file to an audiobook. | |
""" | |
import math | |
import os | |
import queue | |
import re | |
import subprocess | |
import sys | |
import threading | |
from dataclasses import dataclass, field | |
from multiprocessing import cpu_count | |
from typing import List, Tuple | |
import ebooklib | |
import typed_argparse as tap | |
from bs4 import BeautifulSoup | |
from ebooklib import epub | |
from loguru import logger | |
from mutagen.easyid3 import EasyID3 | |
# Drop the handler registered under id 0 and log to stderr instead, showing
# the time, the name of the emitting thread and the message for INFO and above.
logger.remove(0)
logger.add(
    sys.stderr,
    level="INFO",
    format="<green>{time:HH:mm:ss}</green>\t{thread.name}\t{message}",
)
@dataclass
class RecordingJob:
    """One chapter waiting to be synthesised, encoded and tagged.

    ``wav_filename`` and ``mp3_filename`` are not constructor arguments;
    they are derived from ``output_file_prefix`` right after initialisation.
    """

    text: str  # chapter text sent to the speech synthesiser
    output_file_prefix: str  # output path without a file extension
    title: str  # chapter title, written to the mp3 title tag
    author: str  # written to the mp3 artist tag
    book_title: str  # written to the mp3 album tag
    chapter_number: int  # written to the mp3 track number tag
    voice: str  # passed to piper as --model
    speed: str  # passed to piper as --length-scale
    pause: str  # passed to piper as --sentence-silence
    wav_filename: str = field(init=False)
    mp3_filename: str = field(init=False)

    def __post_init__(self):
        # Both output names share the prefix and differ only in extension.
        prefix = self.output_file_prefix
        self.wav_filename, self.mp3_filename = prefix + ".wav", prefix + ".mp3"
def sanitize_title(title: str) -> str:
    """Return *title* reduced to word characters joined by underscores.

    Characters that are neither word characters nor whitespace are removed,
    surrounding whitespace is trimmed, and every remaining run of whitespace
    is collapsed into a single underscore.
    """
    without_punctuation = re.sub(r"[^\w\s]", "", title, flags=re.UNICODE)
    return re.sub(r"\s+", "_", without_punctuation.strip())
def extract_chapters(epub_book: epub.EpubBook) -> List[Tuple[str, str]]:
    """Collect a ``(title, text)`` pair for every document item of *epub_book*.

    Items are visited in the order ``epub_book.get_items()`` yields them.
    The text is the document's visible text with every run of whitespace
    collapsed to one space. The title comes from the document's ``<title>``
    element; when that yields nothing usable, the first 60 characters of the
    text are used instead. Titles are passed through ``sanitize_title``.

    Args:
        epub_book: the parsed epub to read documents from.

    Returns:
        A list of ``(sanitized_title, cleaned_text)`` tuples. Documents with
        no text are still included (with an empty text).
    """
    chapters = []
    for item in epub_book.get_items():
        if item.get_type() != ebooklib.ITEM_DOCUMENT:
            continue

        soup = BeautifulSoup(item.get_content(), features="lxml")

        raw = soup.get_text(strip=False)
        logger.debug(f"Raw text: <{raw[:]}>")

        # Collapse all whitespace (including newlines) to single spaces.
        cleaned_text = re.sub(r"\s+", " ", raw.strip())
        logger.info(f"Cleaned text step 1: <{cleaned_text[:100]}>")

        # ``.string`` may be None even when a <title> element exists.
        title = (soup.title.string or "") if soup.title else ""

        # Judge emptiness *after* sanitizing: a title made only of whitespace
        # or punctuation is truthy but would sanitize to "", which used to
        # bypass the fallback and produce nameless output files.
        if not sanitize_title(title):
            title = cleaned_text[:60]
        logger.debug(f"Raw title: <{title}>")

        title = sanitize_title(title)
        logger.info(f"Sanitized title: <{title}>")

        chapters.append((title, cleaned_text))
        soup.decompose()
    return chapters
def epub_to_audiobook(
    input_file: str,
    output_folder: str,
    voice: str,
    speed: str,
    pause: str,
    chapter_start: int,
    chapter_end: int,
) -> None:
    """Convert the selected chapters of an epub into tagged mp3 files.

    The epub is read, chapters without text are dropped, and one
    ``RecordingJob`` per selected chapter is queued for a pool of worker
    threads. The call returns once every queued job has been processed.

    Args:
        input_file: path of the epub file to read.
        output_folder: directory for the output files; created if missing.
        voice: stored on every job as its ``voice``.
        speed: stored on every job as its ``speed``.
        pause: stored on every job as its ``pause``.
        chapter_start: 1-based index of the first chapter to convert.
        chapter_end: 1-based index of the last chapter to convert, or ``-1``
            for the last chapter of the book.

    Raises:
        ValueError: if the book has no chapter with text, or if the
            requested chapter range is out of bounds or inverted.
    """
    book = epub.read_epub(input_file)
    chapters = extract_chapters(book)
    os.makedirs(output_folder, exist_ok=True)

    # Get the book title and author from metadata or use fallback values.
    titles = book.get_metadata("DC", "title")
    creators = book.get_metadata("DC", "creator")
    book_title = titles[0][0] if titles else "Untitled"
    author = creators[0][0] if creators else "Unknown"

    # Filter out chapters that contain no text at all.
    chapters = [(title, text) for title, text in chapters if text.strip()]
    logger.info(f"Chapters count: {len(chapters)}.")
    if not chapters:
        # Without this check the range validation below would reject every
        # possible start index with a misleading "out of range" message.
        raise ValueError("No chapters with text were found in the epub file.")

    # Check chapter start and end args.
    if chapter_start < 1 or chapter_start > len(chapters):
        raise ValueError(
            f"Chapter start index {chapter_start} is out of range. Check your input."
        )
    if chapter_end < -1 or chapter_end > len(chapters):
        raise ValueError(
            f"Chapter end index {chapter_end} is out of range. Check your input."
        )
    if chapter_end == -1:
        chapter_end = len(chapters)
    if chapter_start > chapter_end:
        raise ValueError(
            f"Chapter start index {chapter_start} is larger than chapter end index {chapter_end}. Check your input."
        )
    logger.info(f"Converting chapters {chapter_start} to {chapter_end}.")

    # Width for zero padding the chapter number in file names: the number of
    # digits of the highest chapter number.
    max_digits = len(str(len(chapters)))

    # The indices are 1-based and inclusive, hence the slice bounds.
    selected = chapters[chapter_start - 1 : chapter_end]

    tts_queue = queue.Queue()
    for idx, (title, text) in enumerate(selected, start=chapter_start):
        padded_chap_number = str(idx).zfill(max_digits)
        file_name = f"{padded_chap_number}_{title}"
        tts_queue.put(
            RecordingJob(
                title=title,
                text=text,
                author=author,
                output_file_prefix=os.path.join(output_folder, file_name),
                book_title=book_title,
                chapter_number=idx,
                voice=voice,
                speed=speed,
                pause=pause,
            )
        )

    # A worker returns as soon as it finds the queue empty, so starting more
    # threads than there are jobs only creates threads that exit immediately.
    worker_count = min(cpu_count(), len(selected))
    pool = [
        threading.Thread(
            target=worker, args=(tts_queue,), daemon=True, name=f"worker-{i}"
        )
        for i in range(worker_count)
    ]
    for thread in pool:
        thread.start()

    # Block until task_done() has been called for every queued job.
    tts_queue.join()
def worker(tts_queue: queue.Queue) -> None:
    """Process jobs from *tts_queue* until it is empty, then return.

    Each job is handed to ``convert_chapter``. A failing job is logged and
    does not stop the loop; ``task_done()`` is called for every job taken,
    whether it succeeded or not.
    """
    while True:
        try:
            job: RecordingJob = tts_queue.get_nowait()
        except queue.Empty:
            # Nothing left to convert: this worker is finished.
            break

        try:
            convert_chapter(job)
        except Exception as e:
            logger.error(
                f"Failed to convert chapter {job.chapter_number} to speech. Error: {e}"
            )
        else:
            logger.info(
                f"Converted chapter {job.chapter_number} to file {job.output_file_prefix}.mp3."
            )
        finally:
            tts_queue.task_done()
def convert_chapter(job: RecordingJob) -> None:
    """Synthesise one chapter to wav, encode it to mp3 and write its ID3 tags.

    The job's text is piped to ``piper``, the resulting wav is encoded with
    ``ffmpeg`` (libmp3lame, 64k), the wav is deleted, and the mp3 is tagged
    with artist, title, album and track number.

    Args:
        job: the chapter to convert; supplies the text, the file names, the
            piper settings and the tag values.

    Raises:
        subprocess.CalledProcessError: if piper or ffmpeg exits with a
            non-zero status.
        FileNotFoundError: if the piper or ffmpeg executable cannot be found.
    """
    try:
        logger.info(f"Converting chapter {job.chapter_number} to wav.")
        subprocess.run(
            [
                "piper",
                "--output_file",
                job.wav_filename,
                "--model",
                job.voice,
                "--length-scale",
                job.speed,
                "--sentence-silence",
                job.pause,
            ],
            input=job.text.encode("utf-8"),
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            # Fail here rather than later on a missing or truncated wav.
            check=True,
        )

        logger.info(f"Converting chapter {job.chapter_number} to mp3.")
        subprocess.run(
            [
                "ffmpeg",
                # Never read from the terminal: several workers share it and
                # any prompt would be invisible because stderr is discarded.
                "-nostdin",
                # Overwrite an mp3 left over from an earlier run instead of
                # asking for confirmation.
                "-y",
                "-i",
                job.wav_filename,
                "-codec:a",
                "libmp3lame",
                "-b:a",
                "64k",
                job.mp3_filename,
            ],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=True,
        )
    finally:
        # The wav is only an intermediate file; remove it on failure too.
        try:
            os.remove(job.wav_filename)
        except FileNotFoundError:
            # piper failed before creating it; there is nothing to clean up.
            pass

    tag = EasyID3(job.mp3_filename)
    tag["artist"] = job.author
    tag["title"] = job.title
    tag["album"] = job.book_title
    tag["tracknumber"] = str(job.chapter_number)
    tag.save(v2_version=3)
class Args(tap.TypedArgs):
    """Command-line arguments of the epub-to-audiobook script.

    ``speed``, ``voice`` and ``pause`` are kept as strings because they are
    passed unchanged on the piper command line.
    """

    epub: str = tap.arg(
        positional=True,
        help="Epub file",
    )
    audiobook_folder: str = tap.arg(
        positional=True,
        help="Destination folder for the mp3 files",
    )
    start: int = tap.arg(
        default=1,
        help="chapter to start from",
    )
    end: int = tap.arg(
        default=-1,
        help="chapter to finish at (-1 means the last chapter)",
    )
    # The value is handed to piper as --length-scale, which stretches the
    # audio: it is a duration factor, not a playback-speed factor. The old
    # help text ("speed of the generated audio") suggested the opposite.
    speed: str = tap.arg(
        default="0.75",
        help="length scale of the generated audio: values below 1 speak faster, values above 1 speak slower",
    )
    voice: str = tap.arg(
        default="en_US-joe-medium",
        help="voice to use for the generated audio. To see valid options, see the docs for piper",
    )
    pause: str = tap.arg(
        default="0.5",
        help="length of pauses between sentences",
    )
def main(args: Args):
    """Run the conversion described by the parsed *args* and log completion."""
    # Map the command-line names onto the converter's keyword arguments.
    options = dict(
        input_file=args.epub,
        output_folder=args.audiobook_folder,
        chapter_start=args.start,
        chapter_end=args.end,
        voice=args.voice,
        speed=args.speed,
        pause=args.pause,
    )
    epub_to_audiobook(**options)
    logger.info("Done! 👍")
if __name__ == "__main__":
    # Parse the command line and dispatch to main(); an interrupt from the
    # keyboard ends the script with exit status 1.
    try:
        tap.Parser(Args).bind(main).run()
    except KeyboardInterrupt:
        raise SystemExit(1)
This version worked for me, without the problems the repo version had. Even with it maxing out my cores, the MP3s actually completed and worked. Thanks for replying. Looking forward to new commits :)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
If you check the version in the git repo, it defaults to only two threads. This version is much buggier.
I can see your issue in that repo. I'll look at it when I have time.