Skip to content

Instantly share code, notes, and snippets.

@karolyi
Created May 3, 2020 10:15
Show Gist options
  • Save karolyi/4130e9b1671f9c2a04f5a8b39034fcb9 to your computer and use it in GitHub Desktop.
Save karolyi/4130e9b1671f9c2a04f5a8b39034fcb9 to your computer and use it in GitHub Desktop.
Revalidating nginx reverse-proxied FreeBSD pkg mirrors
#!/usr/bin/env python3.7
from argparse import ArgumentParser
from hashlib import sha256
from http.client import HTTPResponse
from io import BytesIO
from json import loads
from pathlib import Path
from subprocess import check_output
from sys import exit
from tarfile import open as tar_open
from typing import List, Set
from urllib.request import Request, urlopen
# Base URL of the package mirror; the ABI and branch are appended to it.
DEFAULT_PKGMIRROR_URL = 'http://pkgmirror.ksol.io'

# Command line interface: two positional arguments, both plain strings.
parser = ArgumentParser()
parser.add_argument(
    'jail_root', type=str, help='path of the jail (chroot)')
parser.add_argument(
    'packages', type=str, help='space separated list of packages')
def _get_abi(jail_root: str) -> str:
    """
    Return the ABI string that `pkg` reports for the given jail.

    Runs `pkg --chroot <jail_root> config abi` and returns its output
    with surrounding whitespace stripped, decoded as UTF-8.
    """
    command = ['pkg', '--chroot', jail_root, 'config', 'abi']
    raw_output = check_output(command)
    return raw_output.strip().decode('utf-8')
def _revalidate_packagesite(abi: str) -> List[bytes]:
    """
    Re-request the repository metadata and return the package list lines.

    Both `meta.txz` and `packagesite.txz` are requested from
    `<DEFAULT_PKGMIRROR_URL>/<abi>/latest` with a `Cache-Bypass: true`
    header (presumably the reverse proxy refreshes its cached copy when
    it sees this header -- confirm against the proxy configuration).

    Returns the content of the `packagesite.yaml` member of
    `packagesite.txz`, split into lines (as `bytes`).

    Raises `ValueError` when `packagesite.yaml` exists in the archive
    but is not a regular file; a missing member raises `KeyError`.
    """
    url_prefix = '/'.join((DEFAULT_PKGMIRROR_URL, abi, 'latest'))
    headers = {'Cache-Bypass': 'true'}
    request = Request(url='/'.join((url_prefix, 'meta.txz')), headers=headers)
    with urlopen(url=request) as response:  # type: HTTPResponse
        # The content of `meta.txz` is not used; drain it so the transfer
        # finishes before the connection is closed.
        response.read()
    request = Request(
        url='/'.join((url_prefix, 'packagesite.txz')), headers=headers)
    with urlopen(url=request) as response:  # type: HTTPResponse
        payload = response.read()
    with tar_open(mode='r:xz', fileobj=BytesIO(payload)) as archive:
        exfile = archive.extractfile('packagesite.yaml')
        if exfile is None:
            # `extractfile` returns None for members that are not regular
            # files or links.
            raise ValueError(
                'packagesite.yaml is not a regular file in packagesite.txz')
        with exfile:
            return exfile.read().splitlines()
def _load_packages(lines: List[bytes]) -> dict:
"""
Load and return the packages from the passed JSON structured lines.
"""
result = dict()
for line in lines:
loaded = loads(line)
name = loaded['name']
if name in result:
raise KeyError(f'{name} is defined twice in packages.')
result[name] = loaded
return result
def _extract_deps(
packages_list: dict, passed_packages: List[str]) -> Set[str]:
'Compile and return the packages to check, including dependencies.'
result = set(passed_packages)
for package in passed_packages:
if 'deps' not in packages_list[package]:
continue
result.update(_extract_deps(
packages_list=packages_list,
passed_packages=list(packages_list[package]['deps'])))
return result
def _get_packages_to_check(
        abi: str, packages_list: dict, passed_packages: List[str]) -> dict:
    """
    Build the download descriptors for the packages that must be checked.

    The passed packages are expanded with their dependencies, then each
    name is mapped to a `dict` with the keys `url` (mirror URL of the
    package file), `pkgsize` and `sha256` (taken from the `pkgsize` and
    `sum` fields of the package info).
    """
    base_url = '/'.join((DEFAULT_PKGMIRROR_URL, abi, 'latest'))

    def _describe(entry: dict) -> dict:
        'Return the descriptor for one package info entry.'
        return {
            'url': '/'.join((base_url, entry['repopath'])),
            'pkgsize': entry['pkgsize'],
            'sha256': entry['sum'],
        }

    names = _extract_deps(
        packages_list=packages_list, passed_packages=passed_packages)
    return {name: _describe(packages_list[name]) for name in names}
def _fetch_and_get_info(request: Request) -> dict:
'Fetch the package and return size and SHA256 sum.'
response = urlopen(url=request) # type: HTTPResponse
content = response.read()
hasher = sha256()
hasher.update(content)
return dict(size=len(content), digest=hasher.hexdigest())
def _get_to_revalidate(packages_to_check: dict) -> dict:
    """
    Download every package in the passed `dict` and return the ones
    whose size or SHA256 sum differs from the expected value, in a new
    `dict` with the same name-to-info layout.

    One status line is printed per package. The size is compared first;
    the SHA256 sum is only compared when the size matches.
    """
    mismatched = {}
    for name, info in packages_to_check.items():  # type: str, dict
        fetched = _fetch_and_get_info(request=Request(url=info['url']))
        if info['pkgsize'] != fetched['size']:
            complaint = f'Size mismatch: {name}'
        elif info['sha256'] != fetched['digest']:
            complaint = f'SHA256 sum mismatch: {name}'
        else:
            complaint = None
        if complaint is None:
            print(f'OK: {name}')
        else:
            print(complaint)
            mismatched[name] = info
    return mismatched
def _revalidate_packages(to_revalidate: dict) -> bool:
    """
    Re-download the mismatched packages with the cache-bypass header.

    Every package in `to_revalidate` is requested again with a
    `Cache-Bypass: true` header and its size and SHA256 sum are compared
    with the expected values. The size is compared first; the SHA256 sum
    is only compared when the size matches.

    Returns `True` when every package matched, `False` when at least
    one still mismatches.
    """
    headers = {'Cache-Bypass': 'true'}
    success = True
    for name, info in to_revalidate.items():  # type: str, dict
        # Flush explicitly: the line has no newline yet, so it would
        # otherwise stay in the output buffer until the download ends.
        print(f'Revalidating {name} ... ', end='', flush=True)
        url = info['url']
        request = Request(url=url, headers=headers)
        dl_info = _fetch_and_get_info(request=request)
        if info['pkgsize'] != dl_info['size']:
            print('Size mismatch!')
            success = False
            continue
        if info['sha256'] != dl_info['digest']:
            print('SHA256 sum mismatch!')
            success = False
            continue
        print('OK.')
    return success
def run():
    """
    Entry point: check the requested packages and revalidate mismatches.

    Parses the command line, verifies that the jail root is a directory
    (raising `FileNotFoundError` otherwise), refreshes the package list
    for the jail's ABI, downloads the requested packages with their
    dependencies, and re-requests the mismatched ones. Exits with
    status 1 when a package still mismatches after revalidation.
    """
    args = parser.parse_args()
    jail_dir = Path(args.jail_root)
    if not jail_dir.is_dir():
        raise FileNotFoundError(jail_dir)
    abi = _get_abi(jail_root=args.jail_root)
    packages_list = _load_packages(lines=_revalidate_packagesite(abi=abi))
    packages_to_check = _get_packages_to_check(
        abi=abi, packages_list=packages_list,
        passed_packages=args.packages.split())
    to_revalidate = _get_to_revalidate(packages_to_check=packages_to_check)
    if to_revalidate and not _revalidate_packages(
            to_revalidate=to_revalidate):
        exit(1)


if __name__ == '__main__':
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment