@ewof
Created December 20, 2023 06:09
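
# Bulk-converts a folder of EPUBs (apparently AO3-style exports, judging by the
# "**Kudos:**" metadata line expected on the first page) into one JSONL file per
# book, where each line is a {"text": ...} chunk whose length under the Llama-2
# tokenizer falls between min_tokens and max_tokens. Filtering and splitting run
# in parallel across a pool of worker processes.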
import json
import multiprocessing
import pathlib
import ebooklib
import typer
from ebooklib import epub
from markdownify import markdownify as md
from transformers import LlamaTokenizer
from nltk import tokenize
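# NOTE: nltk's sent_tokenize requires the "punkt" tokenizer data; if it is
# missing, run nltk.download("punkt") once before using this script.
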
# Local path to the Llama-2-13B tokenizer; loaded at module import time, so it
# is available inside every worker process.
pretrained_model_path = '/home/models/NousResearch_Llama-2-13b-hf'
tokenizer = LlamaTokenizer.from_pretrained(pretrained_model_name_or_path=pretrained_model_path)


def queue_worker(q, iolock, min_pages, min_kudos, max_tokens, min_tokens):
    # Each pool worker lives inside this loop (the Pool initializer never
    # returns): pull jobs off the queue until the None sentinel arrives.
    # Checking for the sentinel *before* unpacking matters: unpacking None
    # raises a TypeError, which would otherwise be swallowed below and leave
    # the worker spinning forever.
    while True:
        item = q.get()
        if item is None:
            break
        book, export_folder, error_folder = item
        try:
            book_processor(book, export_folder, error_folder, min_pages=min_pages,
                           min_kudos=min_kudos, max_tokens=max_tokens,
                           min_tokens=min_tokens, print_lock=iolock)
        except Exception:
            # Skip books that fail for any other reason (e.g. a malformed
            # metadata page); read errors are already logged in book_processor.
            continue


def add_splits(token_count, max_tokens, text, pages, min_tokens):
    # Recursively bisect text until every chunk fits in [min_tokens, max_tokens];
    # chunks that fall below min_tokens are dropped.
    if min_tokens <= token_count <= max_tokens:
        pages.append({
            "text": text
        })
    elif token_count > max_tokens:
        sentences = tokenize.sent_tokenize(text)
        if len(sentences) == 1:
            # A single oversized sentence: split at the midpoint by characters.
            half = len(text) // 2
            first, second = text[:half], text[half:]
        else:
            # Otherwise split at the sentence boundary closest to the middle,
            # re-joining with spaces so sentences don't run together.
            half = len(sentences) // 2
            first = " ".join(sentences[:half])
            second = " ".join(sentences[half:])
        add_splits(len(tokenizer(first)["input_ids"]),
                   max_tokens, first, pages, min_tokens)
        add_splits(len(tokenizer(second)["input_ids"]),
                   max_tokens, second, pages, min_tokens)


def book_processor(
    epub_file: pathlib.Path, export_folder: pathlib.Path, error_folder: pathlib.Path,
    min_pages: int, min_kudos: int, max_tokens: int, min_tokens: int, print_lock=None
):
    if epub_file.suffix.lower() != ".epub":
        return
    try:
        book = epub.read_epub(str(epub_file), options={"ignore_ncx": False})
    except KeyError:
        print("ERROR", epub_file.name, "KeyError while reading epub")
        epub_file.rename(error_folder / epub_file.name)
        return
    except epub.EpubException as e:
        if "bad zip file" in str(e).lower():
            print("ERROR", epub_file.name, "epub.EpubException, bad zip")
        else:
            print("ERROR", epub_file.name, "epub.EpubException")
        epub_file.rename(error_folder / epub_file.name)
        return
    except Exception:
        print("ERROR", epub_file.name, "unknown error loading epub")
        epub_file.rename(error_folder / epub_file.name)
        return
    # Convert each XHTML document to markdown, dropping the stray
    # XML-declaration fragment that the conversion leaves at the top of a page.
    epub_pages = [md(document.content.decode(encoding="utf-8")).split("encoding=\"UTF-8\"?")[-1]
                  for document in book.get_items_of_type(ebooklib.ITEM_DOCUMENT)]
    if len(epub_pages) < min_pages:
        return
    # The first page is expected to be a metadata page containing a
    # "**Kudos:** N" line; skip books below the kudos threshold, then drop the
    # metadata page itself.
    if int(epub_pages[0].partition("**Kudos:** ")[2].split("\n")[0]) < min_kudos:
        return
    epub_pages.pop(0)
    book_data = {"pages": []}
    for page in epub_pages:
        token_count = len(tokenizer(page)["input_ids"])
        if min_tokens <= token_count <= max_tokens:
            book_data["pages"].append({
                "text": page
            })
        elif token_count > max_tokens:
            add_splits(token_count, max_tokens, page,
                       book_data["pages"], min_tokens)
    # Nothing survived the token filters; skip writing an empty file.
    if not book_data["pages"]:
        return
    if print_lock:
        with print_lock:
            print("PAGES", epub_file.name, len(book_data["pages"]), "pages")
    # Write one JSON object per line (JSONL), e.g. book.epub -> book.epub.jsonl.
    with open(export_folder / epub_file.with_suffix(epub_file.suffix + ".jsonl").name, "w") as f:
        f.write('\n'.join(json.dumps(page) for page in book_data["pages"]))
    if print_lock:
        with print_lock:
            print("FINISHED", epub_file.name)


app = typer.Typer()


@app.command()
def process_books(folder: pathlib.Path, export_folder: pathlib.Path, min_pages: int,
                  min_kudos: int, max_tokens: int, min_tokens: int,
                  error_folder: pathlib.Path = pathlib.Path("errors"), process: int = 64):
    error_folder.mkdir(exist_ok=True)
    queue = multiprocessing.Queue(maxsize=process)
    iolock = multiprocessing.Lock()
    # The workers never leave the initializer: each one loops in queue_worker
    # until it receives a None sentinel.
    pool = multiprocessing.Pool(
        process, initializer=queue_worker,
        initargs=(queue, iolock, min_pages, min_kudos, max_tokens, min_tokens)
    )
    for i in folder.glob("*.epub"):
        queue.put((i, export_folder, error_folder))
    # One sentinel per worker so every worker shuts down.
    for _ in range(process):
        queue.put(None)
    pool.close()
    pool.join()


if __name__ == "__main__":
    app()
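
# Example invocation (script name and paths are placeholders):
#   python process_epubs.py ./epubs ./out 5 100 4096 256 --error-folder ./errors --process 8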