Skip to content

Instantly share code, notes, and snippets.

@ndevenish
Last active February 28, 2020 15:34
Show Gist options
  • Save ndevenish/0a23c17d8448a3d8605e0e6a7f223046 to your computer and use it in GitHub Desktop.
Save ndevenish/0a23c17d8448a3d8605e0e6a7f223046 to your computer and use it in GitHub Desktop.
Read a yum repo and fetch the version and URL of latest package
"""
Read a yum remote repo and get the latest version and url of a package
Prints output to stderr and to stdout:
<version> <url>
"""
import sys
import os
import argparse
import requests
from urllib.parse import urlparse
from pathlib import Path
import gzip
import hashlib
import logging
from xml.etree import ElementTree
# Log to stderr so stdout stays clean for the machine-readable result line.
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO, stream=sys.stderr)

parser = argparse.ArgumentParser(
    description="Download the latest of some file from a repo"
)
parser.add_argument("repo", help="Repository URL to read from", metavar="REPO")
parser.add_argument("package", help="The package name to retrieve", metavar="PACKAGE")
parser.add_argument("--cache", help="The cache path to use", type=Path)
options = parser.parse_args()

# Resolve the cache root: explicit --cache flag, then $XDG_CACHE_HOME, then ~/.cache.
if not options.cache:
    options.cache = Path(os.environ.get("XDG_CACHE_HOME", "~/.cache"))
options.cache = options.cache.expanduser()
# Prefix the repo's URL path with "." so it joins as a *relative* subfolder —
# pathlib's "/" would otherwise discard the cache root for an absolute path.
options.cache = options.cache / "yumfetch" / ("." + urlparse(options.repo).path)
options.cache.mkdir(parents=True, exist_ok=True)
logger.info("Using cache folder %s", options.cache)

# Normalise the repo URL so later joins always insert exactly one "/".
options.repo = options.repo.rstrip("/")
def extract_primary_filelist(repodata):
    """Parse repomd.xml text and locate the primary package list.

    Args:
        repodata: Textual content of a repository's ``repodata/repomd.xml``.

    Returns:
        ``(checksum, href)``: the sha256 hex digest of the primary list file
        and its location href, relative to the repository root.

    Exits the process with an error message when the primary entry is
    missing, ambiguous, malformed, or uses an unsupported checksum type.
    """
    ns = "{http://linux.duke.edu/metadata/repo}"
    root = ElementTree.fromstring(repodata)
    primary_lists = root.findall(f".//{ns}data[@type='primary']")
    if len(primary_lists) > 1:
        sys.exit("Error: More than one primary list, don't know how to handle")
    elif not primary_lists:
        sys.exit("Error: Could not find primary XML list file in repodata")
    primary = primary_lists[0]
    checksum = primary.find(ns + "checksum")
    # Guard against a malformed entry rather than raising AttributeError.
    if checksum is None:
        sys.exit("Error: Primary entry has no checksum element")
    # Only sha256 is supported; fail loudly on anything else.
    if checksum.attrib["type"] != "sha256":
        sys.exit("Error: Unsupported checksum " + checksum.attrib["type"])
    location = primary.find(ns + "location")
    if location is None:
        sys.exit("Error: Primary entry has no location element")
    return checksum.text, location.attrib["href"]
def get_repo_file_cached(href, dest, checksum):
    """Return a local copy of ``href`` from the repo, reusing ``dest`` when valid.

    The file at ``dest`` is reused if its sha256 digest equals ``checksum``;
    otherwise the file is (re)downloaded from ``options.repo`` and written
    to ``dest``. Exits the process on a non-200 HTTP response.
    """
    cache_path = dest
    needs_download = True
    if cache_path.exists():
        digest = hashlib.sha256(cache_path.read_bytes()).hexdigest()
        if digest == checksum:
            needs_download = False
            logger.info(f"Checksum match on {cache_path}")
        else:
            logger.info(f"Warning: Hash mismatch from cached data file {cache_path}")
    if needs_download:
        cache_path.parent.mkdir(parents=True, exist_ok=True)
        response = requests.get(options.repo + "/" + href)
        if response.status_code != 200:
            sys.exit("Error fetching file; Status code " + str(response.status_code))
        cache_path.write_bytes(response.content)
    return cache_path
# Fetch the repository index (repomd.xml), which names the real metadata
# files and carries their checksums.
repodata_response = requests.get(options.repo + "/" + "repodata/repomd.xml")
if repodata_response.status_code != 200:
    # BUG FIX: the original referenced an undefined name "repodata" here,
    # raising NameError on any non-200 response instead of the message.
    sys.exit(f"Error: repomd.xml returned status {repodata_response.status_code}")
checksum, href = extract_primary_filelist(repodata_response.text)
logger.info(f"Fetching primary file list {href}")
primary_file = get_repo_file_cached(href, options.cache / href, checksum)

# The primary list is gzip-compressed XML describing every package in the repo.
primary = ElementTree.fromstring(
    gzip.decompress(primary_file.read_bytes()).decode("utf-8")
)
# All <package> entries, regardless of name/arch; filtered further below.
all_packages = primary.findall(".//{http://linux.duke.edu/metadata/common}package")
def split_ver(version):
    """Split a dotted version string into comparable parts.

    Each "."-separated component is converted to int when numeric so that
    e.g. "1.10" sorts after "1.9"; non-numeric parts (like "el7") stay as
    strings.
    """
    parts = []
    for part in version.split("."):
        try:
            parts.append(int(part))
        except ValueError:
            parts.append(part)
    return parts


def yum_version_sort_key(package):
    """Build a sort key ``(epoch, *ver parts, *rel parts)`` for a package element.

    Example version nodes:
        <version epoch="0" ver="1.10.0" rel="1488387854.el7"/>
        <version epoch="0" ver="1.25.1" rel="1531323952.el7"/>
    """
    version = package.find("{http://linux.duke.edu/metadata/common}version")
    # BUG FIX: the original defaulted epoch/rel to the int 0 — int has no
    # .split(), so a missing "rel" crashed split_ver, and a string epoch
    # compared lexically ("10" < "9") against other packages.
    epoch = int(version.attrib.get("epoch", "0"))
    return tuple(
        [epoch]
        + split_ver(version.attrib["ver"])
        + split_ver(version.attrib.get("rel", "0"))
    )
_NS = "{http://linux.duke.edu/metadata/common}"


def _matches(package):
    # Keep only x86_64 builds of the requested package.
    name = package.find(_NS + "name").text.strip()
    arch = package.find(_NS + "arch").text.strip()
    return name == options.package and arch == "x86_64"


# Newest version first, using the yum-style version key.
candidates = sorted(
    filter(_matches, all_packages), key=yum_version_sort_key, reverse=True
)
if not candidates:
    sys.exit(f"Error: Could not find package {options.package} in repo")
logger.info(f"{len(candidates)} candidates found for package {options.package}.")

best_candidate = candidates[0]
version_node = best_candidate.find(_NS + "version")
version = version_node.attrib["ver"]
logger.info(f"Latest version: {version}")
location_href = best_candidate.find(".//" + _NS + "location").attrib["href"]
repo_url_location = options.repo + "/" + location_href

# The version and URL on stdout are the script's machine-readable output.
print(version, repo_url_location)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment