Skip to content

Instantly share code, notes, and snippets.

@JacobCallahan
Created March 14, 2018 13:55
Show Gist options
  • Save JacobCallahan/0132e99c7b00e695643f06b84722bfde to your computer and use it in GitHub Desktop.
Save JacobCallahan/0132e99c7b00e695643f06b84722bfde to your computer and use it in GitHub Desktop.
asynchronously gather bugzilla information
import os
import asyncio
import sys
try:
    import bugzilla
except ImportError:
    # The bugzilla package is required by everything below, so stop here
    # with an install hint instead of failing later with a NameError.
    print('Install dependencies with `pip install -r requirements.txt`.')
    sys.exit(-1)
# Template for a Bugzilla advanced-search URL. It has two positional
# placeholders: the first fills ``bug_status`` and the second fills ``v2``,
# the value compared (``o2=equals``) against ``flagtypes.name``.
URL = 'https://bugzilla.redhat.com/buglist.cgi?' + '&'.join((
    'bug_status={}',
    'classification=Red%20Hat',
    # f1/o1/v1: component must not contain the substring "Doc".
    'f1=component',
    'o1=notsubstring',
    'v1=Doc',
    # f2/o2/v2: flag name must equal the second placeholder.
    'f2=flagtypes.name',
    'o2=equals',
    'v2={}',
    'product=Red%20Hat%20Satellite%206',
    'query_format=advanced',
))
# Flags we care about; each one is queried against every entry in STATUSES.
# Currently disabled alternatives: 'sat-6.4.0+', 'sat-backlog?'
FLAGS = ['sat-6.4.0?', 'sat-backlog+']
# Bug statuses to count for every flag in FLAGS.
# Currently disabled alternatives: 'MODIFIED', 'ON_DEV', 'ON_QA'
STATUSES = ['NEW', 'ASSIGNED', 'POST']
# XML-RPC endpoint the Bugzilla client connects to.
BUGZILLA_URL = "https://bugzilla.redhat.com/xmlrpc.cgi"

# Credentials come from the environment; each is None when its variable
# is not set.
user = os.environ.get('BUGZILLA_USER_NAME')
password = os.environ.get('BUGZILLA_USER_PASSWORD')

# NOTE: this rebinds the name ``bugzilla`` from the imported module to the
# connection object. bugzilla_query() uses that global, so the name is kept.
bugzilla = bugzilla.Bugzilla(url=BUGZILLA_URL)

# Log in once at import time; whatever login() returns is kept in ``session``.
session = bugzilla.login(user=user, password=password)
def table_format(in_data):
    """Merge per-query results into one table keyed by flag.

    Args:
        in_data: iterable of ``(flag, {status: result})`` pairs. The same
            flag may appear several times, typically once per status.

    Returns:
        A dict mapping each flag to one ``{status: result}`` dict that
        combines every pair seen for that flag. When a status repeats for
        a flag, the later pair wins.

    The dicts inside ``in_data`` are copied, never modified.
    """
    compiled = {}
    for flag, data in in_data:
        # setdefault gives each flag its own fresh dict, so merging never
        # writes into a dict that belongs to the caller.
        compiled.setdefault(flag, {}).update(data)
    return compiled
def bugzilla_query(status, flag):
    """Return how many bugs match one status/flag combination.

    Args:
        status: value substituted into the ``bug_status`` field of ``URL``.
        flag: value substituted into the ``v2`` (flag name) field of ``URL``.

    Returns:
        ``len()`` of whatever the module-level ``bugzilla`` connection
        returns for the query built from the formatted URL.

    This call blocks on the Bugzilla connection; the async code in this
    file runs it through an executor.
    """
    # Local import so this function does not depend on a file-level change.
    from urllib.parse import quote

    # Flag names end in '?' or '+'. Left raw, a '+' in a query string is
    # read as a space by standard query-string parsing, so the search would
    # look for the wrong flag. Percent-encode both values before formatting.
    search_url = URL.format(quote(status, safe=''), quote(flag, safe=''))
    matches = bugzilla.query(bugzilla.url_to_query(search_url))
    return len(matches)
async def _mostly_async_get(loop, status, flag):
    """Await the blocking ``bugzilla_query`` via the loop's executor.

    Args:
        loop: event loop whose default executor (``None``) runs the query.
        status: status passed through to ``bugzilla_query``.
        flag: flag passed through to ``bugzilla_query``.

    Returns:
        A ``(flag, {status: count})`` pair, where ``count`` is the value
        ``bugzilla_query`` returned.
    """
    pending = loop.run_in_executor(None, bugzilla_query, status, flag)
    count = await pending
    return flag, {status: count}
async def _async_loop(loop):
    """Query every flag/status combination concurrently and print the table.

    One task is scheduled per ``(flag, status)`` pair, iterating FLAGS in
    the outer position and STATUSES in the inner one. Once all tasks have
    finished, their ``(flag, {status: count})`` results are merged with
    ``table_format`` and printed.

    Args:
        loop: event loop handed to ``_mostly_async_get`` for its executor
            calls.
    """
    tasks = [
        asyncio.ensure_future(_mostly_async_get(loop, status, flag))
        for flag in FLAGS
        for status in STATUSES
    ]
    # gather() returns results in the order the tasks were passed in.
    results = await asyncio.gather(*tasks)
    print(table_format(results))
def main():
    """Create an event loop, run all the queries on it, and close it.

    The loop is created explicitly and installed as the current loop
    rather than obtained from ``asyncio.get_event_loop()``, whose implicit
    loop creation is deprecated in newer Python versions. It is closed in
    a ``finally`` clause so it is released even when a query raises.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(_async_loop(loop))
    finally:
        loop.close()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment