Skip to content

Instantly share code, notes, and snippets.

@mvdbeek
Created February 24, 2026 10:23
Show Gist options
  • Select an option

  • Save mvdbeek/d3b52239b968ef78bb4ee10638e76dfe to your computer and use it in GitHub Desktop.

Select an option

Save mvdbeek/d3b52239b968ef78bb4ee10638e76dfe to your computer and use it in GitHub Desktop.
26.0 db migrations
diff --git a/lib/galaxy/model/migrations/alembic/env.py b/lib/galaxy/model/migrations/alembic/env.py
index 98091912b80..c9db41cc84d 100644
--- a/lib/galaxy/model/migrations/alembic/env.py
+++ b/lib/galaxy/model/migrations/alembic/env.py
@@ -1,7 +1,7 @@
import logging
import re
+from collections.abc import Callable
from typing import (
- Callable,
cast,
)
diff --git a/lib/galaxy/model/migrations/alembic/versions_gxy/04288b6a5b25_make_dataset_uuids_unique.py b/lib/galaxy/model/migrations/alembic/versions_gxy/04288b6a5b25_make_dataset_uuids_unique.py
index a0572676503..fd9a7fbb986 100644
--- a/lib/galaxy/model/migrations/alembic/versions_gxy/04288b6a5b25_make_dataset_uuids_unique.py
+++ b/lib/galaxy/model/migrations/alembic/versions_gxy/04288b6a5b25_make_dataset_uuids_unique.py
@@ -103,110 +103,92 @@ def downgrade():
def _restore_old_mappings(connection):
- restore_hda_dataset_ids = text(
- """
+ restore_hda_dataset_ids = text("""
UPDATE history_dataset_association
SET dataset_id=mapping.old_dataset_id
FROM hda_dataset_mapping_pre_uuid_condense AS mapping
WHERE mapping.id = history_dataset_association.id
- """
- )
+ """)
connection.execute(restore_hda_dataset_ids)
- restore_ldda_dataset_ids = text(
- """
+ restore_ldda_dataset_ids = text("""
UPDATE library_dataset_dataset_association
SET dataset_id=mapping.old_dataset_id
FROM ldda_dataset_mapping_pre_uuid_condense as mapping
WHERE mapping.id = library_dataset_dataset_association.id
- """
- )
+ """)
connection.execute(restore_ldda_dataset_ids)
def _restore_dataset_uuids(connection):
- restore_ldda_dataset_ids = text(
- f"""
+ restore_ldda_dataset_ids = text(f"""
UPDATE {dataset_table_name}
SET {uuid_column}=backup_datasets.{uuid_column}
FROM duplicate_datasets_by_uuid as backup_datasets
WHERE backup_datasets.id = {dataset_table_name}.id
- """
- )
+ """)
connection.execute(restore_ldda_dataset_ids)
def _setup_duplicate_counts(connection):
- duplicate_counts_query = text(
- f"""
+ duplicate_counts_query = text(f"""
CREATE TEMP TABLE temp_duplicates_counts AS
SELECT {uuid_column}, COUNT(*)
FROM {dataset_table_name}
GROUP BY {uuid_column}
HAVING COUNT(*) > 1
- """
- )
+ """)
connection.execute(duplicate_counts_query)
def _setup_backup_datasets_for_duplicated_uuids(connection):
- duplicate_datasets = text(
- f"""
+ duplicate_datasets = text(f"""
CREATE TABLE duplicate_datasets_by_uuid AS
SELECT *
FROM {dataset_table_name}
WHERE {uuid_column} IN (select {uuid_column} from temp_duplicates_counts)
- """
- )
+ """)
connection.execute(duplicate_datasets)
def _setup_duplicated_dataset_views_by_purged_status(connection):
- duplicate_purged_datasets_query = text(
- """
+ duplicate_purged_datasets_query = text("""
CREATE TEMP VIEW temp_duplicate_datasets_purged AS
SELECT *
FROM duplicate_datasets_by_uuid
WHERE purged = true
- """
- )
+ """)
connection.execute(duplicate_purged_datasets_query)
- duplicate_active_datasets_query = text(
- """
+ duplicate_active_datasets_query = text("""
CREATE TEMP VIEW temp_duplicate_datasets_active AS
SELECT *
FROM duplicate_datasets_by_uuid
WHERE purged = false
- """
- )
+ """)
connection.execute(duplicate_active_datasets_query)
_debug(connection, "purged duplicated", text("select count(*) from temp_duplicate_datasets_purged"))
_debug(connection, "active duplicated", text("select count(*) from temp_duplicate_datasets_active"))
def _find_latest_active_dataset_for_each_uuid(connection):
- latest_active_duplicate_query = text(
- f"""
+ latest_active_duplicate_query = text(f"""
CREATE TEMP TABLE temp_latest_active_duplicate AS
SELECT {uuid_column}, MIN(id) as latest_dataset_id
FROM temp_duplicate_datasets_active
GROUP BY {uuid_column}
- """
- )
+ """)
connection.execute(latest_active_duplicate_query)
debug_query = text("select * from temp_latest_active_duplicate")
_debug(connection, "latest active table", debug_query)
def _map_active_uuids_to_latest(connection):
- active_mapping_query = text(
- f"""
+ active_mapping_query = text(f"""
CREATE TEMP TABLE temp_active_mapping AS
SELECT d.id as from_dataset_id, l.latest_dataset_id as to_dataset_id, l.{uuid_column} as uuid
FROM temp_duplicate_datasets_active as d
LEFT JOIN temp_latest_active_duplicate l ON d.{uuid_column} = l.{uuid_column}
- """
- )
+ """)
connection.execute(active_mapping_query)
debug_query = text("select * from temp_active_mapping")
_debug(connection, "temp active mapping", debug_query)
@@ -215,35 +197,30 @@ def _map_active_uuids_to_latest(connection):
def _randomize_uuids_for_purged_datasets_with_duplicated_uuids(connection):
- updated_purged_uuids = text(
- f"""
+ updated_purged_uuids = text(f"""
UPDATE {dataset_table_name}
SET uuid={new_uuid()}
WHERE {uuid_column} IN (SELECT {uuid_column} FROM temp_duplicate_datasets_purged) AND purged = true
- """
- )
+ """)
connection.execute(updated_purged_uuids)
def _randomize_uuids_for_older_active_datasets_with_duplicated_uuids(connection):
# sanity check...
- duplicate_datasets_with_uuid_of_latest_active_uuid = text(
- f"""
+ duplicate_datasets_with_uuid_of_latest_active_uuid = text(f"""
SELECT COUNT(*)
FROM {dataset_table_name} as d
INNER JOIN temp_active_mapping AS a ON d.uuid = a.uuid
GROUP BY d.{uuid_column}
HAVING COUNT(*) > 1
- """
- )
+ """)
_debug(
connection,
"(before) duplicate_datasets_with_uuid_of_latest_active_uuid",
duplicate_datasets_with_uuid_of_latest_active_uuid,
)
- update_older_datasets = text(
- f"""
+ update_older_datasets = text(f"""
UPDATE {dataset_table_name}
SET uuid={new_uuid()}
WHERE EXISTS
@@ -252,8 +229,7 @@ def _randomize_uuids_for_older_active_datasets_with_duplicated_uuids(connection)
FROM temp_active_mapping as m
where m.from_dataset_id = {dataset_table_name}.id and m.from_dataset_id != m.to_dataset_id
)
- """
- )
+ """)
connection.execute(update_older_datasets)
_debug(
@@ -262,65 +238,53 @@ def _randomize_uuids_for_older_active_datasets_with_duplicated_uuids(connection)
duplicate_datasets_with_uuid_of_latest_active_uuid,
)
- duplicate_active_count = text(
- """
+ duplicate_active_count = text("""
SELECT COUNT(*)
FROM temp_active_mapping
- """
- )
+ """)
_debug(connection, "(after) duplicate_active_count", duplicate_active_count)
- datasets_with_originally_duplicated_uuids = text(
- f"""
+ datasets_with_originally_duplicated_uuids = text(f"""
SELECT COUNT(*)
FROM {dataset_table_name} as d
INNER JOIN temp_active_mapping AS a ON d.uuid = a.uuid
- """
- )
+ """)
_debug(connection, "(after) datasets_with_originally_duplicated_uuids", datasets_with_originally_duplicated_uuids)
def _update_dataset_associations_to_point_to_latest_active_datasets(connection):
# for others select one dataset to represent the dataset in HDAs/LDDAs
- update_hda_links = text(
- """
+ update_hda_links = text("""
UPDATE history_dataset_association
SET dataset_id=t.to_dataset_id
FROM temp_active_mapping t
WHERE t.from_dataset_id = dataset_id
- """
- )
+ """)
connection.execute(update_hda_links)
- update_ldda_links = text(
- """
+ update_ldda_links = text("""
UPDATE library_dataset_dataset_association
SET dataset_id=t.to_dataset_id
FROM temp_active_mapping t
WHERE t.from_dataset_id = dataset_id
- """
- )
+ """)
connection.execute(update_ldda_links)
def _preserve_old_dataset_association_mappings(connection):
- old_hda_mappings = text(
- f"""
+ old_hda_mappings = text(f"""
CREATE TABLE hda_dataset_mapping_pre_uuid_condense AS
SELECT DISTINCT h.id as id, d.id as old_dataset_id
FROM history_dataset_association AS h
INNER JOIN dataset AS d ON h.dataset_id = d.id
INNER JOIN duplicate_datasets_by_uuid AS duplicates ON d.{uuid_column} = duplicates.{uuid_column}
- """
- )
+ """)
connection.execute(old_hda_mappings)
- old_ldda_mappings = text(
- f"""
+ old_ldda_mappings = text(f"""
CREATE TABLE ldda_dataset_mapping_pre_uuid_condense AS
SELECT l.id as id, d.id as old_dataset_id
FROM library_dataset_dataset_association aS l
INNER JOIN dataset AS d ON l.dataset_id = d.id
INNER JOIN duplicate_datasets_by_uuid AS duplicates ON d.{uuid_column} = duplicates.{uuid_column}
- """
- )
+ """)
connection.execute(old_ldda_mappings)
diff --git a/lib/galaxy/model/migrations/alembic/versions_gxy/1d1d7bf6ac02_tool_request_implicit_outputs.py b/lib/galaxy/model/migrations/alembic/versions_gxy/1d1d7bf6ac02_tool_request_implicit_outputs.py
new file mode 100644
index 00000000000..318fe512489
--- /dev/null
+++ b/lib/galaxy/model/migrations/alembic/versions_gxy/1d1d7bf6ac02_tool_request_implicit_outputs.py
@@ -0,0 +1,60 @@
+"""Track tool request implicit output collections.
+
+Revision ID: 1d1d7bf6ac02
+Revises: a5c5455b849a
+Create Date: 2024-11-18 15:39:42.900327
+
+"""
+
+from sqlalchemy import (
+ Column,
+ Integer,
+ String,
+)
+
+from galaxy.model.migrations.util import (
+ create_foreign_key,
+ create_table,
+ drop_table,
+ transaction,
+)
+
+# revision identifiers, used by Alembic.
+revision = "1d1d7bf6ac02"
+down_revision = "a5c5455b849a"
+branch_labels = None
+depends_on = None
+
+association_table_name = "tool_request_implicit_collection_association"
+
+
+def upgrade():
+ with transaction():
+ create_table(
+ association_table_name,
+ Column("id", Integer, primary_key=True),
+ Column("tool_request_id", Integer, index=True),
+ Column("dataset_collection_id", Integer, index=True),
+ Column("output_name", String(255), nullable=False),
+ )
+
+ create_foreign_key(
+ "fk_trica_tri",
+ association_table_name,
+ "tool_request",
+ ["tool_request_id"],
+ ["id"],
+ )
+
+ create_foreign_key(
+ "fk_trica_dci",
+ association_table_name,
+ "history_dataset_collection_association",
+ ["dataset_collection_id"],
+ ["id"],
+ )
+
+
+def downgrade():
+ with transaction():
+ drop_table(association_table_name)
diff --git a/lib/galaxy/model/migrations/alembic/versions_gxy/312602e3191d_add_workflow_step_id_path.py b/lib/galaxy/model/migrations/alembic/versions_gxy/312602e3191d_add_workflow_step_id_path.py
new file mode 100644
index 00000000000..e3413ebf989
--- /dev/null
+++ b/lib/galaxy/model/migrations/alembic/versions_gxy/312602e3191d_add_workflow_step_id_path.py
@@ -0,0 +1,43 @@
+"""Add workflow_step_index_path column to workflow_invocation_message table
+
+Revision ID: 312602e3191d
+Revises: 75b461a2b24a
+Create Date: 2025-12-27 12:00:00.000000
+"""
+
+import sqlalchemy as sa
+
+from galaxy.model.migrations.util import (
+ add_column,
+ drop_column,
+)
+
+# revision identifiers, used by Alembic.
+revision = "312602e3191d"
+down_revision = "75b461a2b24a"
+branch_labels = None
+depends_on = None
+
+# database object names used in this revision
+table_name = "workflow_invocation_message"
+column_name = "workflow_step_index_path"
+
+
+def upgrade():
+ column = sa.Column(
+ column_name,
+ sa.JSON,
+ nullable=True, # Allow nulls for existing rows and backward compatibility
+ default=None, # Default to None if not provided
+ )
+ add_column(
+ table_name,
+ column,
+ )
+
+
+def downgrade():
+ drop_column(
+ table_name,
+ column_name,
+ )
diff --git a/lib/galaxy/model/migrations/alembic/versions_gxy/724237cc4cf0_migrate_custos_to_psa_tokens.py b/lib/galaxy/model/migrations/alembic/versions_gxy/724237cc4cf0_migrate_custos_to_psa_tokens.py
new file mode 100644
index 00000000000..6f77a78f68b
--- /dev/null
+++ b/lib/galaxy/model/migrations/alembic/versions_gxy/724237cc4cf0_migrate_custos_to_psa_tokens.py
@@ -0,0 +1,75 @@
+"""migrate custos to psa tokens
+
+Revision ID: 724237cc4cf0
+Revises: 1d1d7bf6ac02
+Create Date: 2025-11-03 15:22:13.111461
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+
+from galaxy.model.migrations.data_fixes.custos_to_psa import (
+ CUSTOS_TABLE,
+ migrate_custos_tokens_to_psa,
+ remove_migrated_psa_tokens,
+ restore_custos_tokens_from_psa,
+)
+from galaxy.model.migrations.util import (
+ create_table,
+ create_unique_constraint,
+ drop_table,
+ transaction,
+)
+
+# revision identifiers, used by Alembic.
+revision = "724237cc4cf0"
+down_revision = "1d1d7bf6ac02"
+branch_labels = None
+depends_on = None
+
+_USER_EXTERNAL_PROVIDER_CONSTRAINT = "custos_authnz_token_user_id_external_user_id_provider_key"
+_EXTERNAL_PROVIDER_CONSTRAINT = "custos_authnz_token_external_user_id_provider_key"
+
+
+def upgrade():
+ """
+ Migrate authentication tokens from custos_authnz_token to oidc_user_authnz_tokens
+ and drop the obsolete custos_authnz_token table.
+ """
+ with transaction():
+ connection = op.get_bind()
+ migrate_custos_tokens_to_psa(connection)
+ drop_table(CUSTOS_TABLE)
+
+
+def downgrade():
+ """
+ Recreate custos_authnz_token and restore any Custos tokens that were migrated.
+ """
+ with transaction():
+ connection = op.get_bind()
+ create_table(
+ CUSTOS_TABLE,
+ sa.Column("id", sa.Integer, primary_key=True),
+ sa.Column("user_id", sa.Integer, sa.ForeignKey("galaxy_user.id"), index=True),
+ sa.Column("external_user_id", sa.String(255)),
+ sa.Column("provider", sa.String(255)),
+ sa.Column("access_token", sa.Text),
+ sa.Column("id_token", sa.Text),
+ sa.Column("refresh_token", sa.Text),
+ sa.Column("expiration_time", sa.DateTime),
+ sa.Column("refresh_expiration_time", sa.DateTime),
+ )
+ create_unique_constraint(
+ _USER_EXTERNAL_PROVIDER_CONSTRAINT,
+ CUSTOS_TABLE,
+ ["user_id", "external_user_id", "provider"],
+ )
+ create_unique_constraint(
+ _EXTERNAL_PROVIDER_CONSTRAINT,
+ CUSTOS_TABLE,
+ ["external_user_id", "provider"],
+ )
+ restore_custos_tokens_from_psa(connection)
+ remove_migrated_psa_tokens(connection)
diff --git a/lib/galaxy/model/migrations/alembic/versions_gxy/75b461a2b24a_merge_heads.py b/lib/galaxy/model/migrations/alembic/versions_gxy/75b461a2b24a_merge_heads.py
new file mode 100644
index 00000000000..799fe463e1e
--- /dev/null
+++ b/lib/galaxy/model/migrations/alembic/versions_gxy/75b461a2b24a_merge_heads.py
@@ -0,0 +1,21 @@
+"""Merge heads
+
+Revision ID: 75b461a2b24a
+Revises: 724237cc4cf0, 98621a25ab75
+Create Date: 2025-12-12 18:54:48.572124
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = "75b461a2b24a"
+down_revision = ("724237cc4cf0", "98621a25ab75")
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ pass
+
+
+def downgrade():
+ pass
diff --git a/lib/galaxy/model/migrations/alembic/versions_gxy/98621a25ab75_merge_migration_heads.py b/lib/galaxy/model/migrations/alembic/versions_gxy/98621a25ab75_merge_migration_heads.py
new file mode 100644
index 00000000000..bca789da3d3
--- /dev/null
+++ b/lib/galaxy/model/migrations/alembic/versions_gxy/98621a25ab75_merge_migration_heads.py
@@ -0,0 +1,21 @@
+"""Merge migration heads
+
+Revision ID: 98621a25ab75
+Revises: 1d1d7bf6ac02, 63dc6cca023f
+Create Date: 2025-12-12 13:48:07.888987
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = "98621a25ab75"
+down_revision = ("1d1d7bf6ac02", "63dc6cca023f")
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ pass
+
+
+def downgrade():
+ pass
diff --git a/lib/galaxy/model/migrations/alembic/versions_gxy/9930b68c85af_add_workflow_invocation_completion.py b/lib/galaxy/model/migrations/alembic/versions_gxy/9930b68c85af_add_workflow_invocation_completion.py
new file mode 100644
index 00000000000..0edc35ded81
--- /dev/null
+++ b/lib/galaxy/model/migrations/alembic/versions_gxy/9930b68c85af_add_workflow_invocation_completion.py
@@ -0,0 +1,59 @@
+"""add workflow_invocation_completion table and on_complete column
+
+Revision ID: 9930b68c85af
+Revises: 312602e3191d
+Create Date: 2026-01-03 12:00:00.000000
+
+"""
+
+from sqlalchemy import (
+ Column,
+ DateTime,
+ ForeignKey,
+ Integer,
+ JSON,
+)
+
+from galaxy.model.migrations.util import (
+ add_column,
+ create_table,
+ drop_column,
+ drop_table,
+)
+
+# revision identifiers, used by Alembic.
+revision = "9930b68c85af"
+down_revision = "312602e3191d"
+branch_labels = None
+depends_on = None
+
+
+table_name = "workflow_invocation_completion"
+
+
+def upgrade():
+ create_table(
+ table_name,
+ Column("id", Integer, primary_key=True),
+ Column(
+ "workflow_invocation_id",
+ Integer,
+ ForeignKey("workflow_invocation.id"),
+ index=True,
+ unique=True,
+ nullable=False,
+ ),
+ Column("completion_time", DateTime),
+ Column("job_state_summary", JSON),
+ Column("hooks_executed", JSON),
+ )
+ # Add column to workflow_invocation for per-invocation completion actions
+ add_column(
+ "workflow_invocation",
+ Column("on_complete", JSON),
+ )
+
+
+def downgrade():
+ drop_column("workflow_invocation", "on_complete")
+ drop_table(table_name)
diff --git a/lib/galaxy/model/migrations/alembic/versions_gxy/a5c5455b849a_drop_job_params_column.py b/lib/galaxy/model/migrations/alembic/versions_gxy/a5c5455b849a_drop_job_params_column.py
new file mode 100644
index 00000000000..f3848ff219f
--- /dev/null
+++ b/lib/galaxy/model/migrations/alembic/versions_gxy/a5c5455b849a_drop_job_params_column.py
@@ -0,0 +1,32 @@
+"""Drop Job.params column
+
+Revision ID: a5c5455b849a
+Revises: cd26484899fb
+Create Date: 2025-10-15 16:13:14.778789
+
+"""
+
+from sqlalchemy import Column
+
+from galaxy.model.custom_types import TrimmedString
+from galaxy.model.migrations.util import (
+ add_column,
+ drop_column,
+)
+
+# revision identifiers, used by Alembic.
+revision = "a5c5455b849a"
+down_revision = "cd26484899fb"
+branch_labels = None
+depends_on = None
+
+table_name = "job"
+column_name = "params"
+
+
+def upgrade():
+ drop_column(table_name, column_name)
+
+
+def downgrade():
+ add_column(table_name, Column(column_name, TrimmedString(255)))
diff --git a/lib/galaxy/model/migrations/data_fixes/custos_to_psa.py b/lib/galaxy/model/migrations/data_fixes/custos_to_psa.py
new file mode 100644
index 00000000000..321975674a3
--- /dev/null
+++ b/lib/galaxy/model/migrations/data_fixes/custos_to_psa.py
@@ -0,0 +1,238 @@
+"""Reusable helpers for migrating Custos authentication tokens into PSA format."""
+
+from datetime import datetime
+from typing import (
+ cast,
+ Optional,
+)
+
+import jwt
+from sqlalchemy import (
+ Column,
+ DateTime,
+ Integer,
+ MetaData,
+ select,
+ String,
+ Table,
+ Text,
+)
+from sqlalchemy.engine import Connection
+
+from galaxy.model import UserAuthnzToken
+
+CUSTOS_TABLE = "custos_authnz_token"
+PSA_TABLE = "oidc_user_authnz_tokens"
+CUSTOS_ASSOC_TYPE = "custos_migrated"
+
+
+def _extract_iat_from_token(token: Optional[str]) -> Optional[int]:
+ """
+ Extract the 'iat' (issued at) claim from a JWT token.
+ Returns None if the token cannot be decoded or doesn't have an iat claim.
+ """
+ if not token:
+ return None
+ try:
+ decoded = jwt.decode(token, options={"verify_signature": False})
+ return decoded.get("iat")
+ except Exception:
+ return None
+
+
+def get_custos_table(connection: Connection) -> Table:
+ """
+ Reflect the custos_authnz_token table for use in migrations or tests.
+ """
+ return Table(
+ CUSTOS_TABLE,
+ MetaData(),
+ Column("id", Integer, primary_key=True),
+ Column("user_id", Integer),
+ Column("external_user_id", String(255)),
+ Column("provider", String(255)),
+ Column("access_token", Text),
+ Column("id_token", Text),
+ Column("refresh_token", Text),
+ Column("expiration_time", DateTime),
+ Column("refresh_expiration_time", DateTime),
+ extend_existing=True,
+ autoload_with=connection,
+ )
+
+
+def get_psa_table(connection: Connection) -> Table:
+ """
+ Get the oidc_user_authnz_tokens table directly from the UserAuthnzToken model.
+ """
+ return cast(Table, UserAuthnzToken.table)
+
+
+def migrate_custos_tokens_to_psa(
+ connection: Connection,
+ custos_table: Optional[Table] = None,
+ psa_table: Optional[Table] = None,
+) -> int:
+ """
+ Transform Custos tokens into PSA tokens and insert them into oidc_user_authnz_tokens.
+
+ :returns: Number of tokens that were migrated.
+ """
+ if custos_table is None:
+ custos_table = get_custos_table(connection)
+ if psa_table is None:
+ psa_table = get_psa_table(connection)
+
+ custos_records = connection.execute(select(custos_table)).fetchall()
+
+ migrated_count = 0
+ for record in custos_records:
+ now_ts = int(datetime.now().timestamp())
+ existing = connection.execute(
+ select(psa_table).where(
+ psa_table.c.user_id == record.user_id,
+ psa_table.c.provider == record.provider,
+ psa_table.c.uid == record.external_user_id,
+ )
+ ).first()
+
+ if existing:
+ continue
+
+ extra_data = {}
+ if record.access_token:
+ extra_data["access_token"] = record.access_token
+ if record.id_token:
+ extra_data["id_token"] = record.id_token
+ if record.refresh_token:
+ extra_data["refresh_token"] = record.refresh_token
+
+ # Extract auth_time from token's 'iat' claim (issued at time)
+ # Priority: access_token.iat > id_token.iat > current time
+ # We prefer access_token.iat since that's the token whose expiration we're tracking
+ auth_time = _extract_iat_from_token(record.access_token) or _extract_iat_from_token(record.id_token) or now_ts
+ extra_data["auth_time"] = auth_time
+
+ # Calculate expires from expiration_time
+ # If token is expired or expiration_time is missing, set expires to 1 second
+ # to trigger immediate refresh on next use
+ if record.expiration_time:
+ expires_at = int(record.expiration_time.timestamp())
+ # Check if token is already expired (expiration_time < now)
+ if expires_at <= now_ts:
+ # Token already expired - set to 1 second to trigger refresh
+ extra_data["expires"] = 1
+ else:
+ # Token still valid - calculate original lifetime
+ expires = expires_at - auth_time
+ extra_data["expires"] = expires if expires > 0 else 1
+ else:
+ # No expiration_time - set to 1 second to trigger refresh
+ extra_data["expires"] = 1
+
+ if record.refresh_expiration_time and "refresh_token" in extra_data:
+ refresh_expires_at = int(record.refresh_expiration_time.timestamp())
+ extra_data["refresh_expires_in"] = refresh_expires_at - now_ts
+
+ connection.execute(
+ psa_table.insert().values(
+ user_id=record.user_id,
+ uid=record.external_user_id,
+ provider=record.provider,
+ extra_data=extra_data,
+ lifetime=None,
+ assoc_type=CUSTOS_ASSOC_TYPE,
+ )
+ )
+ migrated_count += 1
+
+ return migrated_count
+
+
+def remove_migrated_psa_tokens(
+ connection: Connection,
+ custos_table: Optional[Table] = None,
+ psa_table: Optional[Table] = None,
+) -> int:
+ """
+ Remove PSA tokens that were created during the Custos migration.
+
+ :returns: Number of PSA rows removed.
+ """
+ if custos_table is None:
+ custos_table = get_custos_table(connection)
+ if psa_table is None:
+ psa_table = get_psa_table(connection)
+
+ custos_records = connection.execute(select(custos_table)).fetchall()
+
+ removed_count = 0
+ for record in custos_records:
+ result = connection.execute(
+ psa_table.delete().where(
+ psa_table.c.user_id == record.user_id,
+ psa_table.c.provider == record.provider,
+ psa_table.c.uid == record.external_user_id,
+ psa_table.c.assoc_type == CUSTOS_ASSOC_TYPE,
+ )
+ )
+ removed_count += result.rowcount
+
+ return removed_count
+
+
+def restore_custos_tokens_from_psa(
+ connection: Connection,
+ custos_table: Optional[Table] = None,
+ psa_table: Optional[Table] = None,
+) -> int:
+ """
+ Restore Custos tokens from PSA rows that originated from the migration.
+
+ :returns: Number of tokens restored into custos_authnz_token.
+ """
+ if custos_table is None:
+ custos_table = get_custos_table(connection)
+ if psa_table is None:
+ psa_table = get_psa_table(connection)
+
+ psa_records = connection.execute(select(psa_table).where(psa_table.c.assoc_type == CUSTOS_ASSOC_TYPE)).fetchall()
+
+ restored_count = 0
+ for record in psa_records:
+ extra_data = record.extra_data or {}
+ expiration_time = None
+ refresh_expiration_time = None
+
+ auth_time = extra_data.get("auth_time")
+ expires = extra_data.get("expires")
+ refresh_expires_in = extra_data.get("refresh_expires_in")
+
+ if auth_time is not None:
+ auth_time = int(auth_time)
+ if expires is not None:
+ expires = int(expires)
+ if refresh_expires_in is not None:
+ refresh_expires_in = int(refresh_expires_in)
+
+ if auth_time is not None and expires is not None:
+ expiration_time = datetime.fromtimestamp(auth_time + expires)
+
+ if auth_time is not None and refresh_expires_in is not None:
+ refresh_expiration_time = datetime.fromtimestamp(auth_time + refresh_expires_in)
+
+ connection.execute(
+ custos_table.insert().values(
+ user_id=record.user_id,
+ external_user_id=record.uid,
+ provider=record.provider,
+ access_token=extra_data.get("access_token"),
+ id_token=extra_data.get("id_token"),
+ refresh_token=extra_data.get("refresh_token"),
+ expiration_time=expiration_time,
+ refresh_expiration_time=refresh_expiration_time,
+ )
+ )
+ restored_count += 1
+
+ return restored_count
diff --git a/lib/galaxy/model/migrations/data_fixes/user_table_fixer.py b/lib/galaxy/model/migrations/data_fixes/user_table_fixer.py
index 4b9054872cd..61feb7646f1 100644
--- a/lib/galaxy/model/migrations/data_fixes/user_table_fixer.py
+++ b/lib/galaxy/model/migrations/data_fixes/user_table_fixer.py
@@ -72,14 +72,12 @@ class EmailDeduplicator:
self._deduplicate_users(email, duplicates)
def _get_users_with_same_email(self, email: str):
- sql = text(
- """
+ sql = text("""
SELECT u.id, EXISTS(SELECT h.id FROM history h WHERE h.user_id = u.id)
FROM galaxy_user u
WHERE u.email = :email
ORDER BY u.create_time
- """
- )
+ """)
params = {"email": email}
return self.connection.execute(sql, params).all()
diff --git a/lib/galaxy/model/migrations/dbrevisions.py b/lib/galaxy/model/migrations/dbrevisions.py
new file mode 100644
index 00000000000..d83a9bff247
--- /dev/null
+++ b/lib/galaxy/model/migrations/dbrevisions.py
@@ -0,0 +1,19 @@
+# Update this file with tags for each new release.
+# Note: the key should NOT be a prefix of an existing revision hash in alembic/versions_gxy/.
+# For example, if we have a hash 231xxxxxxxxx and use 231 as the key for release 23.1,
+# then using 231 as a partial revision identifier like `sh manage_db.sh upgrade 231`
+# will map to release 23.1 instead of revision 231xxxxxxxxx.
+
+REVISION_TAGS = {
+ "22.01": "base",
+ "22.05": "186d4835587b",
+ "23.0": "caa7742f7bca",
+ "23.1": "e93c5d0b47a9",
+ "23.2": "8a19186a6ee7",
+ "24.0": "55f02fd8ab6c",
+ "24.1": "04288b6a5b25",
+ "24.2": "a4c3ef999ab5",
+ "25.0": "c716ee82337b",
+ "25.1": "63dc6cca023f",
+ "26.0": "9930b68c85af",
+}
diff --git a/lib/galaxy/model/migrations/dbscript.py b/lib/galaxy/model/migrations/dbscript.py
index 012e67ce147..381bc68b4b6 100644
--- a/lib/galaxy/model/migrations/dbscript.py
+++ b/lib/galaxy/model/migrations/dbscript.py
@@ -10,6 +10,7 @@ from galaxy.model.migrations.base import (
BaseDbScript,
BaseParserBuilder,
)
+from galaxy.model.migrations.dbrevisions import REVISION_TAGS
from galaxy.model.migrations.scripts import (
get_configuration,
get_configuration_from_file,
@@ -24,34 +25,6 @@ CONFIG_DIR_NAME = "config"
GXY_CONFIG_PREFIX = "GALAXY_CONFIG_"
TSI_CONFIG_PREFIX = "GALAXY_INSTALL_CONFIG_"
-# Update this dict with tags for each new release.
-# Note: the key should NOT be a prefix of an existing revision hash in alembic/versions_gxy/.
-# For example, if we have a hash 231xxxxxxxxx and use 231 as the key for release 23.1,
-# then using 231 as a partial revision identifier like `sh manage_db.sh upgrade 231`
-# will map to release 23.1 instead of revision 231xxxxxxxxx.
-REVISION_TAGS = {
- "release_22.01": "base",
- "22.01": "base",
- "release_22.05": "186d4835587b",
- "22.05": "186d4835587b",
- "release_23.0": "caa7742f7bca",
- "23.0": "caa7742f7bca",
- "release_23.1": "e93c5d0b47a9",
- "23.1": "e93c5d0b47a9",
- "release_23.2": "8a19186a6ee7",
- "23.2": "8a19186a6ee7",
- "release_24.0": "55f02fd8ab6c",
- "24.0": "55f02fd8ab6c",
- "release_24.1": "04288b6a5b25",
- "24.1": "04288b6a5b25",
- "release_24.2": "a4c3ef999ab5",
- "24.2": "a4c3ef999ab5",
- "release_25.0": "c716ee82337b",
- "25.0": "c716ee82337b",
- "release_25.1": "63dc6cca023f",
- "25.1": "63dc6cca023f",
-}
-
class ParserBuilder(BaseParserBuilder):
def _get_command_object(self):
@@ -81,7 +54,7 @@ class DbScript(BaseDbScript):
return f"gxy@{revision_id}"
def _revision_tags(self):
- return REVISION_TAGS
+ return {f"release_{k}": v for k, v in REVISION_TAGS.items()} | REVISION_TAGS
def _set_dburl(self, config_file: Optional[str] = None) -> None:
gxy_config, tsi_config, _ = get_configuration_from_file(os.getcwd(), config_file)
diff --git a/lib/galaxy/model/migrations/exceptions.py b/lib/galaxy/model/migrations/exceptions.py
index 039bfe79d94..c1fbd2d700e 100644
--- a/lib/galaxy/model/migrations/exceptions.py
+++ b/lib/galaxy/model/migrations/exceptions.py
@@ -53,20 +53,16 @@ class RevisionNotFoundError(Exception):
class DatabaseDoesNotExistError(Exception):
def __init__(self, db_url: str) -> None:
- super().__init__(
- f"""The database at {db_url} does not exist. You must
+ super().__init__(f"""The database at {db_url} does not exist. You must
create and initialize the database before running this script. You
can do so by (a) running `create_db.sh`; or by (b) starting Galaxy,
in which case Galaxy will create and initialize the database
- automatically."""
- )
+ automatically.""")
class DatabaseNotInitializedError(Exception):
def __init__(self, db_url: str) -> None:
- super().__init__(
- f"""The database at {db_url} is empty. You must
+ super().__init__(f"""The database at {db_url} is empty. You must
initialize the database before running this script. You can do so by
(a) running `create_db.sh`; or by (b) starting Galaxy, in which case
- Galaxy will initialize the database automatically."""
- )
+ Galaxy will initialize the database automatically.""")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment