-
-
Save slachiewicz/4dcafd36976af59ea886ce54c594395d to your computer and use it in GitHub Desktop.
Expose pg_lake Iceberg tables via Polaris metadata
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- add_table_to_polaris: register (or refresh) an Iceberg table in Polaris by writing
-- directly into polaris_schema.entities.
--
-- Parameters:
--   p_namespace_name    name stored as the Polaris namespace entity name (type_code 6)
--   p_table_name        name stored as the Polaris table entity name (type_code 7)
--   p_metadata_location URI of the table's current *.metadata.json file
--
-- Behaviour:
--   * Returns without writing anything when no catalog entity (type_code 4) named
--     current_database() exists.
--   * Inserts the namespace entity under that catalog if it does not exist yet.
--   * Inserts the table entity if it does not exist yet; otherwise increments its
--     entity_version and points its internal properties at the new metadata file.
create or replace function add_table_to_polaris(p_namespace_name text, p_table_name text, p_metadata_location text)
returns void
language plpgsql
as $function$
declare
    v_catalog_id bigint;
    v_namespace_id bigint;
    v_table_id bigint;
    -- current time in epoch milliseconds, written to the *_timestamp columns
    v_now bigint := (extract(epoch from clock_timestamp()) * 1000)::bigint;
    v_location text;
    v_internal_properties jsonb;
begin
    -- Table root location: strip the trailing "metadata/<file>.metadata.json",
    -- e.g. s3://b/ns/t/metadata/00001-x.metadata.json -> s3://b/ns/t/
    -- Dots are escaped so they match literally. If the pattern does not match,
    -- regexp_replace returns the input unchanged.
    v_location := regexp_replace(p_metadata_location, '(.*?/)(metadata/[^/]*\.metadata\.json)$', '\1');

    -- jsonb_build_object escapes quotes and backslashes in the values. Building the
    -- JSON text with format('%s') did not, and produced invalid or injectable JSON.
    v_internal_properties := jsonb_build_object(
        'parent-namespace', p_namespace_name,
        'metadata-location', p_metadata_location);

    -- Catalog entity is looked up by the current database's name.
    -- NOTE(review): the original author asked "should use database name?" — confirm naming.
    select id into v_catalog_id
    from polaris_schema.entities
    where name = current_database() and type_code = 4;

    if not found then
        -- catalog not in polaris: nothing to register
        return;
    end if;

    -- Namespace lookup; an existing row is locked for the rest of the transaction.
    select id into v_namespace_id
    from polaris_schema.entities
    where name = p_namespace_name and type_code = 6 and parent_id = v_catalog_id
    for update;

    if not found then
        -- Namespace not yet in polaris: create it with a random non-negative id.
        -- NOTE(review): id uniqueness relies on chance, and the namespace "location"
        -- is taken from the first table registered in it — confirm both are intended.
        v_namespace_id := (random() * 9223372036854775807)::bigint;

        insert into polaris_schema.entities (
            realm_id,
            catalog_id,
            id,
            parent_id,
            name,
            entity_version,
            type_code,
            sub_type_code,
            create_timestamp,
            drop_timestamp,
            purge_timestamp,
            to_purge_timestamp,
            last_update_timestamp,
            properties,
            internal_properties,
            grant_records_version)
        values (
            'POLARIS',
            v_catalog_id,
            v_namespace_id,
            v_catalog_id,
            p_namespace_name,
            1,  /* version */
            6,  /* namespace type code */
            0,  /* sub type code? */
            v_now,
            0,  /* drop timestamp */
            0,  /* purge timestamp */
            0,  /* to purge timestamp */
            v_now,  /* last update timestamp */
            jsonb_build_object('location', v_location),
            '{}'::jsonb,
            1);
    end if;

    -- Table lookup within the namespace; an existing row is locked.
    select id into v_table_id
    from polaris_schema.entities
    where name = p_table_name and type_code = 7 and parent_id = v_namespace_id
    for update;

    if not found then
        -- Table not yet in polaris: insert it with a random non-negative id.
        v_table_id := (random() * 9223372036854775807)::bigint;

        insert into polaris_schema.entities (
            realm_id,
            catalog_id,
            id,
            parent_id,
            name,
            entity_version,
            type_code,
            sub_type_code,
            create_timestamp,
            drop_timestamp,
            purge_timestamp,
            to_purge_timestamp,
            last_update_timestamp,
            properties,
            internal_properties,
            grant_records_version)
        values (
            'POLARIS',
            v_catalog_id,
            v_table_id,
            v_namespace_id,
            p_table_name,
            1,  /* version */
            7,  /* tables are 7 */
            2,  /* sub type code? */
            v_now,
            0,  /* drop timestamp */
            0,  /* purge timestamp */
            0,  /* to purge timestamp */
            v_now,  /* last update timestamp */
            jsonb_build_object('location', v_location),
            v_internal_properties,
            1);
    else
        -- Table already registered: bump its version and point it at the new metadata file.
        update polaris_schema.entities e
        set
            entity_version = e.entity_version + 1,
            internal_properties = v_internal_properties,
            last_update_timestamp = v_now
        where e.id = v_table_id;
    end if;
end;
$function$;
-- remove_table_from_polaris: delete a table's entity (type_code 7) from polaris_schema.entities.
--
-- Parameters:
--   p_namespace_name  Polaris namespace name the table lives under
--   p_table_name      Polaris table name to remove
--
-- Returns silently when the catalog or namespace is not found. The namespace entity
-- itself is left in place even if it becomes empty.
create or replace function remove_table_from_polaris(p_namespace_name text, p_table_name text)
returns void
language plpgsql
as $function$
declare
    v_catalog_id bigint;
    v_namespace_id bigint;
begin
    -- Catalog lookup uses the same name as add_table_to_polaris (current_database()).
    -- The previous current_database() || '_catalog' lookup could never find tables that
    -- add_table_to_polaris registered, so removals were silently skipped.
    select id into v_catalog_id
    from polaris_schema.entities
    where name = current_database() and type_code = 4;

    if not found then
        -- catalog not in polaris
        return;
    end if;

    select id into v_namespace_id
    from polaris_schema.entities
    where name = p_namespace_name and type_code = 6 and parent_id = v_catalog_id;

    if not found then
        -- namespace not in polaris
        return;
    end if;

    -- Restrict to table entities so that a non-table entity with the same name
    -- under this namespace is never deleted.
    delete from polaris_schema.entities
    where name = p_table_name
      and type_code = 7
      and parent_id = v_namespace_id;
end;
$function$;
-- polaris_change: row-level trigger function that forwards changes to Polaris.
--   INSERT/UPDATE -> add_table_to_polaris(schema, table, NEW.metadata_location)
--   DELETE        -> remove_table_from_polaris(schema, table)
-- The row's table_name column is resolved as an OID against pg_class/pg_namespace.
-- Always returns NULL; the return value of an AFTER row trigger is ignored.
create or replace function polaris_change()
returns trigger
language plpgsql
as $function$
declare
    v_table_oid oid;
    v_table_name text;
    v_namespace_name text;
begin
    if TG_OP in ('INSERT', 'UPDATE') then
        v_table_oid := NEW.table_name;
    elsif TG_OP = 'DELETE' then
        v_table_oid := OLD.table_name;
    else
        return null;
    end if;

    -- Use the raw pg_namespace.nspname. Casting relnamespace::regnamespace to text
    -- double-quotes names that need quoting (e.g. "MixedCase"), and the quotes would
    -- then be stored as part of the Polaris namespace name.
    select c.relname, n.nspname
      into v_table_name, v_namespace_name
    from pg_catalog.pg_class as c
    inner join pg_catalog.pg_namespace as n
        on n.oid = c.relnamespace
    where c.oid = v_table_oid;

    if v_table_name is null then
        -- The relation could not be resolved (possibly already dropped when the row
        -- is deleted — TODO confirm pg_lake's ordering). Avoid writing or matching a
        -- NULL-named entity.
        return null;
    end if;

    if TG_OP = 'DELETE' then
        perform remove_table_from_polaris(v_namespace_name, v_table_name);
    else
        perform add_table_to_polaris(v_namespace_name, v_table_name, NEW.metadata_location);
    end if;

    return null;
end;
$function$;
-- (Re)install the row-level trigger that calls polaris_change() after every
-- insert, update or delete on lake_iceberg.tables_internal.
-- Dropping first lets this script be re-run without a "trigger already exists" error.
drop trigger if exists polaris_trigger
    on lake_iceberg.tables_internal;

create trigger polaris_trigger
    after delete or insert or update
    on lake_iceberg.tables_internal
    for each row
    execute function polaris_change();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment