Skip to content

Instantly share code, notes, and snippets.

@sfc-gh-mslot
Last active November 11, 2025 15:25
Show Gist options
  • Select an option

  • Save sfc-gh-mslot/aba205db1d1bd5918fda9488ec3b3f10 to your computer and use it in GitHub Desktop.

Select an option

Save sfc-gh-mslot/aba205db1d1bd5918fda9488ec3b3f10 to your computer and use it in GitHub Desktop.
Expose pg_lake Iceberg tables via Polaris metadata
/*
 * add_table_to_polaris
 *
 * Registers an Iceberg table in the Polaris metadata store, or refreshes its
 * metadata pointer, by writing directly to polaris_schema.entities.
 *
 * Parameters:
 *   p_namespace_name    - name of the namespace entity (type_code 6) that
 *                         should contain the table
 *   p_table_name        - name of the table entity (type_code 7)
 *   p_metadata_location - location of the table's current metadata JSON file
 *
 * Behavior:
 *   - If no catalog entity (type_code 4) named current_database() exists,
 *     the function returns without doing anything.
 *   - The namespace entity is created under the catalog when it is missing.
 *   - The table entity is inserted when it is missing; otherwise its
 *     entity_version is incremented and its internal_properties and
 *     last_update_timestamp are replaced.
 *
 * JSON property values are built with jsonb_build_object so that names and
 * paths containing double quotes or backslashes are escaped correctly.
 */
create or replace function add_table_to_polaris(p_namespace_name text, p_table_name text, p_metadata_location text)
returns void
language plpgsql
as $function$
declare
    v_catalog_id bigint;
    v_namespace_id bigint;
    v_table_id bigint;
    /* current wall-clock time in milliseconds since the Unix epoch */
    v_now bigint := (extract(epoch from clock_timestamp()) * 1000)::bigint;
    v_location text;
    v_properties jsonb;
    v_internal_properties jsonb;
begin
    /*
     * Strip the trailing "metadata/<file>.metadata.json" to get the table
     * location (trailing slash kept). If the pattern does not match,
     * regexp_replace returns its input unchanged, so v_location is then the
     * metadata location itself.
     */
    v_location := regexp_replace(p_metadata_location, '(.*?/)(metadata/[^/]*\.metadata\.json)$', '\1');

    /* coalesce keeps the empty-string output format('%s', NULL) used to give */
    v_properties := jsonb_build_object('location', coalesce(v_location, ''));
    v_internal_properties := jsonb_build_object(
        'parent-namespace', coalesce(p_namespace_name, ''),
        'metadata-location', coalesce(p_metadata_location, ''));

    /* get catalog ID (should use database name?) */
    select id into v_catalog_id
    from polaris_schema.entities
    where name = current_database() and type_code = 4;

    if not found then
        /* catalog not in polaris */
        return;
    end if;

    /* get namespace ID, locking the row if it exists */
    select id into v_namespace_id
    from polaris_schema.entities
    where name = p_namespace_name and type_code = 6 and parent_id = v_catalog_id
    for update;

    if not found then
        /*
         * namespace not yet in polaris: pick a random non-negative bigint id.
         * NOTE(review): no collision check is done against existing ids.
         */
        v_namespace_id := (random() * 9223372036854775807)::bigint;

        /* insert namespace */
        insert into polaris_schema.entities (
            realm_id,
            catalog_id,
            id,
            parent_id,
            name,
            entity_version,
            type_code,
            sub_type_code,
            create_timestamp,
            drop_timestamp,
            purge_timestamp,
            to_purge_timestamp,
            last_update_timestamp,
            properties,
            internal_properties,
            grant_records_version)
        values (
            'POLARIS',
            v_catalog_id,
            v_namespace_id,
            v_catalog_id,
            p_namespace_name,
            1, /* version */
            6, /* namespace type code */
            0, /* sub type code? */
            v_now,
            0, /* drop timestamp */
            0, /* purge timestamp */
            0, /* to purge timestamp */
            v_now, /* last update timestamp */
            v_properties, /* namespace reuses the table-derived location */
            '{}'::jsonb,
            1);
    end if;

    /* get table ID, locking the row if it exists */
    select id into v_table_id
    from polaris_schema.entities
    where name = p_table_name and type_code = 7 and parent_id = v_namespace_id
    for update;

    if not found then
        /* table not yet in polaris: same random id scheme as the namespace */
        v_table_id := (random() * 9223372036854775807)::bigint;

        /* insert table */
        insert into polaris_schema.entities (
            realm_id,
            catalog_id,
            id,
            parent_id,
            name,
            entity_version,
            type_code,
            sub_type_code,
            create_timestamp,
            drop_timestamp,
            purge_timestamp,
            to_purge_timestamp,
            last_update_timestamp,
            properties,
            internal_properties,
            grant_records_version)
        values (
            'POLARIS',
            v_catalog_id,
            v_table_id,
            v_namespace_id,
            p_table_name,
            1, /* version */
            7, /* tables are 7 */
            2, /* sub type code? */
            v_now,
            0, /* drop timestamp */
            0, /* purge timestamp */
            0, /* to purge timestamp */
            v_now, /* last update timestamp */
            v_properties,
            v_internal_properties,
            1);
    else
        /* table already in polaris: bump version and point at new metadata */
        update
            polaris_schema.entities e
        set
            entity_version = e.entity_version + 1,
            internal_properties = v_internal_properties,
            last_update_timestamp = v_now
        where
            id = v_table_id;
    end if;
end;
$function$;
/*
 * remove_table_from_polaris
 *
 * Deletes the Polaris table entity for the given namespace and table name
 * from polaris_schema.entities.
 *
 * Parameters:
 *   p_namespace_name - name of the namespace entity (type_code 6)
 *   p_table_name     - name of the table entity (type_code 7) to delete
 *
 * Returns without doing anything when the catalog or the namespace is not
 * present. The namespace entity itself is never removed.
 */
create or replace function remove_table_from_polaris(p_namespace_name text, p_table_name text)
returns void language plpgsql
as $function$
declare
    v_catalog_id bigint;
    v_namespace_id bigint;
begin
    /*
     * get catalog ID; the catalog is looked up by the plain database name so
     * that it is the same catalog add_table_to_polaris registers tables under
     */
    select id into v_catalog_id
    from polaris_schema.entities
    where name = current_database() and type_code = 4;

    if not found then
        /* catalog not in polaris */
        return;
    end if;

    /* get namespace ID */
    select id into v_namespace_id
    from polaris_schema.entities
    where name = p_namespace_name and type_code = 6 and parent_id = v_catalog_id;

    if not found then
        /* namespace not in polaris */
        return;
    end if;

    /*
     * only delete table entities (type_code 7), so that a different kind of
     * entity with the same name under this namespace is left untouched
     */
    delete from polaris_schema.entities
    where name = p_table_name
      and type_code = 7
      and parent_id = v_namespace_id;
end;
$function$;
/*
 * polaris_change
 *
 * Row-level trigger function that mirrors changes of a catalog row into
 * Polaris:
 *   - INSERT / UPDATE: calls add_table_to_polaris with the relation's schema
 *     name, relation name and NEW.metadata_location
 *   - DELETE: calls remove_table_from_polaris with the relation's schema name
 *     and relation name
 *
 * The row's table_name column is resolved against pg_catalog.pg_class by oid.
 * The schema name is read from pg_namespace.nspname rather than by casting to
 * regnamespace, because the regnamespace output is a quoted identifier when
 * the name needs quoting, which would not match the raw relname convention.
 *
 * Always returns null; the return value is ignored for AFTER row triggers.
 */
create or replace function polaris_change()
returns trigger language plpgsql
as $function$
declare
    v_table_name text;
    v_namespace_name text;
begin
    if TG_OP = 'INSERT' or TG_OP = 'UPDATE' then
        select c.relname, n.nspname
        into v_table_name, v_namespace_name
        from pg_catalog.pg_class c
        inner join pg_catalog.pg_namespace n on n.oid = c.relnamespace
        where c.oid = NEW.table_name;

        if not found then
            /* relation is not in pg_class; nothing meaningful to register */
            return null;
        end if;

        perform add_table_to_polaris(v_namespace_name, v_table_name, NEW.metadata_location);
    elsif TG_OP = 'DELETE' then
        select c.relname, n.nspname
        into v_table_name, v_namespace_name
        from pg_catalog.pg_class c
        inner join pg_catalog.pg_namespace n on n.oid = c.relnamespace
        where c.oid = OLD.table_name;

        if not found then
            /*
             * NOTE(review): if the pg_class row is already gone when this row
             * is deleted, the Polaris entry cannot be resolved by name and is
             * left in place - confirm the ordering used by the caller.
             */
            return null;
        end if;

        perform remove_table_from_polaris(v_namespace_name, v_table_name);
    end if;

    return null;
end;
$function$;
/*
 * (Re)install the row-level trigger that calls polaris_change() after every
 * insert, update or delete on lake_iceberg.tables_internal. The trigger is
 * dropped first so the script can be re-run.
 */
drop trigger if exists polaris_trigger
    on lake_iceberg.tables_internal;

create trigger polaris_trigger
    after insert or update or delete
    on lake_iceberg.tables_internal
    for each row
    execute function polaris_change();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment