Skip to content

Instantly share code, notes, and snippets.

@rvaidya
Last active December 14, 2024 21:24
Show Gist options
  • Save rvaidya/dc7044796b678977ef51aa0e0d40f38d to your computer and use it in GitHub Desktop.
Save rvaidya/dc7044796b678977ef51aa0e0d40f38d to your computer and use it in GitHub Desktop.
PostgreSQL script to upgrade serial and int columns with default nextval columns to GENERATED IDENTITY and reset sequences to MAX(id)
import argparse
import configparser
from os import path, getenv
import psycopg
from urllib.parse import quote
parser = argparse.ArgumentParser(
prog="postgresql-sequence-fixer",
description="Automatically upgrade table id/serial PostgreSQL sequences to GENERATED IDENTITY and reset ID to max",
)
parser.add_argument("-c", "--connection-string", help="The connection string to use")
parser.add_argument(
"-s",
"--server",
default="localhost",
help="The PostgreSQL server address, default 'localhost'",
)
parser.add_argument(
"-p",
"--port",
default="5432",
help="The port of the PostgreSQL server, default 5432",
)
parser.add_argument(
"-d",
"--database",
default="postgres",
help="The database of the PostgreSQL server to connect to, default 'postgres'",
)
parser.add_argument(
"-U",
"--username",
default="postgres",
help="The username to connect as, default 'postgres'",
)
parser.add_argument(
"-P", "--password", help="The password for the username to connect as"
)
parser.add_argument(
"-f",
"--file",
default="config.ini",
help="The config ini file containing connection information, default 'config.ini'",
)
parser.add_argument(
"-v",
"--verbose",
help="Print the statements being executed",
action="store_true",
)
args = parser.parse_args()
connection_string = args.connection_string
server = args.server
port = args.port
database = args.database
username = args.username
password = args.password
configfile = args.file
verbose = args.verbose
if path.exists(configfile):
config = configparser.ConfigParser()
config.read(configfile)
if "connection_string" in config["postgresql"]:
connection_string = config["postgresql"]["connection_string"]
if "server" in config["postgresql"]:
server = config["postgresql"]["server"]
if "port" in config["postgresql"]:
port = config["postgresql"]["port"]
if "database" in config["postgresql"]:
database = config["postgresql"]["database"]
if "username" in config["postgresql"]:
username = config["postgresql"]["username"]
if "password" in config["postgresql"]:
password = config["postgresql"]["password"]
connection_string = getenv("POSTGRESQL_CONNECTION_STRING") or connection_string
server = getenv("POSTGRESQL_SERVER") or server
port = getenv("POSTGRESQL_PORT") or port
database = getenv("POSTGRESQL_DATABASE") or database
username = getenv("POSTGRESQL_USERNAME") or username
password = getenv("POSTGRESQL_PASSWORD") or password
connection_string = (
connection_string
or f"postgresql://{quote(username)}:{quote(password)}@{server}:{port}/{database}"
)
def log(str):
if verbose:
print(str)
# Connect to an existing database
with psycopg.connect(connection_string) as conn:
with conn.execute(
"""
SELECT DISTINCT table_schema
, table_name
, column_name
, data_type
, LTRIM(LTRIM(RTRIM(RTRIM(column_default, '::regclass)'),''''),'nextval('), '''') AS SEQUENCE_NAME
FROM information_schema.columns
WHERE column_default like '%nextval%'
"""
) as sequences:
for schema, table, column, data_type, sequence in sequences:
# Assign sequences to tables if unparented
query = (
f'ALTER SEQUENCE {sequence} OWNED BY "{schema}"."{table}"."{column}";'
)
log(query)
conn.execute(query)
with conn.execute(
"""
SELECT DISTINCT table_schema
, table_name
, column_name
, data_type
, LTRIM(LTRIM(RTRIM(RTRIM(column_default, '::regclass)'),''''),'nextval('), '''') AS sequence_name
FROM information_schema.columns
WHERE column_default like '%nextval%'
"""
) as sequences:
for schema, table, column, data_type, sequence in sequences:
# Convert columns with defaults coming from sequences to identity columns
# This includes serial
query = f'ALTER TABLE "{schema}"."{table}" ALTER COLUMN "{column}" DROP DEFAULT;'
log(query)
conn.execute(query)
query = f"DROP SEQUENCE IF EXISTS {sequence};"
log(query)
conn.execute(query)
query = f'ALTER TABLE "{schema}"."{table}" ALTER COLUMN "{column}" ADD GENERATED ALWAYS AS IDENTITY;'
log(query)
conn.execute(query)
with conn.execute(
"""
SELECT
sequence_namespace.nspname AS sequence_schema,
class_sequence.relname AS sequence_name,
table_namespace.nspname AS table_schema,
class_table.relname as table_name,
pg_attribute.attname as column_name
FROM pg_depend
INNER JOIN pg_class AS class_sequence
ON class_sequence.oid = pg_depend.objid
AND class_sequence.relkind = 'S'
INNER JOIN pg_class AS class_table
ON class_table.oid = pg_depend.refobjid
INNER JOIN pg_attribute
ON pg_attribute.attrelid = class_table.oid
AND pg_depend.refobjsubid = pg_attribute.attnum
INNER JOIN pg_namespace as table_namespace
ON table_namespace.oid = class_table.relnamespace
INNER JOIN pg_namespace AS sequence_namespace
ON sequence_namespace.oid = class_sequence.relnamespace
ORDER BY sequence_namespace.nspname, class_sequence.relname;
"""
) as sequences:
for sequence_schema, sequence, table_schema, table, column in sequences:
# Reset sequences to MAX(ID)
query = f'SELECT setval(\'{sequence_schema}.{sequence}\', max({column})) FROM "{table_schema}"."{table}";'
log(query)
conn.execute(query)
@rvaidya
Copy link
Author

rvaidya commented Dec 14, 2024

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment