Last active
January 14, 2024 18:11
-
-
Save siennathesane/78363da5fe1e68a4e89d9fa8c720e034 to your computer and use it in GitHub Desktop.
Download all ePub books from Project Gutenberg. The only dependency is `alive-progress`, and the script can be packaged with PyInstaller.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from argparse import ArgumentParser
from concurrent.futures import ProcessPoolExecutor, as_completed
from glob import glob
from os import listdir, stat
from os.path import basename
from os.path import sep as pathsep
from shutil import SameFileError, copy, get_terminal_size
from sys import exit, stdout
from typing import Callable, Optional, Union

from alive_progress import alive_bar
class Gutenberg:
    """Copy ePub files out of a locally mounted Project Gutenberg mirror.

    The source folder is expected to contain one sub-directory per book whose
    name is the numeric book id; entries with non-numeric names are ignored.
    Matching ePub files are copied into a single flat download folder.
    """

    # Largest ``stop - start`` span that list_dirs() accepts in one call.
    _MAX_DIR_SPAN = 250

    def __init__(self, source_folder: str = "/Volumes/gutenberg.pglaf.org/mirrors/gutenberg/cache/epub",
                 download_folder: str = "/Users/mxplusb/Downloads/gutenberg",
                 epub_glob: str = "*.epub") -> None:
        """Validate both folders and cache the listing of the source folder.

        Args:
            source_folder: Directory holding the per-book sub-directories.
            download_folder: Directory the ePub files are copied into.
            epub_glob: Glob pattern applied inside every book directory.

        Raises:
            SystemExit: With status 1 (after printing a message) when either
                folder cannot be stat'ed.
        """
        if not self._validate_path(source_folder):
            print("{0} is not a valid source folder location".format(source_folder))
            exit(1)
        if not self._validate_path(download_folder):
            print("{0} is not a valid download folder location".format(download_folder))
            exit(1)
        self.src_folder = source_folder
        self.dst_folder = download_folder
        self.epub_glob = epub_glob
        self.root_files: list[str] = []
        # (10, 10) is the fallback used when no terminal is attached.
        self.terminal_dim = get_terminal_size((10, 10))
        self._fetch_dirs()

    def _fetch_dirs(self) -> None:
        """Refresh the cached list of entries found in the source folder."""
        self.root_files = listdir(self.src_folder)

    def _validate_path(self, path: str) -> bool:
        """Return True when *path* can be stat'ed, False otherwise."""
        try:
            stat(path)
        except OSError:
            # Covers a missing path as well as permission and
            # not-a-directory failures, which would otherwise escape as a
            # traceback instead of the caller's friendly message.
            return False
        return True

    def _numeric_dirs(self) -> list[tuple[int, str]]:
        """Return ``(book_id, entry_name)`` for every numeric source entry."""
        pairs: list[tuple[int, str]] = []
        for name in self.root_files:
            try:
                pairs.append((int(name), name))
            except ValueError:
                # Not a book directory (index files, READMEs, ...).
                continue
        return pairs

    def _progress_bar(self, it, prefix="", size=0, file=stdout):
        """Yield the items of *it* while drawing a text progress bar.

        Args:
            it: Sized iterable (``len()`` is taken up front).
            prefix: Text written before the bar.
            size: Bar width in characters; 0 means a third of the terminal
                width.
            file: Stream the bar is written to.
        """
        if size == 0:
            size = self.terminal_dim.columns // 3
        count = len(it)

        def show(done):
            # An empty input is drawn as a complete bar instead of dividing
            # by zero.
            filled = size if count == 0 else int(size * done / count)
            file.write("%s[%s%s] %i/%i\r" % (prefix, "#" * filled, "." * (size - filled), done, count))
            file.flush()

        show(0)
        for done, item in enumerate(it, start=1):
            yield item
            show(done)
        file.write("\n")
        file.flush()

    @property
    def count(self):
        """Number of entries in the source folder whose name is an integer."""
        return len(self._numeric_dirs())

    def list_dirs(self, start: int, stop: int, bar: Optional[Callable[[], object]] = None) -> list[str]:
        """List the book directories whose id lies in ``[start, stop]``.

        Args:
            start: Lowest book id to include.
            stop: Highest book id to include.
            bar: Optional zero-argument callable invoked once per match.

        Returns:
            The matching directory names, ordered by numeric id.

        Raises:
            SystemExit: With status 1 when ``stop - start`` exceeds 250.
        """
        if (stop - start) > self._MAX_DIR_SPAN:
            print("cannot safely operate on more than 250 directories at a time")
            exit(1)
        matches: list[tuple[int, str]] = []
        for book_id, name in self._numeric_dirs():
            if start <= book_id <= stop:
                matches.append((book_id, name))
                if bar is not None:
                    bar()
        matches.sort()
        # Return the on-disk names so a zero-padded directory is still found.
        return [name for _, name in matches]

    def list_epub(self, dir: Union[list[str], str], with_images: bool = False,
                  bar: Optional[Callable[[], object]] = None) -> list[str]:
        """Glob the ePub files inside one or several book directories.

        Args:
            dir: A directory name, or a list of names, relative to the
                source folder.
            with_images: When True keep only files whose name contains
                ``images``; when False keep only the others.
            bar: Optional zero-argument callable invoked once per directory
                scanned.

        Returns:
            Paths of the selected files.
        """
        names = dir if isinstance(dir, list) else [dir]
        found: list[str] = []
        for name in names:
            found += glob("{0}/{1}/{2}".format(self.src_folder, name, self.epub_glob))
            if bar is not None:
                bar()
        wanted = bool(with_images)
        # Test the file name only: the mirror itself may live under a folder
        # whose path happens to contain "images".
        return [path for path in found if ("images" in basename(path)) == wanted]

    @staticmethod
    def _copy_failed(path: str, error: Exception) -> bool:
        """Print why copying *path* failed; return True for a real failure.

        A SameFileError means the file is already at the destination, so it
        is reported but not counted as a failure.
        """
        name = basename(path)
        if isinstance(error, SameFileError):
            print("file already exists at destination: {0}".format(name))
            return False
        print("cannot copy file: {0}: {1}".format(name, error))
        return True

    def download_epub(self, src: Union[list[str], str], dst_folder: str = "",
                      continue_on_error: bool = True,
                      bar: Optional[Callable[[], object]] = None) -> None:
        """Copy one file, or a list of files, into the download folder.

        Args:
            src: Path of a file, or list of paths, to copy.
            dst_folder: Destination directory; an empty string selects the
                folder given to the constructor.
            continue_on_error: When True a failed copy is reported and
                skipped; when False the failure is reported and re-raised.
            bar: Optional zero-argument callable invoked once per file
                handled.

        Raises:
            SystemExit: With status 1 when the destination cannot be
                stat'ed.
            Exception: The copy error, only when ``continue_on_error`` is
                False.
        """
        if dst_folder == "":
            dst_folder = self.dst_folder
        if not self._validate_path(dst_folder):
            print("{0} is not a valid path".format(dst_folder))
            exit(1)

        if not isinstance(src, list):
            try:
                copy(src, dst_folder)
            except Exception as error:
                if self._copy_failed(src, error) and not continue_on_error:
                    raise
            if bar is not None:
                bar()
            return

        print("copying {0} files".format(len(src)))
        if not src:
            # Nothing to do; avoid spawning worker processes for no work.
            return
        with ProcessPoolExecutor(max_workers=4) as pool:
            pending = {pool.submit(copy, path, dst_folder): path for path in src}
            for future in as_completed(pending):
                try:
                    # result() re-raises whatever the worker raised; without
                    # this call a failed copy would go unnoticed.
                    future.result()
                except Exception as error:
                    if self._copy_failed(pending[future], error) and not continue_on_error:
                        for other in pending:
                            other.cancel()
                        raise
                if bar is not None:
                    bar()

    def _chunk_ranges(self, chunk_size: int = 150) -> list[tuple[int, int]]:
        """Split the span of known book ids into inclusive id ranges.

        The ranges do not overlap and together cover every id from the
        smallest to the largest numeric directory name.
        """
        ids = [book_id for book_id, _ in self._numeric_dirs()]
        if not ids:
            return []
        first, last = min(ids), max(ids)
        return [(start, min(start + chunk_size - 1, last))
                for start in range(first, last + 1, chunk_size)]

    def all(self, chunk_size: int = 150) -> None:
        """Copy every ePub without images from the mirror, chunk by chunk.

        Args:
            chunk_size: Number of consecutive book ids handled per chunk;
                must be between 1 and 251 so each chunk stays within the
                span list_dirs() accepts.

        Raises:
            ValueError: When ``chunk_size`` is outside that range.
        """
        if not 1 <= chunk_size <= self._MAX_DIR_SPAN + 1:
            raise ValueError("chunk_size must be between 1 and {0}".format(self._MAX_DIR_SPAN + 1))
        # The total is an estimate: one tick per directory listed, one per
        # directory globbed and roughly one per file copied.
        with alive_bar(self.count * 3, title="operation progress:") as master:
            for start, stop in self._chunk_ranges(chunk_size):
                dirs = self.list_dirs(start, stop, bar=master)
                if not dirs:
                    # Book ids are sparse; skip ranges holding no directory.
                    continue
                epubs = self.list_epub(dirs, bar=master)
                if epubs:
                    self.download_epub(epubs, bar=master)
def main(argv: Union[list[str], None] = None) -> None:
    """Parse the command line and copy every ePub found in the mirror.

    Args:
        argv: Arguments to parse. ``None`` (the default) lets argparse read
            ``sys.argv[1:]``, which is what the script entry point relies on.

    Raises:
        SystemExit: Raised by argparse for ``--help`` or invalid arguments.
    """
    parser = ArgumentParser(description="""a tool to download files from project gutenberg.
        you need to have ftp://gutenberg.pglaf.org mounted somewhere in your filesystem for this script to work.
        both arguments are optional; any argument you leave out falls back to its default""")
    parser.add_argument("--source-dir", "-s",
                        action="store",
                        help="where the cache epub directory is located",
                        default="/Volumes/gutenberg.pglaf.org/mirrors/gutenberg/cache/epub")
    parser.add_argument("--download-dir", "-d",
                        action="store",
                        help="where to download the books to",
                        default="/Users/mxplusb/Downloads/gutenberg")
    args = parser.parse_args(argv)
    # Both options carry defaults, so the values can be passed straight
    # through. An explicitly empty value is left for Gutenberg to reject
    # rather than silently replacing both directories with the defaults.
    g = Gutenberg(source_folder=args.source_dir, download_folder=args.download_dir)
    g.all()
# Start the download only when this file is executed as a script; importing
# it as a module has no side effects.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment