Last active
September 21, 2023 01:26
-
-
Save evantahler/fb247d5c51789fa8f059de708aeca520 to your computer and use it in GitHub Desktop.
users-table-type-and-dedupe-annotated
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- RAW TABLE
/*
IDEAS:
  * Should this table be a Snowflake Stream?
    * https://docs.snowflake.com/en/sql-reference/sql/create-stream
*/
-- Landing table: one row per extracted record, with the untyped payload kept in "_airbyte_data".
-- NOTE: CREATE OR REPLACE drops an existing table of the same name together with its rows.
CREATE OR REPLACE TABLE AIRBYTE_DEVELOP."airbyte_internal"."USERS_RAW" (
    -- Added by Airbyte, this id links the rows in these 2 tables (UUID)
    "_airbyte_raw_id"       VARCHAR(16777216) NOT NULL PRIMARY KEY,
    "_airbyte_extracted_at" TIMESTAMP_TZ(9)   DEFAULT CURRENT_TIMESTAMP(),
    -- Stays NULL until the row has been typed into the final table (see the load transaction)
    "_airbyte_loaded_at"    TIMESTAMP_TZ(9),
    -- The record exactly as extracted, as semi-structured data
    "_airbyte_data"         VARIANT
);
-- FINAL TABLE
-- Typed destination table for the records held in the raw table.
-- The schema and column identifiers are upper-case on purpose: every statement in the load
-- transaction addresses this table as "V2-INTERNAL-STAGING"."USERS_FINAL" with upper-case quoted
-- column names ("ADDRESS", "ID", "_AIRBYTE_RAW_ID", ...), and quoted identifiers are
-- case-sensitive in Snowflake unless QUOTED_IDENTIFIERS_IGNORE_CASE is enabled.
-- NOTE: CREATE OR REPLACE drops an existing table of the same name together with its rows.
CREATE OR REPLACE TABLE AIRBYTE_DEVELOP."V2-INTERNAL-STAGING"."USERS_FINAL" (
    "_AIRBYTE_RAW_ID"       VARCHAR(16777216) NOT NULL, -- Added by Airbyte, this id links the rows in these 2 tables (UUID)
    "_AIRBYTE_EXTRACTED_AT" TIMESTAMP_TZ(9)   NOT NULL,
    "_AIRBYTE_META"         VARIANT           NOT NULL, -- Contains errors (and eventually other metadata like sync id, sync time, etc)
    "ADDRESS"               OBJECT,
    "OCCUPATION"            VARCHAR(16777216),
    "GENDER"                VARCHAR(16777216),
    "ACADEMIC_DEGREE"       VARCHAR(16777216),
    "WEIGHT"                NUMBER(38,0),
    "CREATED_AT"            TIMESTAMP_TZ(9),
    "LANGUAGE"              VARCHAR(16777216),
    "TELEPHONE"             VARCHAR(16777216),
    "TITLE"                 VARCHAR(16777216),
    "UPDATED_AT"            TIMESTAMP_TZ(9),
    "NATIONALITY"           VARCHAR(16777216),
    "BLOOD_TYPE"            VARCHAR(16777216),
    "NAME"                  VARCHAR(16777216),
    "ID"                    NUMBER(38,0),               -- Primary key of the stream; the load transaction de-duplicates on it
    "AGE"                   NUMBER(38,0),
    "EMAIL"                 VARCHAR(16777216),
    "HEIGHT"                VARCHAR(16777216)
);
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- This all happens within one big transaction to be idempotent | |
/* | |
General Questions: | |
* What's the best way to analyze each part of this transaction, both in cost and time? | |
*/ | |
BEGIN TRANSACTION;
-- PHASE 1: BASIC DATA VALIDATION
/*
Guard step, intended to fail the transaction early.
The anonymous Snowflake Scripting block counts the not-yet-loaded raw rows
("_airbyte_loaded_at" IS NULL) whose "id" inside the JSON/Variant data is missing or cannot
be cast to NUMBER. If there is at least one, it raises exception -20001; otherwise it
returns 'SUCCESS'.
The script is passed as a $$-delimited string so the quotes inside it need no escaping.
*/
EXECUTE IMMEDIATE $$
DECLARE
    _ab_missing_primary_key EXCEPTION (-20001, 'Table "airbyte_internal"."USERS_RAW" has rows missing a primary key');
BEGIN
    LET rows_without_pk INTEGER := (
        SELECT COUNT(*)
        FROM "airbyte_internal"."USERS_RAW"
        WHERE "_airbyte_loaded_at" IS NULL
          AND TRY_CAST(("_airbyte_data":"id")::text AS NUMBER) IS NULL
    );
    IF (rows_without_pk > 0) THEN
        RAISE _ab_missing_primary_key;
    END IF;
    RETURN 'SUCCESS';
END;
$$;
-- PHASE 2: TYPECASTING
/*
Of note:
  * Non-text columns are cast with TRY_* functions (TRY_CAST / TRY_TO_TIMESTAMP_TZ) to prevent
    crashes when we encounter raw data that is not of the proper type (it happens more than you
    think), e.g. 'twenty' is provided for a number column. A failed cast yields NULL instead of
    an error. This includes timestamps that match one of the regular expressions but are not
    real dates (e.g. month 13).
  * We know that we can skip the TRY_* cast on text and JSON/VARIANT columns.
  * Each column is typecast exactly once, in the `typed_records` CTE. The outer SELECT then
    builds the per-column error list: a column is reported when its raw value was present
    (TYPEOF is neither 'NULL' nor 'NULL_VALUE') but the typed value is NULL. The per-column
    error message is a DX goal for us.
  * REGEXP has to match the whole string, so the four timestamp patterns (with or without
    fractional seconds, offset as +HHMM or +HH) are mutually exclusive. Anything else falls
    through to TRY_CAST with Snowflake's automatic format detection.
  * We use `WHERE "_airbyte_loaded_at" IS NULL` to only typecast and insert new records from
    this sync.
IDEAS
  * Is there a way to avoid the TRY_* casts entirely?
  * We also know that building the JSON array of errors is slow on `_AIRBYTE_META.errors=[]` -
    can we do this better?
  * Would it be faster to build a persisted, other table and then MERGE it into the FINAL table
    rather than the CTE we are building here?
    * https://docs.snowflake.com/en/sql-reference/sql/merge
  * Can we do the date checks without regular expressions?
*/
INSERT INTO "V2-INTERNAL-STAGING"."USERS_FINAL"
(
    "ADDRESS",
    "OCCUPATION",
    "GENDER",
    "ACADEMIC_DEGREE",
    "WEIGHT",
    "CREATED_AT",
    "LANGUAGE",
    "TELEPHONE",
    "TITLE",
    "UPDATED_AT",
    "NATIONALITY",
    "BLOOD_TYPE",
    "NAME",
    "ID",
    "AGE",
    "EMAIL",
    "HEIGHT",
    "_AIRBYTE_META",
    "_AIRBYTE_RAW_ID",
    "_AIRBYTE_EXTRACTED_AT"
)
WITH typed_records AS (
    -- here's where we do the typecasting (once per column)
    SELECT
        -- only a JSON object is accepted for `address`; any other type becomes NULL
        CASE
            WHEN TYPEOF("_airbyte_data":"address") != 'OBJECT'
                THEN NULL
            ELSE "_airbyte_data":"address"
        END AS "ADDRESS",
        ("_airbyte_data":"occupation")::text AS "OCCUPATION",
        ("_airbyte_data":"gender")::text AS "GENDER",
        ("_airbyte_data":"academic_degree")::text AS "ACADEMIC_DEGREE",
        TRY_CAST(("_airbyte_data":"weight")::text AS NUMBER) AS "WEIGHT",
        CASE
            WHEN ("_airbyte_data":"created_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{4}'
                THEN TRY_TO_TIMESTAMP_TZ(("_airbyte_data":"created_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SSTZHTZM')
            WHEN ("_airbyte_data":"created_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{2}'
                THEN TRY_TO_TIMESTAMP_TZ(("_airbyte_data":"created_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SSTZH')
            WHEN ("_airbyte_data":"created_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{4}'
                THEN TRY_TO_TIMESTAMP_TZ(("_airbyte_data":"created_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SS.FFTZHTZM')
            WHEN ("_airbyte_data":"created_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{2}'
                THEN TRY_TO_TIMESTAMP_TZ(("_airbyte_data":"created_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SS.FFTZH')
            ELSE TRY_CAST(("_airbyte_data":"created_at")::TEXT AS TIMESTAMP_TZ)
        END AS "CREATED_AT",
        ("_airbyte_data":"language")::text AS "LANGUAGE",
        ("_airbyte_data":"telephone")::text AS "TELEPHONE",
        ("_airbyte_data":"title")::text AS "TITLE",
        CASE
            WHEN ("_airbyte_data":"updated_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{4}'
                THEN TRY_TO_TIMESTAMP_TZ(("_airbyte_data":"updated_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SSTZHTZM')
            WHEN ("_airbyte_data":"updated_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{2}'
                THEN TRY_TO_TIMESTAMP_TZ(("_airbyte_data":"updated_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SSTZH')
            WHEN ("_airbyte_data":"updated_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{4}'
                THEN TRY_TO_TIMESTAMP_TZ(("_airbyte_data":"updated_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SS.FFTZHTZM')
            WHEN ("_airbyte_data":"updated_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{2}'
                THEN TRY_TO_TIMESTAMP_TZ(("_airbyte_data":"updated_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SS.FFTZH')
            ELSE TRY_CAST(("_airbyte_data":"updated_at")::TEXT AS TIMESTAMP_TZ)
        END AS "UPDATED_AT",
        ("_airbyte_data":"nationality")::text AS "NATIONALITY",
        ("_airbyte_data":"blood_type")::text AS "BLOOD_TYPE",
        ("_airbyte_data":"name")::text AS "NAME",
        TRY_CAST(("_airbyte_data":"id")::text AS NUMBER) AS "ID",
        TRY_CAST(("_airbyte_data":"age")::text AS NUMBER) AS "AGE",
        ("_airbyte_data":"email")::text AS "EMAIL",
        ("_airbyte_data":"height")::text AS "HEIGHT",
        -- carried along so the outer SELECT can tell "value absent" from "value failed to cast"
        "_airbyte_data",
        "_airbyte_raw_id",
        "_airbyte_extracted_at"
    FROM "airbyte_internal"."USERS_RAW"
    -- basic filtering here
    WHERE "_airbyte_loaded_at" IS NULL
)
SELECT
    "ADDRESS",
    "OCCUPATION",
    "GENDER",
    "ACADEMIC_DEGREE",
    "WEIGHT",
    "CREATED_AT",
    "LANGUAGE",
    "TELEPHONE",
    "TITLE",
    "UPDATED_AT",
    "NATIONALITY",
    "BLOOD_TYPE",
    "NAME",
    "ID",
    "AGE",
    "EMAIL",
    "HEIGHT",
    -- build the error array: one entry per column whose raw value was present but typed to NULL;
    -- ARRAY_CONSTRUCT_COMPACT drops the NULLs produced by the columns without a problem
    OBJECT_CONSTRUCT('errors', ARRAY_CONSTRUCT_COMPACT(
        CASE WHEN TYPEOF("_airbyte_data":"address") NOT IN ('NULL', 'NULL_VALUE') AND "ADDRESS" IS NULL
            THEN 'Problem with `address`' END,
        CASE WHEN TYPEOF("_airbyte_data":"occupation") NOT IN ('NULL', 'NULL_VALUE') AND "OCCUPATION" IS NULL
            THEN 'Problem with `occupation`' END,
        CASE WHEN TYPEOF("_airbyte_data":"gender") NOT IN ('NULL', 'NULL_VALUE') AND "GENDER" IS NULL
            THEN 'Problem with `gender`' END,
        CASE WHEN TYPEOF("_airbyte_data":"academic_degree") NOT IN ('NULL', 'NULL_VALUE') AND "ACADEMIC_DEGREE" IS NULL
            THEN 'Problem with `academic_degree`' END,
        CASE WHEN TYPEOF("_airbyte_data":"weight") NOT IN ('NULL', 'NULL_VALUE') AND "WEIGHT" IS NULL
            THEN 'Problem with `weight`' END,
        CASE WHEN TYPEOF("_airbyte_data":"created_at") NOT IN ('NULL', 'NULL_VALUE') AND "CREATED_AT" IS NULL
            THEN 'Problem with `created_at`' END,
        CASE WHEN TYPEOF("_airbyte_data":"language") NOT IN ('NULL', 'NULL_VALUE') AND "LANGUAGE" IS NULL
            THEN 'Problem with `language`' END,
        CASE WHEN TYPEOF("_airbyte_data":"telephone") NOT IN ('NULL', 'NULL_VALUE') AND "TELEPHONE" IS NULL
            THEN 'Problem with `telephone`' END,
        CASE WHEN TYPEOF("_airbyte_data":"title") NOT IN ('NULL', 'NULL_VALUE') AND "TITLE" IS NULL
            THEN 'Problem with `title`' END,
        CASE WHEN TYPEOF("_airbyte_data":"updated_at") NOT IN ('NULL', 'NULL_VALUE') AND "UPDATED_AT" IS NULL
            THEN 'Problem with `updated_at`' END,
        CASE WHEN TYPEOF("_airbyte_data":"nationality") NOT IN ('NULL', 'NULL_VALUE') AND "NATIONALITY" IS NULL
            THEN 'Problem with `nationality`' END,
        CASE WHEN TYPEOF("_airbyte_data":"blood_type") NOT IN ('NULL', 'NULL_VALUE') AND "BLOOD_TYPE" IS NULL
            THEN 'Problem with `blood_type`' END,
        CASE WHEN TYPEOF("_airbyte_data":"name") NOT IN ('NULL', 'NULL_VALUE') AND "NAME" IS NULL
            THEN 'Problem with `name`' END,
        CASE WHEN TYPEOF("_airbyte_data":"id") NOT IN ('NULL', 'NULL_VALUE') AND "ID" IS NULL
            THEN 'Problem with `id`' END,
        CASE WHEN TYPEOF("_airbyte_data":"age") NOT IN ('NULL', 'NULL_VALUE') AND "AGE" IS NULL
            THEN 'Problem with `age`' END,
        CASE WHEN TYPEOF("_airbyte_data":"email") NOT IN ('NULL', 'NULL_VALUE') AND "EMAIL" IS NULL
            THEN 'Problem with `email`' END,
        CASE WHEN TYPEOF("_airbyte_data":"height") NOT IN ('NULL', 'NULL_VALUE') AND "HEIGHT" IS NULL
            THEN 'Problem with `height`' END
    )) AS "_AIRBYTE_META", -- is this the best way to build the meta object?
    "_airbyte_raw_id" AS "_AIRBYTE_RAW_ID",
    "_airbyte_extracted_at" AS "_AIRBYTE_EXTRACTED_AT"
FROM typed_records;
-- PART 3: DEDUPLICATION
/*
  * Once we have the new data mixed in with the previous data in the final table, we want to
    deduplicate. We delete all but the newest record per primary key ("ID"). "Newest" means the
    greatest "UPDATED_AT" (rows without one sort last), then the greatest "_AIRBYTE_EXTRACTED_AT".
    As an update might happen for any record already in the table, we cannot time-bound this
    query by the cursor or _airbyte_extracted_at.
  * Rows that tie on both ordering columns are still reduced to one survivor, but the ORDER BY
    does not say which one.
IDEAS
  * The sub-select can probably be improved. We cannot use the RAW table at this time, because it
    also still has duplicate entries.
*/
DELETE FROM "V2-INTERNAL-STAGING"."USERS_FINAL"
WHERE "_AIRBYTE_RAW_ID" IN (
    -- every row that is not ranked first within its "ID" group
    SELECT "_AIRBYTE_RAW_ID"
    FROM "V2-INTERNAL-STAGING"."USERS_FINAL"
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY "ID"
        ORDER BY "UPDATED_AT" DESC NULLS LAST, "_AIRBYTE_EXTRACTED_AT" DESC
    ) != 1
);
-- PART 4: DEDUPLICATION OF RAW TABLE
/*
Now that the final table has been de-duplicated, we can use the _airbyte_raw_id's to de-duplicate
the raw table as well: every raw row whose id no longer appears in the final table is removed.
The anti-join is written as NOT EXISTS rather than NOT IN. NOT IN matches nothing at all as soon
as the sub-select returns a single NULL, which would silently turn this step into a no-op.
NOT EXISTS has no such trap and gives the same result whenever the ids are non-NULL.
IDEAS
  * Can this query be combined with the final statement in some way?
*/
DELETE FROM "airbyte_internal"."USERS_RAW" AS raw_rows
WHERE NOT EXISTS (
    SELECT 1
    FROM "V2-INTERNAL-STAGING"."USERS_FINAL" AS final_rows
    WHERE final_rows."_AIRBYTE_RAW_ID" = raw_rows."_airbyte_raw_id"
);
-- PART 5: UPDATE _AIRBYTE_LOADED_AT
/*
  * Finally, we stamp every raw row that is still unmarked ("_airbyte_loaded_at" IS NULL) with
    the current timestamp to indicate that these rows have already been processed. The earlier
    phases select their input with the same IS NULL filter, so stamped rows are skipped by the
    next run.
*/
UPDATE "airbyte_internal"."USERS_RAW"
   SET "_airbyte_loaded_at" = CURRENT_TIMESTAMP()
 WHERE "_airbyte_loaded_at" IS NULL;
-- commit the transaction
COMMIT;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment