Read a yum repo and fetch the version and URL of the latest package
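For example (the script filename and repository URL here are illustrative, not part of the gist), it might be invoked as:

    python yumfetch.py https://mirror.example.com/centos/7/os/x86_64 bash

which logs progress to stderr and prints a single "<version> <url>" line to stdout.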
""" | |
Read a yum remote repo and get the latest version and url of a package | |
Prints output to stderr and to stdout: | |
<version> <url> | |
""" | |
import sys
import os
import argparse
import gzip
import hashlib
import logging
from urllib.parse import urlparse
from pathlib import Path
from xml.etree import ElementTree

import requests

logger = logging.getLogger()
logging.basicConfig(level=logging.INFO, stream=sys.stderr)
parser = argparse.ArgumentParser(
    description="Look up the version and URL of the latest build of a package in a yum repo"
)
parser.add_argument("repo", help="Repository URL to read from", metavar="REPO")
parser.add_argument("package", help="The package name to retrieve", metavar="PACKAGE")
parser.add_argument("--cache", help="The cache path to use", type=Path)
options = parser.parse_args()
# Work out the cache folder
if not options.cache:
    if "XDG_CACHE_HOME" in os.environ:
        options.cache = Path(os.environ["XDG_CACHE_HOME"])
    else:
        options.cache = Path("~/.cache")
options.cache = options.cache.expanduser()
# Prefix the repo path with "." so that joining the (absolute) URL path does not
# reset the cache path back to the filesystem root
options.cache = options.cache / "yumfetch" / ("." + urlparse(options.repo).path)
options.cache.mkdir(parents=True, exist_ok=True)
logger.info("Using cache folder %s", options.cache)

# Strip any trailing / from the repo URL
options.repo = options.repo.rstrip("/")


def extract_primary_filelist(repodata):
    # Extract the checksum and location of the primary XML file list from repomd.xml
    root = ElementTree.fromstring(repodata)
    primary_lists = root.findall(
        ".//{http://linux.duke.edu/metadata/repo}data[@type='primary']"
    )
    if len(primary_lists) > 1:
        sys.exit("Error: More than one primary list, don't know how to handle")
    elif not primary_lists:
        sys.exit("Error: Could not find primary XML list file in repodata")
    # Get the actual primary list
    primary = primary_lists[0]
    checksum = primary.find("{http://linux.duke.edu/metadata/repo}checksum")
    if not checksum.attrib["type"] == "sha256":
        sys.exit("Error: Unsupported checksum " + checksum.attrib["type"])
    href = primary.find("{http://linux.duke.edu/metadata/repo}location").attrib["href"]
    return checksum.text, href
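
# For reference, the <data type="primary"> entry in repomd.xml that the function
# above parses typically looks something like this (illustrative values only):
#
#   <data type="primary">
#     <checksum type="sha256">abc123...</checksum>
#     <location href="repodata/abc123...-primary.xml.gz"/>
#   </data>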


def get_repo_file_cached(href, dest, checksum):
    # Fetch a file from the repo into dest, skipping the download if a cached
    # copy already exists with a matching sha256 checksum
    cache_path = dest
    fetch = True
    if cache_path.exists():
        sha = hashlib.sha256()
        sha.update(cache_path.read_bytes())
        if sha.hexdigest() == checksum:
            fetch = False
            logger.info(f"Checksum match on {cache_path}")
        else:
            logger.info(f"Warning: Hash mismatch from cached data file {cache_path}")
    if fetch:
        cache_path.parent.mkdir(parents=True, exist_ok=True)
        response = requests.get(options.repo + "/" + href)
        if response.status_code != 200:
            sys.exit("Error fetching file; Status code " + str(response.status_code))
        cache_path.write_bytes(response.content)
    return cache_path


# Get the repodata index
repodata_response = requests.get(options.repo + "/" + "repodata/repomd.xml")
if repodata_response.status_code != 200:
    sys.exit(f"Error: repomd.xml returned status {repodata_response.status_code}")
checksum, href = extract_primary_filelist(repodata_response.text)

logger.info(f"Fetching primary file list {href}")
primary_file = get_repo_file_cached(href, options.cache / href, checksum)

# Now read the (gzip-compressed) primary file
primary = ElementTree.fromstring(
    gzip.decompress(primary_file.read_bytes()).decode("utf-8")
)

# Find every package entry in the primary list
all_packages = primary.findall(".//{http://linux.duke.edu/metadata/common}package")


def split_ver(version):
    # Split a dotted version string into parts, converting numeric parts to int
    # so that they compare numerically rather than lexically
    parts = []
    for part in version.split("."):
        try:
            intpart = int(part)
            parts.append(intpart)
        except ValueError:
            parts.append(part)
    return parts
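
# Example behaviour (illustrative):
#   split_ver("1.25.1")         -> [1, 25, 1]
#   split_ver("1531323952.el7") -> [1531323952, "el7"]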


def yum_version_sort_key(package):
    # Build a sortable key from the package's <version> element, e.g.
    # <version epoch="0" ver="1.10.0" rel="1488387854.el7"/>
    # <version epoch="0" ver="1.25.1" rel="1531323952.el7"/>
    version = package.find("{http://linux.duke.edu/metadata/common}version")
    vertup = tuple(
        [int(version.attrib.get("epoch", 0))]
        + split_ver(version.attrib["ver"])
        + split_ver(version.attrib.get("rel", "0"))
    )
    return vertup
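
# For the second example <version> above, the resulting key is
# (0, 1, 25, 1, 1531323952, "el7"): packages sort by epoch, then by version
# parts, then by release parts.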


candidates = sorted(
    [
        package
        for package in all_packages
        if package.find("{http://linux.duke.edu/metadata/common}name").text.strip()
        == options.package
        and package.find("{http://linux.duke.edu/metadata/common}arch").text.strip()
        == "x86_64"
    ],
    key=yum_version_sort_key,
    reverse=True,
)
if not candidates:
    sys.exit(f"Error: Could not find package {options.package} in repo")
logger.info(f"{len(candidates)} candidates found for package {options.package}.")

best_candidate = candidates[0]
version_node = best_candidate.find("{http://linux.duke.edu/metadata/common}version")
version = version_node.attrib["ver"]
logger.info(f"Latest version: {version}")
repo_url_location = (
    options.repo
    + "/"
    + best_candidate.find(".//{http://linux.duke.edu/metadata/common}location").attrib[
        "href"
    ]
)

# Print the version and URL as the final output
print(version, repo_url_location)