Created
April 2, 2026 14:18
-
-
Save agraul/a328f3e2be92948a2d2dc86af72c02f7 to your computer and use it in GitHub Desktop.
Python script to deduplicate rhnPackageName
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/python3 | |
| """Deduplicate rhnPackageName name entries | |
| Steps: | |
| 1. Update all references to refer to a single entry | |
| 2. Delete unreferenced entries | |
| 3. Add unique index to verify uniqueness and persist it | |
| Steps 1 and 2 are repeated for all duplicate name entries, then step 3 is executed. | |
| Requires psycopg >= 3.2 | |
| """ | |
| import argparse | |
| import logging | |
| import sys | |
| import textwrap | |
| import psycopg | |
| import psycopg.rows | |
| from psycopg import sql | |
| logger = logging.getLogger(__name__) | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format="%(asctime)s %(levelname)s %(message)s", | |
| stream=sys.stderr, | |
| ) | |
| RELS = ( | |
| ("rhnactionpackage", "name_id"), | |
| ("rhnactionpackageremovalfailure", "name_id"), | |
| ("rhnactionpackageremovalfailure", "suggested"), | |
| ("rhnchannelnewestpackage", "name_id"), | |
| ("rhnkickstartpackage", "package_name_id"), | |
| ("rhnlockedpackages", "name_id"), | |
| ("rhnpackage", "name_id"), | |
| ("rhnpackagenevra", "name_id"), | |
| ("rhnpackagesyncblacklist", "package_name_id"), | |
| ("rhnregtokenpackages", "name_id"), | |
| ("rhnserveractionverifymissing", "package_name_id"), | |
| ("rhnserveractionverifyresult", "package_name_id"), | |
| ("rhnserverpackage", "name_id"), | |
| ("rhnserverprofilepackage", "name_id"), | |
| ("rhntransactionpackage", "name_id"), | |
| ("rhnversioninfo", "name_id"), | |
| ("suseimageinfopackage", "name_id"), | |
| ("susepackagestate", "name_id"), | |
| ) | |
| def package_name_ids(conn: psycopg.Connection, name: str) -> list[int]: | |
| with conn.cursor(row_factory=psycopg.rows.scalar_row) as cur: | |
| query = ( | |
| "SELECT pn.id FROM rhnpackagename pn WHERE pn.name = %s ORDER BY pn.id ASC" | |
| ) | |
| logger.debug(query) | |
| return cur.execute(query, (name,)).fetchall() | |
| def update_references(conn: psycopg.Connection, pn_ids: list[int]): | |
| # nothing to update | |
| if len(pn_ids) <= 1: | |
| return | |
| first, rest = pn_ids[0], pn_ids[1:] | |
| with conn.cursor() as cur: | |
| for table, column in RELS: | |
| query = sql.SQL("UPDATE {tbl} SET {col} = %s WHERE {col} = ANY(%s)").format( | |
| tbl=sql.Identifier(table), col=sql.Identifier(column) | |
| ) | |
| logger.debug(query.as_string()) | |
| logger.debug("values: %s", (first, rest)) | |
| cur.execute(query, (first, rest)) | |
| def delete_duplicates(conn: psycopg.Connection, pn_ids: list[int]): | |
| # nothing to delete | |
| if len(pn_ids) <= 1: | |
| return | |
| rest = [(id_,) for id_ in pn_ids[1:]] | |
| with conn.cursor() as cur: | |
| query = "DELETE FROM rhnPackageName pn WHERE pn.id = %s" | |
| logger.debug(query) | |
| logger.debug("values: %s", rest) | |
| cur.executemany(query, rest) | |
| def re_index(conn: psycopg.Connection): | |
| with conn.cursor() as cur: | |
| try: | |
| query = "CREATE UNIQUE INDEX rhn_pn_name_uq ON rhnPackageName (name)" | |
| cur.execute(query) | |
| logger.info("UNIQUE INDEX for rhnPackageName (name) added.") | |
| except psycopg.errors.DuplicateTable: | |
| logger.info("UNIQUE INDEX for rhnPackageName (name) already exists") | |
| def argparser(): | |
| parser = argparse.ArgumentParser("dedup-rhnpackagename") | |
| parser.add_argument("packagenames", nargs="+") | |
| parser.add_argument( | |
| "--db-host", help="Database host (hostname or IP address)", required=True | |
| ) | |
| parser.add_argument("--db-port", help="Database port", default="5432") | |
| parser.add_argument( | |
| "--db-password", help="Password for database user spacewalk", required=True | |
| ) | |
| parser.add_argument("--db-name", help="Database name", default="susemanager") | |
| parser.add_argument("--db-user", help="Database user") | |
| parser.add_argument("-v", "--verbose", action="count", default=0) | |
| return parser | |
| def main(): | |
| parser = argparser() | |
| args = parser.parse_args() | |
| if args.verbose == 1: | |
| logger.setLevel(logging.DEBUG) | |
| elif args.verbose == 2: | |
| logger.setLevel(logging.DEBUG) | |
| logging.getLogger("psycopg").setLevel(logging.DEBUG) | |
| connstring = ( | |
| f"host={args.db_host} port={args.db_port} dbname={args.db_name}" | |
| f" user={args.db_user} password={args.db_password}" | |
| ) | |
| with psycopg.connect(connstring) as conn: | |
| logger.info("De-duplicating package name entries for: %s", args.packagenames) | |
| for pname in args.packagenames: | |
| name_ids = package_name_ids(conn, pname) | |
| if len(name_ids) == 1: | |
| logger.info("Nothing to do for '%s'", pname) | |
| continue | |
| logger.debug("'%s' ids: %s", pname, name_ids) | |
| update_references(conn, name_ids) | |
| delete_duplicates(conn, name_ids) | |
| re_index(conn) | |
| logger.info("De-duplication finished.") | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment