@ssato
Created July 30, 2014 14:14
#
# repodata.py - Parse repodata/*.xml.gz in yum repositories.
#
# Copyright (C) 2012 Satoru SATOH <[email protected]>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from rpmkit.memoize import memoize
from rpmkit.utils import concat, uniq, uconcat
from itertools import repeat, izip
from logging import DEBUG, INFO
from operator import itemgetter
import xml.etree.cElementTree as ET
import gzip
import logging
import optparse
import os.path
import os
import re
import sys
import textwrap
try:
    from hashlib import md5  # python 2.5+
except ImportError:
    from md5 import md5

try:
    import json
except ImportError:
    import simplejson as json
REPODATA_SYS_TOPDIR = "/var/lib/rpmkit/repodata"
REPODATA_USER_TOPDIR = os.path.expanduser("~/.cache/rpmkit/repodata")
REPODATA_TOPDIRS = [REPODATA_SYS_TOPDIR, REPODATA_USER_TOPDIR]
REPODATA_NAMES = ("groups", "filelists", "requires", "provides", "packages")
REPODATA_XMLS = (REPODATA_COMPS, REPODATA_FILELISTS,
                 REPODATA_PRIMARY) = ("comps", "filelists", "primary")
_SPECIAL_RE = re.compile(r"^(?:config|rpmlib|kernel|rtld)([^)]+)")
def _true(*args):
    return True
def _tree_from_xml(xmlfile):
    return ET.parse(gzip.open(xmlfile)
                    if xmlfile.endswith(".gz") else open(xmlfile))
def select_topdir():
    return REPODATA_SYS_TOPDIR if os.geteuid() == 0 else REPODATA_USER_TOPDIR
def _find_xml_files_g(topdir="/var/cache/yum", rtype=REPODATA_COMPS):
    """
    Find {comps,filelists,primary}.xml under `topdir` and yield its path.
    """
    for root, dirs, files in os.walk(topdir):
        for f in files:
            if rtype in f and (f.endswith(".xml.gz") or f.endswith(".xml")):
                yield os.path.join(root, f)
def _find_xml_file(topdir, rtype=REPODATA_COMPS):
    fs = [f for f in _find_xml_files_g(topdir, rtype)]
    assert fs, "No %s.xml[.gz] found under %s" % (rtype, topdir)

    return fs[0]
find_xml_file = memoize(_find_xml_file)
def _is_special(x):
    """
    'special' means that `x` is neither a file, a virtual package nor a
    package name, but something like 'config(setup)',
    'rpmlib(PayloadFilesHavePrefix)' or 'kernel(rhel5_net_netlink_ga)'.

    :param x: filename or file path or provides or something like rpmlib(...)

    >>> _is_special('config(samba)')
    True
    >>> _is_special('rpmlib(CompressedFileNames)')
    True
    >>> _is_special('rtld(GNU_HASH)')
    True
    """
    return _SPECIAL_RE.match(x) is not None
def _strip_x(x):
    """
    >>> _strip_x('libgstbase-0.10.so.0()(64bit)')
    'libgstbase-0.10.so.0'
    >>> _strip_x('libc.so.6(GLIBC_2.2.5)(64bit)')
    'libc.so.6'
    """
    return x[:x.find('(')] if '(' in x and x.endswith(')') else x
def get_package_groups(xmlfile, byid=True):
    """
    Parse given comps file (`xmlfile`) and return a list of package group
    and packages pairs.

    :param xmlfile: comps xml file path
    :param byid: Identify package groups by ID if True, else by name
    :return: [(group_id_or_name, [package_names])]
    """
    gk = "./group"
    kk = "./id" if byid else "./name"
    pk = "./packagelist/packagereq/[@type='default']"

    gps = ((g.find(kk).text, [p.text for p in g.findall(pk)]) for g
           in _tree_from_xml(xmlfile).findall(gk))

    # Filter out groups having no packages, as such groups are useless:
    return [(g, ps) for g, ps in gps if ps]
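# Illustrative comps fragment of the shape get_package_groups() walks over
# (hypothetical data, not taken from any real repository):
#
#   <comps>
#     <group>
#       <id>core</id>
#       <name>Core</name>
#       <packagelist>
#         <packagereq type="default">bash</packagereq>
#       </packagelist>
#     </group>
#   </comps>
#
# With byid=True the fragment above would yield [('core', ['bash'])];
# with byid=False it would yield [('Core', ['bash'])].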
def get_package_requires_and_provides(xmlfile, include_specials=False):
    """
    Parse given primary.xml `xmlfile` and return a list of package, requires
    and provides tuples, [(package, [requires], [provides])].

    :param xmlfile: primary.xml file path
    :param include_specials: Include 'special' objects (which are neither
        files nor packages) instead of ignoring them
    :return: [(package, [requires], [provides])]
    """
    # We need to take care of namespaces in the elementtree library.
    # SEE ALSO: http://effbot.org/zone/element-namespaces.htm
    ns0 = "http://linux.duke.edu/metadata/common"
    ns1 = "http://linux.duke.edu/metadata/rpm"

    pkk = "./{%s}package" % ns0  # [package]
    pnk = "./{%s}name" % ns0  # package name
    rqk = ".//{%s}requires/{%s}entry" % (ns1, ns1)  # [requires]
    prk = ".//{%s}provides/{%s}entry" % (ns1, ns1)  # [provides]

    pred = lambda y: include_specials or not _is_special(y.get("name"))
    name = lambda z: _strip_x(z.get("name"))

    return [(p.find(pnk).text,
             uniq([name(x) for x in p.findall(rqk) if pred(x)]),
             uniq([name(y) for y in p.findall(prk) if pred(y)])) for p
            in _tree_from_xml(xmlfile).findall(pkk)]
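# Illustrative primary.xml fragment (hypothetical data) showing the namespaced
# elements the XPath expressions above select; entry names go through
# _strip_x(), so decorations like '()(64bit)' are dropped:
#
#   <metadata xmlns="http://linux.duke.edu/metadata/common"
#             xmlns:rpm="http://linux.duke.edu/metadata/rpm">
#     <package type="rpm">
#       <name>bash</name>
#       <format>
#         <rpm:requires>
#           <rpm:entry name="libc.so.6()(64bit)"/>
#         </rpm:requires>
#         <rpm:provides>
#           <rpm:entry name="bash"/>
#         </rpm:provides>
#       </format>
#     </package>
#   </metadata>
#
# get_package_requires_and_provides() would turn this into
# [('bash', ['libc.so.6'], ['bash'])].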
def get_package_files(xmlfile, packages=[]):
    """
    Parse given filelists.xml `xmlfile` and return a flattened list of
    package and file pairs.

    :param xmlfile: filelists.xml file path
    :param packages: Optional list of package names to restrict the result to
    :return: [(package, file)]
    """
    ns = "http://linux.duke.edu/metadata/filelists"
    pk = "./{%s}package" % ns  # package
    fk = "./{%s}file" % ns  # file

    pred = (lambda p: p in packages) if packages else (lambda _: True)
    pfs = ((p.get("name"), uniq(x.text for x in p.findall(fk))) for p
           in _tree_from_xml(xmlfile).findall(pk))

    return concat((izip(repeat(p), fs) for p, fs in pfs if fs and pred(p)))
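# Illustrative filelists.xml fragment (hypothetical data); get_package_files()
# flattens it into (package, file) pairs rather than (package, [files]):
#
#   <filelists xmlns="http://linux.duke.edu/metadata/filelists">
#     <package name="bash" arch="x86_64">
#       <file>/bin/bash</file>
#       <file>/bin/sh</file>
#     </package>
#   </filelists>
#
# => [('bash', '/bin/bash'), ('bash', '/bin/sh')]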
def _find_package_from_filelists(x, filelists, packages, exact=True):
    """
    :param exact: Try exact match if True
    """
    pred = (lambda x, f: x == f) if exact else (lambda x, f: f.endswith(x))
    return uniq((p for p, f in filelists if pred(x, f) and p in packages))


def _find_package_from_provides(x, provides, packages):
    return uniq((p for p, pr in provides if x == pr and p in packages))
# find_package_from_filelists = memoize(_find_package_from_filelists)
# find_package_from_provides = memoize(_find_package_from_provides)
find_package_from_filelists = _find_package_from_filelists
find_package_from_provides = _find_package_from_provides
def _find_providing_packages(x, provides, filelists, packages):
    """
    :param x: filename or file path or provides or something like rpmlib(...)
    """
    if x.startswith("/"):  # file path
        ps = find_package_from_filelists(x, filelists, packages)
        if ps:
            logging.debug("Packages provide %s (filelists): %s" % (x, ps))
            return ps
        else:
            # There are cases where x is not in filelists; '/usr/sbin/sendmail'
            # and '/usr/bin/lp' for example. These should be found in provides.
            ps = find_package_from_provides(x, provides, packages)
            if ps:
                logging.debug("Packages provide %s (provides): %s" % (x, ps))
                return ps
            else:
                logging.debug("No package provides " + x)
                return [x]
    else:
        # 1. Try exact match in packages:
        ps = [p for p in packages if x == p]
        if ps:
            logging.debug("It's a package (packages): %s" % x)
            return ps

        # 2. Try exact match in provides:
        ps = find_package_from_provides(x, provides, packages)
        if ps:
            logging.debug("Packages provide %s (provides): %s" % (x, ps))
            return ps

        # 3. Try fuzzy (not exact) match in filelists:
        ps = find_package_from_filelists(x, filelists, packages, False)
        if not ps:
            logging.debug("No package provides " + x)
            return [x]

        logging.debug(
            "Packages provide %s (filelists, fuzzy match): %s" % (x, ps)
        )
        return ps
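# Resolution walkthrough with hypothetical data:
#
#   provides  = [('bash', 'bash'), ('bash', '/bin/sh')]
#   filelists = [('bash', '/bin/bash')]
#   packages  = ['bash']
#
#   _find_providing_packages('/bin/bash', ...)  # hits filelists          -> ['bash']
#   _find_providing_packages('/bin/sh', ...)    # falls back to provides  -> ['bash']
#   _find_providing_packages('bash', ...)       # exact package name      -> ['bash']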
# It requires a lot of RAM if memoized, so it is disabled until I come up
# with a better implementation.
# find_providing_packages = memoize(_find_providing_packages)
find_providing_packages = _find_providing_packages
def init_repodata(repodir, packages=[], resolve=False):
    """
    :param repodir: Repository dir holding repodata/
    :param packages: Reference list of packages, e.g. installed RPM names
    :param resolve: Resolve requirements to package names if True
    """
    files = dict((t, find_xml_file(repodir, t)) for t in REPODATA_XMLS)

    groups = get_package_groups(files[REPODATA_COMPS])
    reqs_and_provs = get_package_requires_and_provides(files[REPODATA_PRIMARY])
    filelists = get_package_files(files[REPODATA_FILELISTS])

    if not packages:
        packages = uniq(p for p, _ in filelists)

    requires = concat(izip(repeat(p), rs) for p, rs
                      in [itemgetter(0, 1)(t) for t
                          in reqs_and_provs if t[0] in packages])
    provides = concat(izip(repeat(p), prs) for p, prs
                      in [itemgetter(0, 2)(t) for t
                          in reqs_and_provs if t[0] in packages])

    if resolve:
        requires = [(p, find_providing_packages(r, provides, filelists,
                                                packages)) for p, r
                    in requires]
        groups = [(g, find_all_requires(ps, requires, packages, []))
                  for g, ps in groups]

    return (groups, filelists, requires, provides, packages)
def path_to_id(p):
    return md5(p).hexdigest()


def _repooutdir(topdir, repodir):
    repoid = path_to_id(os.path.abspath(repodir.rstrip(os.path.sep)))
    return os.path.join(os.path.abspath(os.path.normpath(topdir)), repoid)
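# Cache layout sketch: each repository gets a subdirectory named after the
# md5 hex digest of its absolute path, e.g. (paths and digest are made up):
#
#   _repooutdir("/var/lib/rpmkit/repodata", "/srv/repos/rhel-6-x86_64/")
#   => "/var/lib/rpmkit/repodata/<md5 hex digest of the abspath>"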
def datapath(outdir, name="repodata.json"):
    return os.path.join(outdir, name)
def _find_requires(x, requires, packages, exceptions=[]):
    return uconcat([r for r in rs if r != x and r
                    in packages and r not in exceptions] for p, rs
                   in requires if p == x)
# find_requires = memoize(_find_requires)
find_requires = _find_requires
def find_all_requires(xs, requires, packages, acc):
    while True:
        rs = uconcat(find_requires(x, requires, packages, acc) for x in xs)
        if not rs:
            return sorted(acc)  # all requires found.

        xs = rs
        acc += rs
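# Example with hypothetical, already-resolved requires pairs:
#
#   requires = [('a', ['b']), ('b', ['c']), ('c', ['a'])]
#   find_all_requires(['a'], requires, ['a', 'b', 'c'], [])  # => ['a', 'b', 'c']
#
# Passing `acc` as the exceptions list to find_requires() keeps packages from
# being visited twice, so the loop terminates even on dependency cycles like
# the one above.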
def parse_and_dump_repodata(repodir, outdir=None):
    if not outdir:
        outdir = _repooutdir(select_topdir(), repodir)

    keys = REPODATA_NAMES
    data = dict(zip(keys, init_repodata(repodir, resolve=True)))

    if not os.path.exists(outdir):
        os.makedirs(outdir)

    json.dump(data, open(datapath(outdir), "w"))
def load_dumped_repodata(outdir=None, repodir=None):
    if not outdir:
        assert repodir is not None, \
            "repodir parameter needs to be set if outdir is None"
        outdir = _repooutdir(select_topdir(), repodir)

    datafile = datapath(outdir)
    if not os.path.exists(datafile):
        raise RuntimeError("Dumped data file not found: " + datafile)

    return json.load(open(datafile))
def show_xs(xs, head="# "):
    try:
        width = int(os.environ['COLUMNS'])
    except (KeyError, ValueError):
        width = 80

    width -= 2
    text = head + ", ".join(xs)

    for l in textwrap.wrap(text, width, subsequent_indent=' '):
        print l
# APIs:
def list_requires(data, package):
    """
    :param data: Data object load_dumped_repodata() returns
    :param package: Package name to list required packages of
    """
    return uconcat((rs for p, rs in data["requires"] if p == package))


def list_whatrequires(data, package):
    """
    :param data: Data object load_dumped_repodata() returns
    :param package: Package name to list packages requiring it
    """
    return uniq((p for p, rs in data["requires"] if package in rs))


def list_provides(data, package):
    """
    :param data: Data object load_dumped_repodata() returns
    :param package: Package name to list provides of
    """
    return [pr for p, pr in data["provides"] if p == package]


def list_files(data, package):
    """
    :param data: Data object load_dumped_repodata() returns
    :param package: Package name to list files of
    """
    return [f for p, f in data["filelists"] if p == package]
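# Programmatic usage sketch (paths below are hypothetical); initialize the
# cache once with parse_and_dump_repodata(), then query the loaded data:
#
#   parse_and_dump_repodata("/srv/repos/rhel-6-x86_64/Server")
#   data = load_dumped_repodata(repodir="/srv/repos/rhel-6-x86_64/Server")
#   list_whatrequires(data, "bash")  # packages which require bash
#   list_files(data, "bash")         # files contained in the bash package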
_COMMANDS = (_CMD_INIT, _CMD_SEARCH) = ("init", "search")
def option_parser():
    defaults = dict(datadir=None, requires=None, provides=None, files=None,
                    groups=False, whatrequires=None, outdir=None,
                    verbose=False)

    p = optparse.OptionParser("""\
%prog CMD [OPTION ...]
Commands:
  i[nit]                  Initialize repodata cache for given repodata/ dir
  s[earch] [PACKAGE ...]  Search packages/filelists/etc for given package[s]

Examples:
  # Initialize repodata cache for rhel-6-3-x86_64:
  %prog i -d /net/binaries/contents/RHEL/6/3/x86_64/default/Server \\
      -o /var/lib/rpmkit/repodata/rhel/6/3/x86_64/Server

  # Search for packages which require 'bash' in rhel-6-3-x86_64:
  %prog s -d /var/lib/rpmkit/repodata/rhel/6/3/x86_64/Server \\
      --whatrequires bash
""")
    p.set_defaults(**defaults)

    p.add_option("-d", "--datadir", help="Specify repodata dir")

    iog = optparse.OptionGroup(p, "'init' options")
    iog.add_option("-o", "--outdir", help="Output dir")
    p.add_option_group(iog)

    sog = optparse.OptionGroup(p, "'search' options")
    sog.add_option('', "--requires",
                   help="List packages on which this package depends")
    sog.add_option('', "--provides",
                   help="List capabilities which this package provides")
    sog.add_option('', "--files", help="List all the files in this package")
    sog.add_option('', "--groups", action="store_true", help="List groups")
    sog.add_option('', "--whatrequires",
                   help="List packages which depend on this package")
    p.add_option_group(sog)

    p.add_option("-v", "--verbose", action="store_true", help="Verbose mode")

    return p
def main():
    p = option_parser()
    (options, args) = p.parse_args()

    logging.getLogger().setLevel(DEBUG if options.verbose else INFO)

    if not args:
        p.print_usage()
        sys.exit(1)

    if args[0].startswith('i'):
        cmd = _CMD_INIT
    elif args[0].startswith('s'):
        cmd = _CMD_SEARCH
    else:
        p.print_usage()
        sys.exit(1)

    if not options.datadir:
        options.datadir = raw_input("Repodata dir? > ")

    if cmd == _CMD_INIT:
        parse_and_dump_repodata(options.datadir, options.outdir)
    else:
        data = load_dumped_repodata(options.datadir)

        if options.groups:
            for g, ps in data["groups"]:
                print "@" + g
                show_xs(ps, "# packages = ")
            sys.exit(0)

        if options.requires:
            f = list_requires
            p = options.requires
        elif options.whatrequires:
            f = list_whatrequires
            p = options.whatrequires
        elif options.provides:
            f = list_provides
            p = options.provides
        elif options.files:
            f = list_files
            p = options.files
        else:
            print >> sys.stderr, \
                "You must specify one of the options for the 'search' command"
            sys.exit(1)

        for r in f(data, p):
            print r


if __name__ == "__main__":
    main()
# vim:sw=4:ts=4:et: