Created
October 21, 2025 11:44
-
-
Save apinter/c1a76ed99841d947a662e3b476acff88 to your computer and use it in GitHub Desktop.
Rename a value in psql
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import psycopg2 | |
| import psycopg2.extras | |
| import os | |
| DB_CONNECTION_PARAMS = { | |
| "host": os.environ.get("PGHOST", ""), | |
| "port": os.environ.get("PGPORT", 5432), | |
| "user": os.environ.get("PGUSER", ""), | |
| "password": os.environ.get("PGPASSWORD", ""), | |
| "dbname": os.environ.get("PGDATABASE", "postgres") | |
| } | |
| OLD_DOMAIN_PART = "development-gcp" | |
| NEW_DOMAIN_PART = "development" | |
| def get_databases(conn): | |
| with conn.cursor() as cur: | |
| cur.execute("SELECT datname FROM pg_database WHERE datistemplate = false AND datname <> 'postgres';") | |
| return [row[0] for row in cur.fetchall()] | |
| def get_tables(conn): | |
| with conn.cursor() as cur: | |
| cur.execute(""" | |
| SELECT table_schema, table_name | |
| FROM information_schema.tables | |
| WHERE table_schema NOT IN ('pg_catalog', 'information_schema'); | |
| """) | |
| return cur.fetchall() | |
| def get_string_columns(conn, table_schema, table_name): | |
| with conn.cursor() as cur: | |
| cur.execute(""" | |
| SELECT column_name | |
| FROM information_schema.columns | |
| WHERE table_schema = %s AND table_name = %s | |
| AND data_type IN ('character varying', 'varchar', 'text', 'char', 'character'); | |
| """, (table_schema, table_name)) | |
| return [row[0] for row in cur.fetchall()] | |
| def update_column(conn, table_schema, table_name, column_name): | |
| from psycopg2 import sql | |
| query = sql.SQL(""" | |
| UPDATE {schema}.{table} | |
| SET {column} = REPLACE({column}, %s, %s) | |
| WHERE {column} LIKE %s; | |
| """).format( | |
| schema=sql.Identifier(table_schema), | |
| table=sql.Identifier(table_name), | |
| column=sql.Identifier(column_name) | |
| ) | |
| try: | |
| with conn.cursor() as cur: | |
| cur.execute(query, (OLD_DOMAIN_PART, NEW_DOMAIN_PART, f'%{OLD_DOMAIN_PART}%')) | |
| if cur.rowcount > 0: | |
| print(f" Updated {cur.rowcount} rows in {table_schema}.{table_name}.{column_name}") | |
| except Exception as e: | |
| print(f" Error updating {table_schema}.{table_name}.{column_name}: {e}") | |
| conn.rollback() | |
| def main(): | |
| print("--- Starting database update script ---") | |
| print(f"--- Replacing '{OLD_DOMAIN_PART}' with '{NEW_DOMAIN_PART}' ---") | |
| print("\nIMPORTANT: This script performs irreversible data modifications.") | |
| print("Please ensure you have a recent backup of your databases before proceeding.") | |
| try: | |
| with psycopg2.connect(**DB_CONNECTION_PARAMS) as conn: | |
| conn.autocommit = True | |
| databases = get_databases(conn) | |
| except psycopg2.Error as e: | |
| print(f"Error connecting to initial database: {e}") | |
| print("Please check your connection parameters in the DB_CONNECTION_PARAMS dictionary or your environment variables.") | |
| return | |
| if not databases: | |
| print("No databases found to process.") | |
| return | |
| print(f"\nFound databases: {', '.join(databases)}") | |
| for dbname in databases: | |
| print(f"\nProcessing database: {dbname}") | |
| db_params = DB_CONNECTION_PARAMS.copy() | |
| db_params["dbname"] = dbname | |
| try: | |
| with psycopg2.connect(**db_params) as db_conn: | |
| tables = get_tables(db_conn) | |
| for table_schema, table_name in tables: | |
| print(f" Processing table: {table_schema}.{table_name}") | |
| columns = get_string_columns(db_conn, table_schema, table_name) | |
| if not columns: | |
| print(" No text-like columns found.") | |
| continue | |
| for column_name in columns: | |
| update_column(db_conn, table_schema, table_name, column_name) | |
| db_conn.commit() | |
| except psycopg2.Error as e: | |
| print(f" Could not connect to or process database {dbname}: {e}") | |
| continue | |
| print("\n--- Script finished ---") | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment