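"""Convert a folder of EPUBs into JSONL training shards.

Each book's chapters are converted to Markdown, filtered by page count and by a
"**Kudos:** <n>" line on the first page (an AO3-style export is assumed), and
split at sentence boundaries until every chunk fits within
[min_tokens, max_tokens] as measured by a Llama-2 tokenizer. Output is one
{"text": ...} JSON object per line, written to <book>.epub.jsonl in the export
folder. Requires the NLTK "punkt" tokenizer data.
"""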
import json
import multiprocessing
import pathlib

import ebooklib
import typer
from ebooklib import epub
from markdownify import markdownify as md
from nltk import tokenize
from transformers import LlamaTokenizer
# The tokenizer is only used to count tokens; the path below is machine-specific.
pretrained_model_path = '/home/models/NousResearch_Llama-2-13b-hf'
tokenizer = LlamaTokenizer.from_pretrained(pretrained_model_name_or_path=pretrained_model_path)


def queue_worker(q, iolock, min_pages, min_kudos, max_tokens, min_tokens):
    while True:
        item = q.get()
        if item is None:  # sentinel from process_books: no more work
            break
        try:
            book, export_folder, error_folder = item
            book_processor(book, export_folder, error_folder, min_pages=min_pages,
                           min_kudos=min_kudos, max_tokens=max_tokens,
                           min_tokens=min_tokens, print_lock=iolock)
        except Exception:
            # A failed book should not kill the worker; skip it and keep going.
            continue
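
# Note: the workers run entirely inside the Pool *initializer* above. Each
# process loops on the shared queue until it sees a None sentinel, so the Pool
# is used purely for process management and never receives conventional tasks.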


def add_splits(token_count, max_tokens, text, pages, min_tokens):
    # Recursively bisect `text` until each piece fits in [min_tokens, max_tokens];
    # pieces that end up below min_tokens are dropped.
    if min_tokens <= token_count <= max_tokens:
        pages.append({"text": text})
    elif token_count > max_tokens:
        sentences = tokenize.sent_tokenize(text)
        if len(sentences) == 1:
            # A single over-long sentence: split at the character midpoint.
            first = text[:len(text) // 2]
            second = text[len(text) // 2:]
        else:
            half_length = len(sentences) // 2
            first = " ".join(sentences[:half_length])
            second = " ".join(sentences[half_length:])
        add_splits(len(tokenizer(first)["input_ids"]),
                   max_tokens, first, pages, min_tokens)
        add_splits(len(tokenizer(second)["input_ids"]),
                   max_tokens, second, pages, min_tokens)
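
# For example, with max_tokens=4096 a 10,000-token chapter splits roughly in
# half at a sentence boundary, then each half splits again as needed; any
# fragment that ends up below min_tokens is silently discarded.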


def book_processor(
    epub_file: pathlib.Path, export_folder: pathlib.Path, error_folder: pathlib.Path,
    min_pages: int, min_kudos: int, max_tokens: int, min_tokens: int, print_lock=None
):
    if not epub_file.suffix.lower().endswith("epub"):
        return
    try:
        book = epub.read_epub(str(epub_file), options={"ignore_ncx": False})
    except KeyError:
        print("ERROR", epub_file.name, "KeyError while reading epub")
        epub_file.rename(error_folder / epub_file.name)
        return
    except epub.EpubException as e:
        if "bad zip file" in str(e).lower():
            print("ERROR", epub_file.name, "epub.EpubException, bad zip")
        else:
            print("ERROR", epub_file.name, "epub.EpubException")
        epub_file.rename(error_folder / epub_file.name)
        return
    except Exception:
        print("ERROR", epub_file.name, "unknown error loading epub")
        epub_file.rename(error_folder / epub_file.name)
        return
    epub_pages = [md(document.content.decode(encoding="utf-8")).split('encoding="UTF-8"?')[-1]
                  for document in book.get_items_of_type(ebooklib.ITEM_DOCUMENT)]
    if len(epub_pages) < min_pages:
        return
    # The first page is expected to carry metadata, including a "**Kudos:** <n>"
    # line; books below the kudos threshold are skipped.
    if int(epub_pages[0].partition("**Kudos:** ")[2].split("\n")[0]) < min_kudos:
        return
    epub_pages.pop(0)  # drop the metadata page
    book_data = {"pages": []}
    for page in epub_pages:
        token_count = len(tokenizer(page)["input_ids"])
        if min_tokens <= token_count <= max_tokens:
            book_data["pages"].append({"text": page})
        elif token_count > max_tokens:
            add_splits(token_count, max_tokens, page,
                       book_data["pages"], min_tokens)
    if not book_data["pages"]:
        # Nothing survived the token filters; don't write an empty file.
        return
    if print_lock:
        with print_lock:
            print("PAGES", epub_file.name, len(book_data["pages"]), "pages")
    with open(export_folder / epub_file.with_suffix(epub_file.suffix + ".jsonl").name, "w") as f:
        f.write("\n".join(json.dumps(page) for page in book_data["pages"]))
    if print_lock:
        with print_lock:
            print("FINISHED", epub_file.name)


app = typer.Typer()


@app.command()
def process_books(
    folder: pathlib.Path,
    export_folder: pathlib.Path,
    min_pages: int,
    min_kudos: int,
    max_tokens: int,
    min_tokens: int,
    error_folder: pathlib.Path = pathlib.Path("errors"),
    process: int = 64,
):
    error_folder.mkdir(exist_ok=True)
    queue = multiprocessing.Queue(maxsize=process)
    iolock = multiprocessing.Lock()
    pool = multiprocessing.Pool(
        process, initializer=queue_worker,
        initargs=(queue, iolock, min_pages, min_kudos, max_tokens, min_tokens)
    )
    for path in folder.glob("*.epub"):
        queue.put((path, export_folder, error_folder))
    # One sentinel per worker so every process exits its loop.
    for _ in range(process):
        queue.put(None)
    pool.close()
    pool.join()


if __name__ == "__main__":
    app()
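
# Example invocation (assuming this file is saved as process_epubs.py; adjust
# paths and thresholds for your setup):
#   python process_epubs.py ./epubs ./out 5 100 4096 200 --error-folder errors --process 64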