Skip to content

Instantly share code, notes, and snippets.

@apinter
Created October 21, 2025 11:44
Show Gist options
  • Save apinter/c1a76ed99841d947a662e3b476acff88 to your computer and use it in GitHub Desktop.
Save apinter/c1a76ed99841d947a662e3b476acff88 to your computer and use it in GitHub Desktop.
Rename a value in psql
import psycopg2
import psycopg2.extras
import os
# Keyword arguments for psycopg2.connect(). Every value is read from the
# corresponding PG* environment variable; the second argument is the fallback
# used when that variable is not set.
DB_CONNECTION_PARAMS = dict(
    host=os.getenv("PGHOST", ""),
    port=os.getenv("PGPORT", 5432),
    user=os.getenv("PGUSER", ""),
    password=os.getenv("PGPASSWORD", ""),
    dbname=os.getenv("PGDATABASE", "postgres"),
)

# Substring searched for in text columns, and the text that replaces it.
OLD_DOMAIN_PART = "development-gcp"
NEW_DOMAIN_PART = "development"
def get_databases(conn):
    """Return the names of the databases that should be processed.

    Template databases and the ``postgres`` database are excluded, as are
    databases that do not accept connections (``datallowconn = false``),
    since those could not be opened for processing anyway.

    Args:
        conn: An open database connection whose ``cursor()`` can be used as
            a context manager.

    Returns:
        A list of database names (first column of each result row).
    """
    query = (
        "SELECT datname FROM pg_database "
        "WHERE datistemplate = false AND datallowconn "
        "AND datname <> 'postgres';"
    )
    with conn.cursor() as cur:
        cur.execute(query)
        return [row[0] for row in cur.fetchall()]
def get_tables(conn):
    """Return ``(schema, table)`` pairs for the ordinary tables in the database.

    Only rows with ``table_type = 'BASE TABLE'`` are returned:
    ``information_schema.tables`` also lists views and foreign tables, and
    issuing an UPDATE against those can fail. The ``pg_catalog`` and
    ``information_schema`` schemas are skipped.

    Args:
        conn: An open connection to the database to inspect.

    Returns:
        The rows from ``cursor.fetchall()``, each a
        ``(table_schema, table_name)`` pair.
    """
    with conn.cursor() as cur:
        cur.execute("""
            SELECT table_schema, table_name
            FROM information_schema.tables
            WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
              AND table_type = 'BASE TABLE';
        """)
        return cur.fetchall()
def get_string_columns(conn, table_schema, table_name):
    """Return the names of the updatable text-like columns of one table.

    A column qualifies when its ``data_type`` is one of the character types
    listed in the query. Generated columns (``is_generated <> 'NEVER'``) are
    skipped because they cannot be the target of an UPDATE.

    Args:
        conn: An open connection to the database that owns the table.
        table_schema: Schema name of the table.
        table_name: Name of the table.

    Returns:
        A list of column names (first column of each result row).
    """
    with conn.cursor() as cur:
        # Schema and table are passed as bound parameters, not interpolated.
        cur.execute("""
            SELECT column_name
            FROM information_schema.columns
            WHERE table_schema = %s AND table_name = %s
              AND data_type IN ('character varying', 'varchar', 'text', 'char', 'character')
              AND is_generated = 'NEVER';
        """, (table_schema, table_name))
        return [row[0] for row in cur.fetchall()]
def update_column(conn, table_schema, table_name, column_name):
    """Replace OLD_DOMAIN_PART with NEW_DOMAIN_PART in one column of one table.

    The update is committed on its own as soon as it succeeds. On failure the
    error is printed and only this column's work is rolled back, so a failing
    column can no longer undo earlier columns' updates that were already
    reported as done.

    Args:
        conn: An open connection to the database that owns the table.
        table_schema: Schema name of the table.
        table_name: Name of the table.
        column_name: Name of the text-like column to rewrite.

    Returns:
        None. Progress and errors are reported via ``print``.
    """
    from psycopg2 import sql

    # Identifiers are composed with psycopg2.sql so they are quoted safely;
    # the search and replacement values stay as bound parameters.
    query = sql.SQL("""
        UPDATE {schema}.{table}
        SET {column} = REPLACE({column}, %s, %s)
        WHERE {column} LIKE %s;
    """).format(
        schema=sql.Identifier(table_schema),
        table=sql.Identifier(table_name),
        column=sql.Identifier(column_name)
    )
    # NOTE: the LIKE pattern is not escaped; this is fine while OLD_DOMAIN_PART
    # contains no LIKE wildcards ('%' or '_').
    try:
        with conn.cursor() as cur:
            cur.execute(query, (OLD_DOMAIN_PART, NEW_DOMAIN_PART, f'%{OLD_DOMAIN_PART}%'))
            updated_rows = cur.rowcount
        # Commit per column so a later failure cannot roll this update back.
        conn.commit()
    except Exception as e:
        print(f" Error updating {table_schema}.{table_name}.{column_name}: {e}")
        conn.rollback()
        return
    # Report only after the commit succeeded, so the message is truthful.
    if updated_rows > 0:
        print(f" Updated {updated_rows} rows in {table_schema}.{table_name}.{column_name}")
def main():
    """Run the replacement over every text-like column of every listed database.

    Connects with DB_CONNECTION_PARAMS to list the databases, then opens one
    connection per database, walks its tables and text-like columns, and
    calls ``update_column`` for each. A database that cannot be connected to
    or processed is reported and skipped.

    Every connection is closed explicitly: psycopg2's ``with connection``
    block only ends the transaction and leaves the connection open, which
    leaked one connection per database.
    """
    print("--- Starting database update script ---")
    print(f"--- Replacing '{OLD_DOMAIN_PART}' with '{NEW_DOMAIN_PART}' ---")
    print("\nIMPORTANT: This script performs irreversible data modifications.")
    print("Please ensure you have a recent backup of your databases before proceeding.")

    try:
        conn = psycopg2.connect(**DB_CONNECTION_PARAMS)
        try:
            # Read-only catalog lookup; set autocommit before any query runs.
            conn.autocommit = True
            databases = get_databases(conn)
        finally:
            conn.close()
    except psycopg2.Error as e:
        print(f"Error connecting to initial database: {e}")
        print("Please check your connection parameters in the DB_CONNECTION_PARAMS dictionary or your environment variables.")
        return

    if not databases:
        print("No databases found to process.")
        return

    print(f"\nFound databases: {', '.join(databases)}")
    for dbname in databases:
        print(f"\nProcessing database: {dbname}")
        db_params = DB_CONNECTION_PARAMS.copy()
        db_params["dbname"] = dbname
        try:
            db_conn = psycopg2.connect(**db_params)
            try:
                for table_schema, table_name in get_tables(db_conn):
                    print(f" Processing table: {table_schema}.{table_name}")
                    columns = get_string_columns(db_conn, table_schema, table_name)
                    if not columns:
                        print(" No text-like columns found.")
                        continue
                    for column_name in columns:
                        update_column(db_conn, table_schema, table_name, column_name)
                    # Persist this table's changes before moving on.
                    db_conn.commit()
            finally:
                # Closing discards any still-open transaction and frees the
                # server-side session.
                db_conn.close()
        except psycopg2.Error as e:
            print(f" Could not connect to or process database {dbname}: {e}")
            continue

    print("\n--- Script finished ---")
# Run the update only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment