#!/usr/bin/env python3
"""
$ pip install piper-tts loguru typed-argparse bs4 lxml ebooklib mutagen
$ apt install ffmpeg
Assuming both piper and ffmpeg are in your path, this script will convert an epub file to an audiobook.
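Example invocation (the script filename is illustrative; use whatever name you saved this file under):
$ python epub_to_audiobook.py book.epub ./audiobook --voice en_US-joe-medium --speed 0.75 --pause 0.5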
""" | |
import math | |
import os | |
import queue | |
import re | |
import subprocess | |
import sys | |
import threading | |
from dataclasses import dataclass, field | |
from multiprocessing import cpu_count | |
from typing import List, Tuple | |
import ebooklib | |
import typed_argparse as tap | |
from bs4 import BeautifulSoup | |
from ebooklib import epub | |
from loguru import logger | |
from mutagen.easyid3 import EasyID3 | |
logger.remove(0) | |
logger.add( | |
sys.stderr, | |
format="<green>{time:HH:mm:ss}</green>\t{thread.name}\t{message}", | |
level="INFO", | |
) | |


@dataclass
class RecordingJob:
    text: str
    output_file_prefix: str
    title: str
    author: str
    book_title: str
    chapter_number: int
    voice: str
    speed: str
    pause: str
    wav_filename: str = field(init=False)
    mp3_filename: str = field(init=False)

    def __post_init__(self):
        self.wav_filename = self.output_file_prefix + ".wav"
        self.mp3_filename = self.output_file_prefix + ".mp3"


def sanitize_title(title: str) -> str:
    sanitized_title = re.sub(r"[^\w\s]", "", title, flags=re.UNICODE)
    sanitized_title = re.sub(r"\s+", "_", sanitized_title.strip())
    return sanitized_title


def extract_chapters(epub_book: epub.EpubBook) -> List[Tuple[str, str]]:
    chapters = []
    for item in epub_book.get_items():
        if item.get_type() == ebooklib.ITEM_DOCUMENT:
            content = item.get_content()
            soup = BeautifulSoup(content, features="lxml")
            title = soup.title.string if soup.title else ""
            raw = soup.get_text(strip=False)
            logger.debug(f"Raw text: <{raw[:]}>")
            # Collapse excessive whitespace and newline characters
            cleaned_text = re.sub(r"\s+", " ", raw.strip())
            logger.info(f"Cleaned text: <{cleaned_text[:100]}>")
            # Fall back to the first 60 characters of text if the title is missing
            if not title:
                title = cleaned_text[:60]
            logger.debug(f"Raw title: <{title}>")
            title = sanitize_title(title)
            logger.info(f"Sanitized title: <{title}>")
            chapters.append((title, cleaned_text))
            soup.decompose()
    return chapters


def epub_to_audiobook(
    input_file: str,
    output_folder: str,
    voice: str,
    speed: str,
    pause: str,
    chapter_start: int,
    chapter_end: int,
) -> None:
    book = epub.read_epub(input_file)
    chapters = extract_chapters(book)
    os.makedirs(output_folder, exist_ok=True)
    # Get the book title and author from metadata or use fallback values
    book_title = "Untitled"
    author = "Unknown"
    if book.get_metadata("DC", "title"):
        book_title = book.get_metadata("DC", "title")[0][0]
    if book.get_metadata("DC", "creator"):
        author = book.get_metadata("DC", "creator")[0][0]
    # Filter out empty or very short chapters
    chapters = [(title, text) for title, text in chapters if text.strip()]
    logger.info(f"Chapters count: {len(chapters)}.")
    # Check chapter start and end args
    if chapter_start < 1 or chapter_start > len(chapters):
        raise ValueError(
            f"Chapter start index {chapter_start} is out of range. Check your input."
        )
    if chapter_end < -1 or chapter_end > len(chapters):
        raise ValueError(
            f"Chapter end index {chapter_end} is out of range. Check your input."
        )
    if chapter_end == -1:
        chapter_end = len(chapters)
    if chapter_start > chapter_end:
        raise ValueError(
            f"Chapter start index {chapter_start} is larger than chapter end index {chapter_end}. Check your input."
        )
    logger.info(f"Converting chapters {chapter_start} to {chapter_end}.")
    # Calculate the number of digits needed for zero padding the file name
    max_digits = int(math.log10(len(chapters))) + 1
    tts_queue = queue.Queue()
    for idx, (title, text) in enumerate(chapters, start=1):
        if idx < chapter_start:
            continue
        if idx > chapter_end:
            break
        padded_chap_number = str(idx).zfill(max_digits)
        file_name = f"{padded_chap_number}_{title}"
        tts_queue.put(
            RecordingJob(
                title=title,
                text=text,
                author=author,
                output_file_prefix=os.path.join(output_folder, file_name),
                book_title=book_title,
                chapter_number=idx,
                voice=voice,
                speed=speed,
                pause=pause,
            )
        )
    # One worker thread per CPU core; each pulls jobs off the queue until it is empty
    pool = [
        threading.Thread(
            target=worker, args=(tts_queue,), daemon=True, name=f"worker-{i}"
        )
        for i in range(0, cpu_count())
    ]
    for thread in pool:
        thread.start()
    tts_queue.join()


def worker(tts_queue: queue.Queue) -> None:
    while True:
        try:
            job: RecordingJob = tts_queue.get(block=False)
        except queue.Empty:
            return
        try:
            convert_chapter(job)
        except Exception as e:
            logger.error(
                f"Failed to convert chapter {job.chapter_number} to speech. Error: {e}"
            )
        else:
            logger.info(
                f"Converted chapter {job.chapter_number} to file {job.output_file_prefix}.mp3."
            )
        finally:
            tts_queue.task_done()


def convert_chapter(job: RecordingJob) -> None:
    logger.info(f"Converting chapter {job.chapter_number} to wav.")
    subprocess.run(
        [
            "piper",
            "--output_file",
            job.wav_filename,
            "--model",
            job.voice,
            "--length-scale",
            job.speed,
            "--sentence-silence",
            job.pause,
        ],
        input=job.text.encode("utf-8"),
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        # check=True so a piper failure raises and is logged by the worker
        check=True,
    )
    logger.info(f"Converting chapter {job.chapter_number} to mp3.")
    subprocess.run(
        [
            "ffmpeg",
            # -y so ffmpeg overwrites an existing file instead of waiting for a prompt
            "-y",
            "-i",
            job.wav_filename,
            "-codec:a",
            "libmp3lame",
            "-b:a",
            "64k",
            job.mp3_filename,
        ],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        check=True,
    )
    os.remove(job.wav_filename)
    tag = EasyID3(job.mp3_filename)
    tag["artist"] = job.author
    tag["title"] = job.title
    tag["album"] = job.book_title
    tag["tracknumber"] = str(job.chapter_number)
    tag.save(v2_version=3)


class Args(tap.TypedArgs):
    epub: str = tap.arg(
        positional=True,
        help="Epub file",
    )
    audiobook_folder: str = tap.arg(
        positional=True,
        help="Destination folder for the mp3 files",
    )
    start: int = tap.arg(
        default=1,
        help="chapter to start from",
    )
    end: int = tap.arg(
        default=-1,
        help="chapter to finish at",
    )
    speed: str = tap.arg(
        default="0.75",
        help="speed of the generated audio",
    )
    voice: str = tap.arg(
        default="en_US-joe-medium",
        help="voice to use for the generated audio. To see valid options, see the docs for piper",
    )
    pause: str = tap.arg(
        default="0.5",
        help="length of pauses between sentences",
    )


def main(args: Args):
    epub_to_audiobook(
        input_file=args.epub,
        output_folder=args.audiobook_folder,
        chapter_start=args.start,
        chapter_end=args.end,
        voice=args.voice,
        speed=args.speed,
        pause=args.pause,
    )
    logger.info("Done! 👍")


if __name__ == "__main__":
    try:
        tap.Parser(Args).bind(main).run()
    except KeyboardInterrupt:
        sys.exit(1)
Used ChatGPT to add another command-line argument, -t/--threads, so you can choose how many threads are used. The extra time doesn't bother me, as I have it run while I'm asleep. My computer went from feeling like it was going to lock up to acceptably usable and not likely to freeze and need a reboot. Here's the fork for anyone interested: https://gist.github.com/bonelifer/498bcf3b47a1a9c8881074602104d98c
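In case anyone wants to make the same change by hand instead of grabbing the fork, here is a rough sketch of the kind of edit involved (the field name and default are illustrative, not the fork's actual code):

# Sketch only: add a threads field to the Args class
threads: int = tap.arg(
    default=cpu_count(),
    help="number of worker threads (lower this to keep the machine responsive)",
)

# ...then pass args.threads from main() into epub_to_audiobook() as e.g. num_threads,
# and size the thread pool with it instead of cpu_count():
pool = [
    threading.Thread(
        target=worker, args=(tts_queue,), daemon=True, name=f"worker-{i}"
    )
    for i in range(num_threads)
]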
If you check the version in the git repo, it defaults to only two threads. This version is much buggier.
I can see your issue in that repo. I'll look at it when I have time.
This version worked for me, without the problems the repo version had. Even with it maxing out my cores, the MP3s actually completed and worked. Thanks for replying. Looking forward to new commits :)
Is there a way to use another voice like en_US-libritts_r-medium? I have the onnx files locally. It's supported on their site.
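Since the script just forwards --voice to piper's --model flag, pointing it at the local .onnx file may work (untested; the path and script name below are only examples):

$ python epub_to_audiobook.py book.epub ./audiobook --voice /path/to/en_US-libritts_r-medium.onnx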