Skip to content

Instantly share code, notes, and snippets.

@thillsd
Last active April 2, 2024 20:51
Show Gist options
  • Save thillsd/8c7c794934779c3406f4590542e70054 to your computer and use it in GitHub Desktop.
Save thillsd/8c7c794934779c3406f4590542e70054 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
$ pip install piper-tts loguru typed-argparse bs4 ebooklib mutagen
$ apt install ffmpeg
Assuming both piper and ffmpeg are on your PATH, this script converts an epub file to an audiobook (one mp3 file per chapter).
"""
import math
import os
import queue
import re
import subprocess
import sys
import threading
from dataclasses import dataclass, field
from multiprocessing import cpu_count
from typing import List, Tuple
import ebooklib
import typed_argparse as tap
from bs4 import BeautifulSoup
from ebooklib import epub
from loguru import logger
from mutagen.easyid3 import EasyID3
# Swap loguru's pre-installed handler (id 0) for a stderr sink whose lines show
# a green timestamp, the name of the emitting thread, and the message, so that
# output from the worker threads can be told apart.
logger.remove(0)
logger.add(
    sys.stderr,
    level="INFO",
    format="<green>{time:HH:mm:ss}</green>\t{thread.name}\t{message}",
)
@dataclass
class RecordingJob:
    """One chapter's worth of work for the text-to-speech pipeline.

    ``wav_filename`` and ``mp3_filename`` are not constructor arguments; they
    are derived from ``output_file_prefix`` immediately after initialisation.
    """

    text: str  # chapter text that is piped to piper
    output_file_prefix: str  # output path without a file extension
    title: str  # written to the mp3 "title" tag
    author: str  # written to the mp3 "artist" tag
    book_title: str  # written to the mp3 "album" tag
    chapter_number: int  # written to the mp3 "tracknumber" tag
    voice: str  # passed to piper as --model
    speed: str  # passed to piper as --length-scale
    pause: str  # passed to piper as --sentence-silence
    wav_filename: str = field(init=False)
    mp3_filename: str = field(init=False)

    def __post_init__(self):
        stem = self.output_file_prefix
        self.wav_filename, self.mp3_filename = stem + ".wav", stem + ".mp3"
def sanitize_title(title: str) -> str:
    """Return *title* reduced to word characters joined by underscores.

    Characters that are neither word characters nor whitespace are dropped,
    surrounding whitespace is trimmed, and every remaining run of whitespace
    is collapsed into a single underscore.
    """
    without_punctuation = re.sub(r"[^\w\s]", "", title, flags=re.UNICODE)
    trimmed = without_punctuation.strip()
    return re.sub(r"\s+", "_", trimmed)
def extract_chapters(epub_book: epub.EpubBook) -> List[Tuple[str, str]]:
    """Collect a ``(title, text)`` pair for every document item of the book.

    The text is the item's visible text with all whitespace runs collapsed to
    single spaces. The title comes from the document's ``<title>`` element;
    when that is missing, empty, or blank, the first 60 characters of the
    cleaned text are used instead. Titles are passed through
    ``sanitize_title`` before being returned.

    Args:
        epub_book: The parsed epub to read document items from.

    Returns:
        A list of ``(sanitized_title, cleaned_text)`` tuples in the order the
        items are yielded by ``epub_book.get_items()``. Items with empty text
        are still included.
    """
    chapters = []
    for item in epub_book.get_items():
        if item.get_type() != ebooklib.ITEM_DOCUMENT:
            continue

        soup = BeautifulSoup(item.get_content(), features="lxml")

        # ``.string`` is None when <title> is empty or contains nested markup,
        # and a whitespace-only title is useless as a file name, so both cases
        # are normalised to "" to trigger the text-based fallback below.
        title = ""
        if soup.title and soup.title.string:
            title = str(soup.title.string).strip()

        raw = soup.get_text(strip=False)
        logger.debug(f"Raw text: <{raw}>")

        # Collapse every run of whitespace (including newlines) to one space.
        cleaned_text = re.sub(r"\s+", " ", raw.strip())
        logger.info(f"Cleaned text step 1: <{cleaned_text[:100]}>")

        # Fill in the title from the start of the text if it is missing.
        if not title:
            title = cleaned_text[:60]
        logger.debug(f"Raw title: <{title}>")

        title = sanitize_title(title)
        logger.info(f"Sanitized title: <{title}>")

        chapters.append((title, cleaned_text))
        # Release the parse tree; only plain strings are kept.
        soup.decompose()
    return chapters
def epub_to_audiobook(
    input_file: str,
    output_folder: str,
    voice: str,
    speed: str,
    pause: str,
    chapter_start: int,
    chapter_end: int,
) -> None:
    """Convert a range of chapters of an epub file into tagged mp3 files.

    Chapters whose text is empty are discarded before numbering, so
    ``chapter_start`` and ``chapter_end`` refer to positions (1-based,
    inclusive) in the list of non-empty chapters. One ``RecordingJob`` per
    selected chapter is queued and processed by a pool of daemon worker
    threads; the call returns once every queued job has been marked done.

    Args:
        input_file: Path of the epub file to read.
        output_folder: Folder the output files are written to; created if
            it does not exist.
        voice: Forwarded to every job as its ``voice``.
        speed: Forwarded to every job as its ``speed``.
        pause: Forwarded to every job as its ``pause``.
        chapter_start: First chapter to convert (1-based).
        chapter_end: Last chapter to convert (inclusive); ``-1`` means the
            last chapter of the book.

    Raises:
        ValueError: If the book contains no chapter with text, or if the
            requested chapter range is out of bounds or inverted.
    """
    book = epub.read_epub(input_file)
    chapters = extract_chapters(book)

    # Get the book title and author from metadata or use fallback values.
    title_meta = book.get_metadata("DC", "title")
    book_title = title_meta[0][0] if title_meta else "Untitled"
    creator_meta = book.get_metadata("DC", "creator")
    author = creator_meta[0][0] if creator_meta else "Unknown"

    # Drop chapters that contain no text at all.
    chapters = [(title, text) for title, text in chapters if text.strip()]
    logger.info(f"Chapters count: {len(chapters)}.")

    # Without this check an empty book would be reported as a bad start index.
    if not chapters:
        raise ValueError(
            "No chapters with text were found in the epub file. Check your input."
        )

    # Check chapter start and end args.
    if chapter_start < 1 or chapter_start > len(chapters):
        raise ValueError(
            f"Chapter start index {chapter_start} is out of range. Check your input."
        )
    if chapter_end < -1 or chapter_end > len(chapters):
        raise ValueError(
            f"Chapter end index {chapter_end} is out of range. Check your input."
        )
    if chapter_end == -1:
        chapter_end = len(chapters)
    if chapter_start > chapter_end:
        raise ValueError(
            f"Chapter start index {chapter_start} is larger than chapter end index {chapter_end}. Check your input."
        )
    logger.info(f"Converting chapters {chapter_start} to {chapter_end}.")

    # Only touch the file system once the arguments are known to be valid.
    os.makedirs(output_folder, exist_ok=True)

    # Number of digits needed for zero padding the file names, computed on the
    # decimal representation to avoid floating-point logarithms.
    max_digits = len(str(len(chapters)))

    selected = chapters[chapter_start - 1 : chapter_end]
    tts_queue = queue.Queue()
    for idx, (title, text) in enumerate(selected, start=chapter_start):
        padded_chap_number = str(idx).zfill(max_digits)
        file_name = f"{padded_chap_number}_{title}"
        tts_queue.put(
            RecordingJob(
                title=title,
                text=text,
                author=author,
                output_file_prefix=os.path.join(output_folder, file_name),
                book_title=book_title,
                chapter_number=idx,
                voice=voice,
                speed=speed,
                pause=pause,
            )
        )

    # Workers exit as soon as the queue is empty, so starting more threads
    # than there are jobs would only create threads that terminate at once.
    worker_count = min(cpu_count(), len(selected))
    pool = [
        threading.Thread(
            target=worker, args=(tts_queue,), daemon=True, name=f"worker-{i}"
        )
        for i in range(worker_count)
    ]
    for thread in pool:
        thread.start()

    # Block until every queued job has been reported done by a worker.
    tts_queue.join()
def worker(tts_queue: queue.Queue) -> None:
    """Process jobs from *tts_queue* until it is empty, then return.

    Each job is handed to ``convert_chapter``. A failure is logged and does
    not stop the loop; the job is marked done on the queue either way.
    """
    while True:
        try:
            job: RecordingJob = tts_queue.get_nowait()
        except queue.Empty:
            # Nothing left to do for this thread.
            break

        try:
            convert_chapter(job)
        except Exception as err:
            logger.error(
                f"Failed to convert chapter {job.chapter_number} to speech. Error: {err}"
            )
        else:
            logger.info(
                f"Converted chapter {job.chapter_number} to file {job.output_file_prefix}.mp3."
            )
        finally:
            # Always acknowledge the job so that Queue.join() can return.
            tts_queue.task_done()
def convert_chapter(job: RecordingJob) -> None:
    """Synthesise one chapter with piper, encode it to mp3 and tag it.

    The chapter text is piped to ``piper``, which writes ``job.wav_filename``.
    ``ffmpeg`` then encodes that file to ``job.mp3_filename`` (libmp3lame,
    64k), the intermediate wav file is deleted, and ID3 tags (artist, title,
    album, track number) are written to the mp3.

    Args:
        job: The chapter to convert, including output paths and voice options.

    Raises:
        subprocess.CalledProcessError: If piper or ffmpeg exits with a
            non-zero status.
    """
    try:
        logger.info(f"Converting chapter {job.chapter_number} to wav.")
        subprocess.run(
            [
                "piper",
                "--output_file",
                job.wav_filename,
                "--model",
                job.voice,
                "--length-scale",
                job.speed,
                "--sentence-silence",
                job.pause,
            ],
            input=job.text.encode("utf-8"),
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            # Fail here, with the real cause, instead of at a later step.
            check=True,
        )

        logger.info(f"Converting chapter {job.chapter_number} to mp3.")
        subprocess.run(
            [
                "ffmpeg",
                # Overwrite an mp3 left over from an earlier run. Without this
                # ffmpeg asks for confirmation, and since its output is
                # discarded the prompt would be invisible.
                "-y",
                "-i",
                job.wav_filename,
                "-codec:a",
                "libmp3lame",
                "-b:a",
                "64k",
                job.mp3_filename,
            ],
            # Several ffmpeg processes may run at once; none of them should
            # read from the terminal.
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=True,
        )
    finally:
        # The wav file is only an intermediate; remove it even when a step
        # failed. It may not exist if piper itself failed.
        try:
            os.remove(job.wav_filename)
        except FileNotFoundError:
            pass

    tag = EasyID3(job.mp3_filename)
    tag["artist"] = job.author
    tag["title"] = job.title
    tag["album"] = job.book_title
    tag["tracknumber"] = str(job.chapter_number)
    tag.save(v2_version=3)
# Command-line interface of the script: two positional paths followed by
# optional settings that select the chapter range and the speech parameters.
class Args(tap.TypedArgs):
    # Positional arguments: input epub and output folder.
    epub: str = tap.arg(positional=True, help="Epub file")
    audiobook_folder: str = tap.arg(
        positional=True, help="Destination folder for the mp3 files"
    )

    # Chapter range; an end of -1 is the default.
    start: int = tap.arg(default=1, help="chapter to start from")
    end: int = tap.arg(default=-1, help="chapter to finish at")

    # Speech options; kept as strings.
    speed: str = tap.arg(default="0.75", help="speed of the generated audio")
    voice: str = tap.arg(
        default="en_US-joe-medium",
        help="voice to use for the generated audio. To see valid options, see the docs for piper",
    )
    pause: str = tap.arg(default="0.5", help="length of pauses between sentences")
def main(args: Args):
    """Run the conversion described by the parsed command-line *args*."""
    conversion_options = dict(
        input_file=args.epub,
        output_folder=args.audiobook_folder,
        voice=args.voice,
        speed=args.speed,
        pause=args.pause,
        chapter_start=args.start,
        chapter_end=args.end,
    )
    epub_to_audiobook(**conversion_options)
    logger.info("Done! 👍")
if __name__ == "__main__":
    # Parse the command line into Args and hand it to main(); Ctrl-C ends the
    # program with exit status 1 instead of a traceback.
    try:
        tap.Parser(Args).bind(main).run()
    except KeyboardInterrupt:
        raise SystemExit(1)
@thillsd
Copy link
Author

thillsd commented Apr 2, 2024

If you check the version in the git repo, it defaults to only two threads. This version is much buggier.

I can see your issue in that repo. I'll look at it when I have time.

@bonelifer
Copy link

This version worked for me, without the problems the repo version had. Even with it maxing out my cores, the MP3s actually completed and worked. Thanks for replying. Looking forward to new commits :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment