Last active
July 8, 2022 02:08
-
-
Save KokoseiJ/480f153db16d215ccee15cb691873a12 to your computer and use it in GitHub Desktop.
Downloads the entire file tree from an H5AI-powered indexing website.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import re | |
import sys | |
import time | |
import threading | |
import requests | |
from collections import deque | |
from bs4 import BeautifulSoup as bs | |
def join_path(base, path):
    """Concatenate *base* and *path* with exactly one "/" at the seam.

    One trailing "/" is removed from *base* and a leading "/" is added to
    *path* when it has none; the two pieces are then glued together.
    """
    if base.endswith("/"):
        base = base[:-1]
    if not path.startswith("/"):
        path = "/" + path
    return f"{base}{path}"
def is_html(headers):
    """Return True when the response headers declare a ``text/html`` body.

    Args:
        headers: Mapping of response headers supporting ``.get``.

    Returns:
        True if the media type of ``Content-Type`` (the part before any
        ``;`` parameters) is ``text/html``; False otherwise, including when
        the header is absent.
    """
    # A missing or empty header means "not HTML" instead of raising KeyError.
    content_type = headers.get("Content-Type") or ""
    # Media types are case-insensitive and may be padded with spaces before
    # the ";" that introduces parameters such as the charset.
    media_type = content_type.split(";", 1)[0].strip().lower()
    return media_type == "text/html"
def download_file(iterable, filename, filesize, queue):
    """Write every chunk from *iterable* to *filename*, reporting progress.

    Args:
        iterable: Iterable yielding ``bytes`` chunks of the file body.
        filename: Path of the file to create (opened in ``"wb"`` mode, so an
            existing file is overwritten).
        filesize: Expected total number of bytes, or ``None`` when the size
            is not known in advance.
        queue: Object with an ``append`` method; the running byte count is
            appended after each chunk is written.

    A warning is printed when the number of bytes written differs from a
    known *filesize*.  Nothing is returned.
    """
    downloaded = 0
    with open(filename, "wb") as f:
        for chunk in iterable:
            f.write(chunk)
            downloaded += len(chunk)
            queue.append(downloaded)

    # With an unknown size there is nothing to compare against, so the
    # mismatch warning would only be noise.
    if filesize is not None and downloaded != filesize:
        print(f"WARNING! Filesize mismatch for [{filename}] "
              f"({downloaded}/{filesize})")
def readable_bytes(bytelen, bytesize=1024):
    """Format a byte count with the largest fitting unit (B up to TB).

    The value is divided by *bytesize* until it drops below it or the last
    unit is reached, then rounded to two decimals and suffixed with the unit.
    """
    units = ("B", "KB", "MB", "GB", "TB")
    last = len(units) - 1
    idx = 0
    while idx < last and bytelen >= bytesize:
        bytelen = bytelen / bytesize
        idx += 1
    return f"{round(bytelen, 2)}{units[idx]}"
def mkchdir(dirname):
    """Make *dirname* (if it is not there yet) and change into it.

    Only ``FileExistsError`` from the creation step is tolerated; any other
    error from ``os.mkdir`` or ``os.chdir`` propagates to the caller.
    """
    try:
        os.mkdir(dirname)
    except FileExistsError:
        # Something with this name is already present; just try to enter it.
        pass
    os.chdir(dirname)
class H5AIClient:
    """Recursively mirror the file tree behind an H5AI index page.

    Directory listings are recognised by their ``text/html`` content type and
    parsed from ``<td class="fb-n">`` cells; every other response is saved as
    a file in the current working directory.

    Attributes are set in ``__init__``:
        dummy: When true, files are announced but not written to disk.
        baseurl: Site root that every requested path is appended to.
        session: ``requests`` session carrying the ``H5AIDownloader``
            User-Agent header.
    """

    def __init__(self, url, dummy=False):
        self.dummy = dummy
        self.baseurl = url
        self.session = requests.session()
        self.session.headers.update({"User-Agent": "H5AIDownloader"})

    @staticmethod
    def _local_name(name, fallback):
        """Turn a remote entry name into a single safe local path component.

        Names come from the URL or from remote HTML, so path separators are
        neutralised and the special names "", "." and ".." are replaced with
        *fallback* to keep everything inside the download directory.
        """
        cleaned = name.rstrip("/")
        for sep in {"/", os.sep, os.altsep} - {None}:
            cleaned = cleaned.replace(sep, "_")
        if cleaned in ("", ".", ".."):
            return fallback
        return cleaned

    def check_file(self, path, name=None):
        """Fetch *path* and mirror it: recurse into listings, download files.

        Args:
            path: Path on the server, relative to ``baseurl``.
            name: Local file or directory name; defaults to the last
                component of *path*.
        """
        path = path[:-1] if path.endswith("/") else path
        if name is None:
            name = path.rsplit("/", 1)[-1]
        # The site root has no last path component; name the directory after
        # the host instead of calling os.mkdir("") and crashing.
        host = self.baseurl.split("://", 1)[-1].replace(":", "_")
        name = self._local_name(name, self._local_name(host, "download"))

        req = self.get(path)
        if not req.ok:
            # An error page must not be mistaken for a listing or saved as
            # if it were the requested file.
            print(f"WARNING! Skipping [{path}] (HTTP {req.status_code})")
            req.close()
            return

        if is_html(req.headers):
            filelist = self.parse_filelist(req.content)
            mkchdir(name)
            try:
                for child_name, child_path in filelist.items():
                    self.check_file(child_path, child_name)
            finally:
                # Always step back out, even if a child raised.
                os.chdir("..")
        else:
            self.download(name, req)

    def get(self, path):
        """Issue a streaming GET for *path* relative to ``baseurl``."""
        fullpath = join_path(self.baseurl, path)
        return self.session.get(fullpath, stream=True)

    def parse_filelist(self, html):
        """Return ``{entry text: href}`` for every listing row in *html*.

        The "Parent Directory" row and cells without a usable link are
        skipped.
        """
        soup = bs(html, features="lxml")
        paths = {}
        for cell in soup.find_all("td", {"class": "fb-n"}):
            if cell.text == "Parent Directory":
                continue
            link = cell.find("a")
            if link is None or not link.get("href"):
                continue
            paths[cell.text] = link["href"]
        return paths

    def download(self, name, req):
        """Save the body of response *req* as *name*, drawing a progress bar.

        The body is written by a worker thread running ``download_file``
        while this thread polls the latest byte count and redraws the bar.
        In dummy mode only the announcement line is printed.
        """
        print(f"Downloading {name}...\x1b[0K")
        if self.dummy:
            # The response was opened with stream=True; release it since the
            # body is never going to be read.
            req.close()
            return

        try:
            size = int(req.headers["Content-Length"])
        except (KeyError, TypeError, ValueError):
            # No usable Content-Length header: total size unknown.
            size = None

        iterable = req.iter_content(1024 * 4)
        queue = deque(maxlen=1)  # holds only the most recent byte count
        args = (iterable, name, size, queue)
        thread = threading.Thread(target=download_file, args=args)
        thread.start()

        start_time = time.perf_counter()
        while thread.is_alive():
            try:
                downloaded = queue.pop()
            except IndexError:
                # Nothing new yet: wait briefly instead of spinning.
                thread.join(0.05)
                continue
            self._print_progress(
                downloaded, size, time.perf_counter() - start_time)
            # Returns early once the worker finishes.
            thread.join(0.5)

        if queue:
            # Draw the final state so the bar reflects the finished download.
            self._print_progress(
                queue.pop(), size, time.perf_counter() - start_time)
        print("\n")

    @staticmethod
    def _print_progress(downloaded, size, elapsed, fillchr="=", emptychr="-"):
        """Redraw the single-line progress bar for the current download."""
        rate = downloaded / elapsed if elapsed > 0 else 0
        speed = f"({readable_bytes(rate)}/s)"
        total = "?" if size is None else size
        progress_text = f"{downloaded}/{total}"
        etc_width = len(progress_text) + len(speed) + 2
        try:
            columns = os.get_terminal_size()[0]
        except (OSError, ValueError):
            # Output is not attached to a terminal (e.g. piped to a file).
            columns = 80
        bar_width = max(columns - etc_width - 2, 0)
        if size:
            fill = min(round(downloaded / size * bar_width), bar_width)
        else:
            fill = 0
        empty = bar_width - fill
        text = f"[{fillchr*fill}{emptychr*empty}] {progress_text} {speed}"
        print(text, end="\r")
def main():
    """Command-line entry point: mirror the H5AI tree at the given URL.

    The URL is read from ``sys.argv[1]`` or, when no argument was passed,
    asked for interactively.  It is split into the site root (scheme and
    host) and the path to start from; a URL without a path starts at ``/``.

    Raises:
        SystemExit: with status 1, after printing ``Improper URL``, when the
            input is not an http(s) URL.
    """
    try:
        url = sys.argv[1]
    except IndexError:
        url = input("URL to download: ")

    # The path group is optional so that a bare "https://host" is accepted.
    find = re.fullmatch(r"(https?://[^/]+)(/.*)?", url)
    if find is None:
        print("Improper URL")
        sys.exit(1)

    base = find.group(1)
    # group(2) is None when the URL carries no path at all.
    path = find.group(2) or "/"

    client = H5AIClient(base)
    client.check_file(path)
# Start the downloader only when this file is executed as a script, so that
# importing it as a module has no side effects.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Made to mirror the codebucket.de Vanced mirror.
You can use the following command:
python ./h5ai_downloader.py https://mirror.codebucket.de/vanced/api/v1/apks/v17.03.38/nonroot/
to mirror it.