Last active
May 12, 2016 19:18
-
-
Save wolsen/0c8414a1b6cbb16e992e to your computer and use it in GitHub Desktop.
rmadison + cloud-archive madison
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# | |
# Provides a rather basic version of rmadison (or dak ls if you prefer) | |
# for the Ubuntu cloud-archive. | |
# | |
# This script works in the following manner: | |
# 1) It will show the rmadison output for the selected package to show | |
# the values of packages within the main ubuntu archives | |
# 2) It will show similar output for the selected package in the ubuntu | |
# cloud archives. | |
# | |
from lxml import etree | |
import collections | |
import gzip | |
import logging as log | |
import os.path | |
import shutil | |
import subprocess | |
import sys | |
import tempfile | |
import urllib2 | |
# Defines the default Ubuntu cloud-archive repository URL.
UCA_DEB_REPO_URL = "http://ubuntu-cloud.archive.canonical.com/ubuntu/dists" | |
# This is where the Sources.gz files will be downloaded to. | |
# In the future, it'd be better to have these cached and know - but | |
# I'll /assume/ bandwidth is decent enough that it's not a super big issue.
working_dir = tempfile.mkdtemp() | |
def get_files_in_remote_url(relative_path=""):
    """
    Returns a list of files found in the remote URL specified.

    This is heavily dependent upon being able to browse the folders over
    http as the ubuntu cloud archives are. If that changes, then this
    script needs to be revisited.

    :param relative_path: a path relative to the UCA_DEB_REPO_URL
    :return: list of files or folders found in the remote url, with any
             trailing '/' removed from each name.
    """
    url = "%s/%s" % (UCA_DEB_REPO_URL, relative_path)
    response = urllib2.urlopen(url)
    try:
        root = etree.parse(response, etree.HTMLParser())
    finally:
        # Release the HTTP connection even when the listing fails to parse.
        response.close()

    # Content available here should be directory listing, which is presented
    # as a table, with each file in its own row. Use xpath expression to find
    # the values of the text within the table columns.
    files = []
    for name in root.xpath('//*/td/*/text()'):
        # Skip the canonical parent directory nav link
        if name == 'Parent Directory':
            continue
        if name.endswith('/'):
            name = name[:-1]
        files.append(name)

    log.debug("Found files at %s: %s", url, files)
    return files
def get_available_dists():
    """
    Lists the distributions published in the cloud archive.

    Every folder found at the top of the repository maps to a distribution.
    Folders ending in '-proposed' are left out for now; whether to include
    them should probably become a command line option.

    :return: list of distribution folder names
    """
    available = []
    for name in get_files_in_remote_url():
        if not name.endswith('-proposed'):
            available.append(name)
        else:
            log.debug('Skipping folder %s' % name)
    return available
def get_openstack_releases(dist):
    """
    Looks up the OpenStack releases published for a distribution.

    The releases are the entries listed in the distribution's folder of
    the remote repository.

    :param dist: the distribution to retrieve openstack releases for.
    :return: list of OpenStack release names found for the distribution.
    """
    releases = get_files_in_remote_url(relative_path=dist)
    log.debug("Found OpenStack releases for dist %s: %s", dist, releases)
    return releases
class Sources(object):
    """
    The Sources.gz index of one distribution / OpenStack release pair.

    The file is downloaded into the module's working directory as soon as
    the object is created.
    """

    def __init__(self, dist, os_release):
        """
        Creates a new Sources which represents the Sources.gz file
        for the source folder in the specified distro and OpenStack
        release.

        :param dist: the Ubuntu distribution
        :param os_release: the OpenStack release
        """
        fname = '%s_%s_Sources.gz' % (dist, os_release)
        self.dist = dist
        self.os_release = os_release
        self.fname = os.path.join(working_dir, fname)
        self.download()

    def download(self):
        """
        Downloads the file to parse Source information from.

        The remote Sources.gz is streamed into self.fname, replacing any
        previous content of that file.
        """
        url = ("%(base_url)s/%(dist)s/%(os_release)s/main/source/Sources.gz" %
               {'base_url': UCA_DEB_REPO_URL,
                'dist': self.dist,
                'os_release': self.os_release})
        response = urllib2.urlopen(url)
        try:
            with open(self.fname, 'wb') as f:
                # Copy in chunks instead of holding the whole index in memory.
                shutil.copyfileobj(response, f)
        finally:
            response.close()

    def get_sources(self):
        """
        A generator returning the Source package descriptors
        found in the downloaded Sources.gz file (self.fname).

        Stanzas are separated by blank lines. Runs of blank lines do not
        produce empty descriptors, and a final stanza that is not followed
        by a blank line is still returned.
        """
        lines = []
        with gzip.open(self.fname) as stream:
            for line in stream:
                if line.strip():
                    lines.append(line)
                elif lines:
                    # Empty line is the end of the source package stanza
                    yield Source.parse(''.join(lines))
                    lines = []
        if lines:
            # The file ended without a terminating blank line.
            yield Source.parse(''.join(lines))
class Source(dict):
    """
    One source package stanza from a Sources file, stored as a mapping of
    field name to field value, with shortcuts for the commonly used fields.
    """

    @property
    def package(self):
        """The value of the 'Package' field."""
        return self['Package']

    @property
    def binaries(self):
        """The names in the comma separated 'Binary' field, as a list."""
        names = (name.strip() for name in self['Binary'].split(','))
        return [name for name in names if name]

    @property
    def version(self):
        """The value of the 'Version' field."""
        return self['Version']

    @property
    def architecture(self):
        """The value of the 'Architecture' field."""
        return self['Architecture']

    @classmethod
    def parse(cls, text):
        """
        Parses basic content from the Sources.gz file in a debian archive for
        retrieving basic information.

        Each 'Name: value' line starts a field. A line starting with a space
        or a tab continues the previous field and is appended to its value
        as-is. Blank lines, lines without a ':' and continuation lines that
        do not follow a field are ignored.

        :param text: the text to parse
        :return: the parsed stanza as an instance of this class
        """
        src = cls()
        key = None
        for line in text.split('\n'):
            if not line.strip():
                continue
            if line[0] in ' \t':
                # Continuation from the previous line
                if key is not None:
                    src[key] += line
                continue
            # Split on the first ':' only, so that values which themselves
            # contain ':' (epochs in versions, URLs, ...) are kept intact.
            name, sep, value = line.partition(':')
            if not sep:
                # Not a field line; also stop attaching continuations to
                # whichever field came before it.
                key = None
                continue
            key = name
            src[key] = value.strip()
        return src
def print_table(table):
    """
    Prints the table in a nice formatted output.

    Every cell is left-aligned and padded to the width of the longest cell
    in its column; columns are separated by ' | ' and each line is prefixed
    with a single space. Rows may have different lengths.

    :param table: a table in a traditional representation
                  (a list of lists) whose cells are strings
    """
    # Measure each column over every row that has it, so that a row longer
    # than the shortest one still finds a width for all of its cells.
    widths = []
    for row in table:
        for i, cell in enumerate(row):
            if i < len(widths):
                widths[i] = max(widths[i], len(cell))
            else:
                widths.append(len(cell))

    for row in table:
        # str.ljust pads without building a format spec from the width, so
        # a column made only of empty cells is harmless.
        out = " | ".join(cell.ljust(widths[i]) for i, cell in enumerate(row))
        print(" " + out)
def do_rmadison_search(search_for):
    """
    Runs the search for the packages using rmadison.

    :param search_for: the package names to pass to rmadison as arguments
    :return: a list of rows, one per non-blank line of rmadison output,
             each row being the list of that line's columns with the
             surrounding whitespace removed. The list is empty when
             rmadison cannot be run or exits with an error, in which case
             the error is logged.
    """
    cmd = ['rmadison'] + list(search_for)
    try:
        output = subprocess.check_output(cmd, universal_newlines=True)
    except (OSError, subprocess.CalledProcessError) as e:
        # OSError: rmadison could not be started; CalledProcessError: it
        # ran but returned a non-zero exit status.
        log.error("Error querying rmadison: %s", str(e))
        return []

    # Blank lines are dropped wherever they occur, rather than assuming that
    # exactly the last line is empty.
    return [[col.strip() for col in line.split(' |')]
            for line in output.splitlines() if line.strip()]
def do_search():
    """
    Searches the cloud archive and rmadison for the requested packages.

    The package names are taken from the command line arguments. A row is
    collected for every cloud archive source stanza whose source package
    name, or one of whose binary package names, equals a requested name.
    The rmadison rows are appended and the whole table is printed sorted
    by its first column.
    """
    # Resolve every dist to its OpenStack releases before downloading any
    # of the Sources.gz files.
    releases_by_dist = dict((name, get_openstack_releases(name))
                            for name in get_available_dists())

    wanted = sys.argv[1:]
    rows = []
    for dist, releases in releases_by_dist.items():
        for release in releases:
            for src in Sources(dist, release).get_sources():
                for name in wanted:
                    is_source = src.package == name
                    if not is_source and name not in src.binaries:
                        # Not a match, continue
                        continue
                    kind = 'source' if is_source else src.architecture
                    rows.append([name,
                                 src.version,
                                 'cloud-archive:%s' % release,
                                 kind])

    rows = rows + do_rmadison_search(wanted)
    print_table(sorted(rows, key=lambda entry: entry[0]))
if __name__ == '__main__':
    # The working directory is created when the module is loaded, so it is
    # removed on every way out of the script, including the usage error.
    try:
        if len(sys.argv) < 2:
            print("E: need at least one package name as an argument.")
            # sys.exit rather than the exit() helper, which is only
            # installed by the site module for interactive use.
            sys.exit(1)
        do_search()
    finally:
        shutil.rmtree(working_dir)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment