Created
May 3, 2020 10:15
-
-
Save karolyi/4130e9b1671f9c2a04f5a8b39034fcb9 to your computer and use it in GitHub Desktop.
Revalidating nginx reverse-proxied FreeBSD pkg mirrors
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3.7 | |
from argparse import ArgumentParser | |
from hashlib import sha256 | |
from http.client import HTTPResponse | |
from io import BytesIO | |
from json import loads | |
from pathlib import Path | |
from subprocess import check_output | |
from sys import exit | |
from tarfile import open as tar_open | |
from typing import List, Set | |
from urllib.request import Request, urlopen | |
DEFAULT_PKGMIRROR_URL = 'http://pkgmirror.ksol.io' | |
# Command-line interface: two positional arguments — the chroot path of
# the jail and a single whitespace-separated string of package names.
parser = ArgumentParser()
parser.add_argument(dest='jail_root', type=str, help='path of the jail (chroot)')
parser.add_argument(dest='packages', type=str, help='space separated list of packages')
def _get_abi(jail_root: str) -> str:
    """Return the `$ABI` string that `pkg` reports for the jail at *jail_root*."""
    raw = check_output(['pkg', '--chroot', jail_root, 'config', 'abi'])
    return raw.strip().decode('utf-8')
def _revalidate_packagesite(abi: str) -> List[bytes]:
    """
    Force the mirror to revalidate `meta.txz` and `packagesite.txz`,
    then return the lines of the contained `packagesite.yaml`.
    """
    base = '/'.join((DEFAULT_PKGMIRROR_URL, abi, 'latest'))
    headers = {'Cache-Bypass': 'true'}
    # meta.txz is fetched purely for its cache-revalidating side effect;
    # the response body is not used.
    urlopen(url=Request(url='/'.join((base, 'meta.txz')), headers=headers))
    response = urlopen(url=Request(
        url='/'.join((base, 'packagesite.txz')),
        headers=headers))  # type: HTTPResponse
    archive = tar_open(mode='r:xz', fileobj=BytesIO(response.read()))
    member = archive.extractfile('packagesite.yaml')
    return member.read().splitlines()
def _load_packages(lines: List[bytes]) -> dict: | |
""" | |
Load and return the packages from the passed JSON structured lines. | |
""" | |
result = dict() | |
for line in lines: | |
loaded = loads(line) | |
name = loaded['name'] | |
if name in result: | |
raise KeyError(f'{name} is defined twice in packages.') | |
result[name] = loaded | |
return result | |
def _extract_deps( | |
packages_list: dict, passed_packages: List[str]) -> Set[str]: | |
'Compile and return the packages to check, including dependencies.' | |
result = set(passed_packages) | |
for package in passed_packages: | |
if 'deps' not in packages_list[package]: | |
continue | |
result.update(_extract_deps( | |
packages_list=packages_list, | |
passed_packages=list(packages_list[package]['deps']))) | |
return result | |
def _get_packages_to_check(
        abi: str, packages_list: dict, passed_packages: List[str]) -> dict:
    """
    Map each requested package (plus its dependencies) to its mirror
    URL, expected size and expected SHA256 sum.
    """
    url_prefix = '/'.join((DEFAULT_PKGMIRROR_URL, abi, 'latest'))
    names = _extract_deps(
        packages_list=packages_list, passed_packages=passed_packages)
    return {
        name: dict(
            url='/'.join((url_prefix, packages_list[name]['repopath'])),
            pkgsize=packages_list[name]['pkgsize'],
            sha256=packages_list[name]['sum'])
        for name in names}
def _fetch_and_get_info(request: Request) -> dict:
    """Download the package body and report its size and SHA256 digest."""
    response = urlopen(url=request)  # type: HTTPResponse
    body = response.read()
    return dict(size=len(body), digest=sha256(body).hexdigest())
def _get_to_revalidate(packages_to_check: dict) -> dict:
    """
    Download every package and return, in a new `dict`, the ones whose
    size or SHA256 sum does not match the packagesite metadata.
    """
    mismatched = dict()
    for name, info in packages_to_check.items():  # type: str, dict
        dl_info = _fetch_and_get_info(request=Request(url=info['url']))
        if info['pkgsize'] != dl_info['size']:
            print(f'Size mismatch: {name}')
            mismatched[name] = info
        elif info['sha256'] != dl_info['digest']:
            print(f'SHA256 sum mismatch: {name}')
            mismatched[name] = info
        else:
            print(f'OK: {name}')
    return mismatched
def _revalidate_packages(to_revalidate: dict) -> bool:
    """
    Re-fetch each mismatched package with cache bypass and return
    whether every one now matches its expected size and SHA256 sum.
    """
    headers = {'Cache-Bypass': 'true'}
    all_ok = True
    for name, info in to_revalidate.items():  # type: str, dict
        print(f'Revalidating {name} ... ', end='')
        dl_info = _fetch_and_get_info(
            request=Request(url=info['url'], headers=headers))
        if info['pkgsize'] != dl_info['size']:
            print('Size mismatch!')
            all_ok = False
        elif info['sha256'] != dl_info['digest']:
            print('SHA256 sum mismatch!')
            all_ok = False
        else:
            print('OK.')
    return all_ok
def run():
    """Entry point: parse args, then check and revalidate the packages."""
    args = parser.parse_args()
    jail_path = Path(args.jail_root)
    if not jail_path.is_dir():
        raise FileNotFoundError(jail_path)
    abi = _get_abi(jail_root=args.jail_root)
    packages_list = _load_packages(lines=_revalidate_packagesite(abi=abi))
    packages_to_check = _get_packages_to_check(
        abi=abi, packages_list=packages_list,
        passed_packages=args.packages.split())
    to_revalidate = _get_to_revalidate(packages_to_check=packages_to_check)
    # Exit non-zero when any mismatched package still fails after the
    # cache-bypassing re-fetch.
    if to_revalidate and not _revalidate_packages(to_revalidate=to_revalidate):
        exit(1)
# Run only when executed as a script, not when imported as a module.
if __name__ == '__main__':
    run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment