Created
March 3, 2024 02:35
-
-
Save twilligon/bebfb612b609692d95610967bd9fe29e to your computer and use it in GitHub Desktop.
Create an audiobook from a text file with OpenAI TTS
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""Create an audiobook from a text file with OpenAI TTS."""

import asyncio
import os
import string
import sys
from contextlib import suppress

from aiolimiter import AsyncLimiter

# Audio container requested from the API; also used as the output file extension.
FORMAT = "flac"
# Maximum number of characters sent in a single speech request.
MAX_INPUT_LENGTH = 4096
# Price per input character in dollars, used only for the on-screen estimate.
COST_PER_CHAR = 0.030 / 1000
# Upper bound on speech requests started per minute.
REQUESTS_PER_MIN = 10
# Shared limiter; the 60-second period is spelled out so the unit of
# REQUESTS_PER_MIN is explicit rather than relying on the library default.
RATELIMIT = AsyncLimiter(REQUESTS_PER_MIN, time_period=60)
async def say(client, input, filename):
    """Synthesize *input* as speech and stream the audio into *filename*.

    The request is made while holding a slot from the module-level
    RATELIMIT. Returns the number of characters in *input*.
    """
    request = {
        "model": "tts-1-hd",
        "voice": "echo",
        "input": input,
        "response_format": FORMAT,
    }
    async with RATELIMIT:
        audio = await client.audio.speech.create(**request)
        await audio.astream_to_file(filename)
    return len(input)
def _read_chunks(path):
    """Read the text file at *path* and pack its lines into request-sized chunks.

    Each line is stripped of surrounding whitespace. A line that is empty or
    does not end in ASCII punctuation gets a trailing "." appended, so that a
    heading such as "Intro" is read as "Intro." instead of running into the
    following sentence. Consecutive lines are joined with single spaces into
    chunks that never exceed MAX_INPUT_LENGTH characters.

    Exits the process with status 1 if a single line is longer than
    MAX_INPUT_LENGTH, since it cannot be sent in one request.
    """
    chunks = []
    pieces = []
    pieces_len = 0  # characters currently in `pieces`, not counting joiners
    with open(path) as f:
        # Number lines from 1 so the error message matches what editors show.
        for lineno, line in enumerate(f, start=1):
            line = line.strip()
            if not line or line[-1] not in string.punctuation:
                line += "."
            if len(line) > MAX_INPUT_LENGTH:
                print(f"Line {lineno} is too long ({len(line)} chars, max {MAX_INPUT_LENGTH}).", file=sys.stderr)
                sys.exit(1)
            # Joined length = text so far + one space per existing piece + new line.
            if pieces_len + len(pieces) + len(line) > MAX_INPUT_LENGTH:
                chunks.append(" ".join(pieces))
                pieces = []
                pieces_len = 0
            pieces.append(line)
            pieces_len += len(line)
    if pieces:
        chunks.append(" ".join(pieces))
    return chunks


async def main():
    """Convert the text file named on the command line into numbered audio files.

    Expects exactly two arguments: the input text file and the output
    directory. Prints a usage line to stderr and returns if the argument
    count is wrong. After showing the estimated time and cost, asks for
    confirmation and only then issues the speech requests, printing progress
    as each one completes.

    Raises FileExistsError if the output path exists but is not a directory,
    so the problem surfaces before any request is made.
    """
    try:
        txt, out_dir = sys.argv[1:]
    except ValueError:
        print("usage: read.py in.txt outdir", file=sys.stderr)
        return

    # Unlike a suppressed os.mkdir, this still fails when `out_dir` is an
    # existing regular file, instead of failing later while writing audio.
    os.makedirs(out_dir, exist_ok=True)

    lines = _read_chunks(txt)
    if not lines:
        print(f"'{txt}' is empty, nothing to read.", file=sys.stderr)
        return

    total_chars = sum(map(len, lines))
    total_time = len(lines) / REQUESTS_PER_MIN
    total_cost = total_chars * COST_PER_CHAR

    confirmation = f"Read {len(lines)} lines, {total_chars} chars, in {total_time:.2f} min, for ${total_cost:.2f}? (Y/N): "
    while True:
        answer = input(confirmation)
        if answer in ("y", "Y"):
            break
        if answer in ("n", "N"):
            return

    # Zero-pad indices to a common width so the files sort in reading order.
    width = len(str(len(lines)))

    def filename(idx):
        return os.path.join(out_dir, f"{str(idx).zfill(width)}.{FORMAT}")

    # defer slow import until we use it
    from openai import AsyncOpenAI

    client = AsyncOpenAI()
    responses = [say(client, line, filename(idx)) for idx, line in enumerate(lines)]

    def status(done, chars):
        cost = chars * COST_PER_CHAR
        # flush explicitly: the line has no trailing newline, so a
        # line-buffered terminal would otherwise not show it.
        print(f"\rReading '{txt}' ({done}/{len(lines)} lines, {chars}/{total_chars} chars, ${cost:.2f})", end="", flush=True)

    chars_read = 0
    status(0, chars_read)
    for done, response in enumerate(asyncio.as_completed(responses), start=1):
        chars_read += await response
        status(done, chars_read)
    print()
# Start the event loop only when executed as a script, so that importing
# this module does not parse sys.argv or prompt the user.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
License: CC0-1.0