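"""Split a folder of EPUBs into JSONL files of token-bounded text chunks,
using a Llama-2 tokenizer to measure chunk length."""
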
import json
import multiprocessing
import pathlib

import ebooklib
import nltk
import typer
from ebooklib import epub
from markdownify import markdownify as md
from nltk import tokenize
from transformers import LlamaTokenizer

# Path to a local Llama-2 checkpoint, used only for token counting; adjust as needed.
pretrained_model_path = '/home/models/NousResearch_Llama-2-13b-hf'
tokenizer = LlamaTokenizer.from_pretrained(pretrained_model_name_or_path=pretrained_model_path)

# sent_tokenize needs the "punkt" sentence model; fetch it if missing.
nltk.download("punkt", quiet=True)
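
# Note: the tokenizer is loaded at import time, so on platforms that fork
# (Linux) the worker processes below inherit it; under "spawn"
# (macOS/Windows) each worker re-imports this module and loads its own copy.
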
def queue_worker(q, iolock, max_tokens, min_tokens):
    # Pull work items off the queue until a None sentinel arrives.
    while True:
        try:
            item = q.get()
            if item is None:
                break
            book, export_folder, error_folder = item
            book_processor(book, export_folder, error_folder, max_tokens=max_tokens,
                           min_tokens=min_tokens, print_lock=iolock)
        except Exception:
            # Keep the worker alive if a single book blows up.
            continue
def add_splits(token_count, max_tokens, text, pages, min_tokens):
    # Recursively bisect text until each chunk fits the token budget;
    # chunks that fall below min_tokens are silently dropped.
    if min_tokens <= token_count <= max_tokens:
        pages.append({
            "text": text
        })
    elif token_count > max_tokens:
        sentences = tokenize.sent_tokenize(text)
        if len(sentences) == 1:
            # A single oversized sentence: split it down the middle by characters.
            first_half = text[:len(text) // 2]
            second_half = text[len(text) // 2:]
        else:
            # Split on the sentence boundary closest to the middle.
            half_length = len(sentences) // 2
            first_half = " ".join(sentences[:half_length])
            second_half = " ".join(sentences[half_length:])
        add_splits(len(tokenizer(first_half)["input_ids"]),
                   max_tokens, first_half, pages, min_tokens)
        add_splits(len(tokenizer(second_half)["input_ids"]),
                   max_tokens, second_half, pages, min_tokens)
def book_processor(
    epub_file: pathlib.Path, export_folder: pathlib.Path, error_folder: pathlib.Path,
    max_tokens: int, min_tokens: int, print_lock=None
):
    if epub_file.suffix.lower() != ".epub":
        return
    try:
        book = epub.read_epub(str(epub_file), options={"ignore_ncx": False})
    except KeyError:
        print("ERROR", epub_file.name, "KeyError while reading epub")
        epub_file.rename(error_folder / epub_file.name)
        return
    except epub.EpubException as e:
        if "bad zip file" in str(e).lower():
            print("ERROR", epub_file.name, "epub.EpubException, bad zip")
        else:
            print("ERROR", epub_file.name, "epub.EpubException")
        epub_file.rename(error_folder / epub_file.name)
        return
    except Exception as e:
        print("ERROR", epub_file.name, "unexpected error loading epub:", e)
        epub_file.rename(error_folder / epub_file.name)
        return
    # Convert each XHTML document in the book to markdown.
    epub_pages = []
    for document in book.get_items_of_type(ebooklib.ITEM_DOCUMENT):
        page = md(document.content.decode(encoding="utf-8"))
        epub_pages.append(page)
    if epub_pages:
        # Drop the first document, which is typically the cover/title page.
        epub_pages.pop(0)
    book_data = {"pages": []}
    for page in epub_pages:
        token_count = len(tokenizer(page)["input_ids"])
        if min_tokens <= token_count <= max_tokens:
            book_data["pages"].append({
                "text": page
            })
        elif token_count > max_tokens:
            # Page is too long: recursively split it into token-sized chunks.
            add_splits(token_count, max_tokens, page,
                       book_data["pages"], min_tokens)
    if not book_data["pages"]:
        # Nothing survived the token filters; skip writing an empty file.
        return
    if print_lock:
        with print_lock:
            print("PAGES", epub_file.name, len(book_data["pages"]), "pages")
    out_path = export_folder / epub_file.with_suffix(epub_file.suffix + ".jsonl").name
    with open(out_path, "w", encoding="utf-8") as f:
        f.write("\n".join(json.dumps(page) for page in book_data["pages"]))
    if print_lock:
        with print_lock:
            print("FINISHED", epub_file.name)
app = typer.Typer()


@app.command()
def process_books(folder: pathlib.Path, export_folder: pathlib.Path, max_tokens: int,
                  min_tokens: int, error_folder: pathlib.Path = pathlib.Path("errors"),
                  process: int = 64):
    # Make sure the output folders exist before the workers start writing.
    export_folder.mkdir(parents=True, exist_ok=True)
    error_folder.mkdir(parents=True, exist_ok=True)
    queue = multiprocessing.Queue(maxsize=process)
    iolock = multiprocessing.Lock()
    # Each worker drains the queue from inside its initializer.
    pool = multiprocessing.Pool(
        process, initializer=queue_worker, initargs=(
            queue, iolock, max_tokens, min_tokens)
    )
    for i in folder.glob("*.epub"):
        queue.put((i, export_folder, error_folder))
    # One None sentinel per worker tells each queue_worker loop to exit.
    for _ in range(process):
        queue.put(None)
    pool.close()
    pool.join()


if __name__ == "__main__":
    app()
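
For reference, a typical invocation looks like the following (the script name,
paths, and token limits here are assumptions; substitute your own, and point
pretrained_model_path at a local tokenizer checkpoint first):

python process_books.py ./epubs ./output 4096 256 --error-folder errors --process 16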