Created
July 14, 2021 21:10
-
-
Save bentito/7a211093092bfcc34773627a0a54152e to your computer and use it in GitHub Desktop.
Script for Python 3.z to check indexes for problematic comma-separated values
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Requires | |
# OPM, operator-sdk v1.8.0 or higher | |
# grpcurl, podman or docker, skopeo | |
import os | |
import json | |
import sqlite3 | |
import subprocess | |
CONTAINER_TOOL = "docker" | |
def run_cmd(cmd, params=None): | |
if not params: | |
params = {} | |
params.setdefault('universal_newlines', True) | |
params.setdefault('encoding', 'utf-8') | |
params.setdefault('stderr', subprocess.PIPE) | |
params.setdefault('stdout', subprocess.PIPE) | |
response = subprocess.run(cmd, **params) | |
return response.stdout | |
def _copy_files_from_image(image, src_path, dest_path): | |
run_cmd([CONTAINER_TOOL, 'pull', image]) | |
container_command = 'unused' | |
container_id = run_cmd([CONTAINER_TOOL, 'create', image, container_command]).strip() | |
run_cmd([CONTAINER_TOOL, 'cp', f'{container_id}:{src_path}', dest_path]) | |
def skopeo_inspect(*args): | |
cmd = ['skopeo', 'inspect'] + list(args) | |
output = run_cmd(cmd) | |
return json.loads(output) | |
def get_image_label(pull_spec, label): | |
""" | |
Get the labels from the image. | |
:param list<str> labels: the labels to get | |
:return: the dictionary of the labels on the image | |
:rtype: dict | |
""" | |
if pull_spec.startswith('docker://'): | |
full_pull_spec = pull_spec | |
else: | |
full_pull_spec = f'docker://{pull_spec}' | |
return skopeo_inspect(full_pull_spec, '--config', '--override-os', 'linux').get('config', {}).get('Labels', {}).get( | |
label) | |
def _get_index_database(from_index, base_dir): | |
db_path = get_image_label(from_index, 'operators.operatorframework.io.index.database.v1') | |
_copy_files_from_image(from_index, db_path, base_dir) | |
return os.path.join(base_dir, os.path.basename(db_path)) | |
con.close() | |
def _add_property_to_index(db_path, property): | |
""" | |
Add a property to the index | |
:param str base_dir: the base directory where the database and index.Dockerfile are created | |
:param str bundle: the bundle path | |
:param dict property: a dict representing a property to be added to the index.db | |
:raises IIBError: if the sql insertion fails | |
""" | |
insert = "INSERT INTO properties (type, value, operatorbundle_name, operatorbundle_version, operatorbundle_path) \ | |
VALUES (?, ?, ?, ?, ?);" | |
con = sqlite3.connect(db_path) | |
# Insert property | |
con.execute(insert, ( | |
property["type"], | |
property["value"], | |
property["operatorbundle_name"], | |
property["operatorbundle_version"], | |
property["operatorbundle_path"])) | |
con.commit() | |
con.close() | |
def _get_head_bundles_from_index(db_path): | |
con = sqlite3.connect(db_path) | |
cur = con.cursor() | |
select = """\ | |
SELECT | |
e.package_name, | |
e.channel_name, | |
e.operatorbundle_name, | |
e.depth, | |
b.version, | |
b.bundlepath, | |
b.skips, | |
r.operatorbundle_name, | |
c.head_operatorbundle_name, | |
p.default_channel | |
FROM | |
channel_entry e | |
LEFT JOIN | |
channel_entry r ON r.entry_id = e.replaces | |
LEFT JOIN | |
operatorbundle b ON e.operatorbundle_name = b.name | |
LEFT JOIN | |
channel c ON e.package_name = c.package_name AND e.channel_name = c.name | |
LEFT JOIN | |
package p on c.package_name = p.name;""" | |
cur.execute(select) | |
rows = cur.fetchall() | |
only_head_rows = [] | |
for row in rows: | |
if row[2] == row[8]: | |
only_head_rows.append(row) | |
con.commit() | |
con.close() | |
return only_head_rows | |
def check_comma_sep_usage(from_index, base_dir): | |
db_path = _get_index_database(from_index, base_dir) | |
head_bundles = _get_head_bundles_from_index(db_path) | |
pkg_bundle_labels = [] | |
for head_bundle in head_bundles: | |
pkg_bundle_labels.append({'pkg_name': head_bundle[0], 'bundle_name': head_bundle[2], 'versions_label': get_image_label(head_bundle[5], "com.redhat.openshift.versions")}) | |
count = 0 | |
for pkg_bundle_label in pkg_bundle_labels: | |
versions = pkg_bundle_label['versions_label'] | |
if "," in versions: | |
cleaned_versions = ''.join([c for c in versions if c not in [' ', '\t', '\n']]) | |
if cleaned_versions != "v4.5,v4.6": | |
if cleaned_versions != "v4.6,v4.5": | |
print (pkg_bundle_label) | |
count = count + 1 | |
print("{count} affected bundles".format(**locals())) | |
if __name__ == '__main__': | |
# Change this variable for other indexes | |
INDEX_IMAGE = "registry.redhat.io/redhat/redhat-operator-index:v4.8" | |
BASE_DIR = "./" | |
check_comma_sep_usage(INDEX_IMAGE, BASE_DIR) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment