Created
July 30, 2014 13:43
-
-
Save ssato/f674e9e4cf19a95e5d50 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# List errata with using RHN API. | |
# | |
# Copyright (C) 2012 Red Hat, Inc. | |
# Red Hat Author(s): Satoru SATOH <[email protected]> | |
# | |
# This program is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# | |
# This program is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# | |
# You should have received a copy of the GNU General Public License | |
# along with this program. If not, see <http://www.gnu.org/licenses/>. | |
# | |
from rpmkit.swapi import main as swmain | |
from rpmkit.utils import groupby_key | |
from operator import itemgetter | |
import calendar | |
import datetime | |
import logging | |
import optparse | |
import os | |
import os.path | |
import pprint | |
import re | |
import sys | |
try: | |
import json | |
except ImportError: | |
try: | |
import simplejson as json | |
except ImportError: | |
json = object() | |
def __dump(obj, fp, **kwargs): | |
pprint.pprint(obj, fp) | |
# Looks almost same and may become alternative. | |
setattr(json, "dump", __dump) | |
import matplotlib | |
matplotlib.use("Agg") | |
import matplotlib.pyplot as pyplot | |
_DATE_REG = re.compile(r'\d{4}-\d{2}-\d{2}(?:\s+(?:\d{2}:\d{2}:\d{2})?)?') | |
_TIME_RESOLUTIONS = (_DAY, _MONTH, _YEAR) = (0, 1, 2) | |
_TIME_RESOLUTION_S = dict(day=_DAY, month=_MONTH, year=_YEAR) | |
def _simple_fmt(key, es): | |
""" | |
:param key: key = (year [, month [, day]]) | |
e.g. "2012" (year), "2012-09" (month) and "2012-09-01" (day). | |
:param es: errata list :: [errata :: dict] | |
""" | |
return "# %s\n%s" % (str(key), '\n'.join(e["advisory"] for e in es)) | |
def _simple_dump(result, fp=sys.stdout): | |
""" | |
:param result: [(key, [errata])] | |
""" | |
for k, es in result: | |
print >> fp, _simple_fmt(k, es) | |
def _json_dump(result, fp): | |
return json.dump(result, fp, indent=2) | |
_FORMAT_TYPES = (_SIMPLE_FMT, _JSON_FMT) = ("simple", "json") | |
_FORMAT_MAP = dict(simple=_simple_fmt, json=_json_dump) | |
def __swapi(cmd): | |
""" | |
swapi (SpaceWalk API library) wrapper. | |
:param cmd: command args :: [str] | |
""" | |
return swmain(cmd)[0] | |
def validate_datetime(t, reg=_DATE_REG): | |
""" | |
:param t: String represents date and time, e.g. '2012-10-09 08:00:00' | |
FIXME: This is too naive implementation. | |
""" | |
return reg.match(t) is not None | |
def shorten(dic, prefix): | |
return dict((k.replace(prefix, '').lower(), v) for k, v in dic.iteritems()) | |
def list_errata(channel, start=None, end=None, offline=False): | |
""" | |
:param channel: Software channel label to list errata | |
:param start: Start date | |
:param end: End date. Error if `end` is given but `start` is not. | |
""" | |
if start: | |
assert validate_datetime(start), \ | |
"list_errata(): Invalid data for `start` parameter: '%s'" % start | |
if end is not None: | |
assert start is not None, \ | |
"list_errata(): `end` paramter requires `start` parameter is set" | |
assert validate_datetime(end), \ | |
"list_errata(): Invalid data for `end` parameter: '%s'" % end | |
args = [ | |
"--cacheonly" if offline else None, | |
"-A", ','.join(a for a in (channel, start, end) if a is not None), | |
"channel.software.listErrata" | |
] | |
return [shorten(e, "errata_") for e in | |
__swapi([a for a in args if a is not None])] | |
_ERRATA_TYPES_MAP = dict(SA="RHSA", BA="RHBA", EA="RHEA") | |
def classify_errata(errata): | |
"""Classify errata by its type, RH(SA|BA|EA). | |
:param errata: errata | |
>>> assert classify_errata(dict(advisory="RHSA-2012:1236")) == "RHSA" | |
>>> assert classify_errata(dict(advisory="RHBA-2012:1224")) == "RHBA" | |
>>> assert classify_errata(dict(advisory="RHEA-2012:0226")) == "RHEA" | |
""" | |
return _ERRATA_TYPES_MAP[errata["advisory"][2:4]] | |
def classify_errata_list(errata): | |
""" | |
Classify `errata` list by each type, RH(SA|BA|EA). | |
:param errata: [errata] | |
""" | |
return list(groupby_key(errata, classify_errata)) | |
def __keyfunc(resolution): | |
""" | |
>>> e = {"issue_date": "2012-09-03 13:00:00"} | |
>>> __keyfunc(_DAY)(e) | |
'2012-09-03' | |
>>> __keyfunc(_MONTH)(e) | |
'2012-09' | |
>>> __keyfunc(_YEAR)(e) | |
'2012' | |
""" | |
def f(errata): | |
try: | |
return errata["issue_date"].split()[0].rsplit('-', resolution)[0] | |
except (IndexError, TypeError): | |
raise RuntimeError("errata was '%s'" % str(errata)) | |
return f | |
def div_errata_list_by_time_resolution(es, resolution=_DAY): | |
""" | |
Divides given errata list (gotten by `list_errata` defined above) by given | |
time period (`resolution`), and returns list of list of errata. | |
:param es: Errata list | |
:param resolution: Time resolution | |
:return: [(resolution, [errata])] sorted by resolution | |
""" | |
return sorted((([int(i) for i in k.split('-')], v) for k, v | |
in groupby_key(es, __keyfunc(resolution))), | |
key=itemgetter(0)) | |
def barchart(title, xlabel, ylabel, dataset, output, | |
xtick_labels=(), ytick_labels=(), *args, **kwargs): | |
""" | |
:param title: Title of the chart | |
:param xlabel: Label of X axis of the chart | |
:param ylabel: Label of Y axis of the chart | |
:param dataset: 2D-array dataset :: [(x, y)] | |
:param output: Output filename | |
""" | |
logging.debug("dataset=" + str(dataset)) | |
xs = [x for x, _ in dataset] | |
ys = [y for _, y in dataset] | |
fig = pyplot.figure() | |
pyplot.title(title) | |
pyplot.xlabel(xlabel) | |
pyplot.ylabel(ylabel) | |
if xtick_labels: | |
pyplot.xticks( | |
range(len(xtick_labels)), xtick_labels, rotation='vertical' | |
) | |
if ytick_labels: | |
pyplot.yticks(range(len(ytick_labels)), ytick_labels) | |
pyplot.bar(xs, ys) | |
fig.savefig(output, format="png") | |
def __ymd_indices(key, vals): | |
""" | |
:param key: Grouping key for `errata` list | |
:param vals: [Key_value], e.g. ["2012-09", "2012-10", "2013-02"] | |
""" | |
if key == _YEAR: | |
for i in range(int(min(vals)), int(max(vals)) + 1): | |
yield str(i) | |
elif key == _MONTH: | |
def fmt(y, m): | |
return "%d-%d" % (y, m) | |
(y0, m0) = [int(i) for i in min(vals).split('-')] | |
(y1, m1) = [int(i) for i in max(vals).split('-')] | |
mend = m1 if y0 == y1 else 12 | |
for m in range(m0, mend + 1): # months in year `y0` | |
yield fmt(y0, m) | |
if y0 != y1: | |
for y in range(y0 + 1, y1 + 1): # rest of years | |
mend = m1 if y == y1 else 12 | |
for m in range(1, mend + 1): | |
yield fmt(y, m) | |
elif key == _DAY: | |
def fmt(y, m, d): | |
return "%d-%d-%d" % (y, m, d) | |
(y0, m0, d0) = [int(i) for i in min(vals).split('-')] | |
(y1, m1, d1) = [int(i) for i in max(vals).split('-')] | |
mend = m1 if y0 == y1 else 12 | |
for m in range(m0, mend + 1): # months in year `y0` | |
dend = d1 if y == y1 and m == mend else calendar.mdays[m] | |
for d in range(d0, dend + 1): | |
yield fmt(y0, m, d) | |
if y0 != y1: | |
for y in range(y0 + 1, y1 + 1): # rest of years | |
mend = m1 if y == y1 else 12 | |
for m in range(1, mend + 1): | |
dend = d1 if y == y1 and m == mend else calendar.mdays[m] | |
for d in range(1, dend + 1): | |
yield fmt(y, m, d) | |
def errata_barchart_by_key(errata, key, output): | |
""" | |
:param errata: [(key, [errata])] where key = (YY [,MM [,DD]]) | |
:param key: Grouping key for `errata` list | |
:param output: Output filepath | |
""" | |
def ymd2i(ymd): | |
if len(ymd) == 3: | |
return datetime.date(*ymd).toordinal() | |
elif len(ymd) == 2: | |
return datetime.date(ymd[0], ymd[1], 1).toordinal() | |
elif len(ymd) == 1: | |
return datetime.date(ymd[0], 1, 1).toordinal() | |
else: | |
raise RuntimeError("Wrong ymd: " + str(ymd)) | |
args = ( | |
"Number of errata by " + key, | |
"Time period", | |
"Number of errata", | |
[(ymd2i(k), len(es)) for k, es in errata], | |
output, | |
[str(k) for k, _es in errata], | |
) | |
barchart(*args) | |
def init_log(level): | |
lvl = [logging.DEBUG, logging.INFO, logging.WARN][level] | |
logging.basicConfig(format="[%(levelname)s] %(message)s", level=lvl) | |
def option_parser(): | |
p = optparse.OptionParser("%prog [OPTION ...] SW_CHANNEL_LABEL") | |
date = datetime.datetime.now().strftime("%Y%m%d") | |
defaults = dict(outdir="list-errata-" + date, resolution="day", | |
start=None, end=None, verbose=1, | |
format=_JSON_FMT, dumpcharts=False) | |
p.set_defaults(**defaults) | |
p.add_option("-o", "--outdir", help="Specify output dir [%default]") | |
p.add_option("-r", "--resolution", type="choice", | |
choices=_TIME_RESOLUTION_S.keys(), | |
help="Specify time resolution to group errata " | |
"list [%default]") | |
p.add_option("-s", "--start", | |
help="Specify start date to list errata from") | |
p.add_option("-e", "--end", help="Specify end date to list errata to") | |
p.add_option("-F", "--format", type="choice", choices=_FORMAT_MAP.keys(), | |
help="Specify format type for outputs [%default]") | |
p.add_option("-d", "--dumpcharts", action="store_true", | |
help="File path to dump errata count charts if given") | |
p.add_option("-D", "--debug", action="store_const", const=0, | |
dest="verbose", help="Debug mode") | |
p.add_option("-q", "--quiet", action="store_const", const=2, | |
dest="verbose", help="Quiet mode") | |
return p | |
def __errata_file(etype, outdir, ext=".dat"): | |
return os.path.join(outdir, etype.lower() + "-errata" + ext) | |
def main(argv=sys.argv): | |
p = option_parser() | |
(options, args) = p.parse_args(argv[1:]) | |
if not args: | |
p.print_help() | |
print >> sys.stderr, \ | |
"\nTry `swapi channel.listSoftwareChannels` to get sw channels" | |
sys.exit(0) | |
init_log(options.verbose) | |
channel = args[0] | |
resolution = _TIME_RESOLUTION_S.get(options.resolution, "day") | |
es = list_errata(channel, options.start, options.end) | |
es_by_types = classify_errata_list(es) | |
fmtr = _FORMAT_MAP.get(options.format, _JSON_FMT) | |
if not os.path.exists(options.outdir): | |
os.makedirs(options.outdir) | |
# all: | |
res = div_errata_list_by_time_resolution(es, resolution) | |
with open(__errata_file("all", options.outdir), 'w') as f: | |
fmtr(res, f) | |
if options.dumpcharts: | |
with open(__errata_file("all", options.outdir, ".png"), 'w') as f: | |
errata_barchart_by_key(res, options.resolution, f) | |
for etype, es in es_by_types: | |
res = div_errata_list_by_time_resolution(es, resolution) | |
with open(__errata_file(etype, options.outdir), 'w') as f: | |
fmtr(res, f) | |
if options.dumpcharts: | |
with open(__errata_file(etype, options.outdir, ".png"), 'w') as f: | |
errata_barchart_by_key(res, options.resolution, f) | |
if __name__ == '__main__': | |
main(sys.argv) | |
# vim:sw=4:ts=4:et: |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment