Last active
February 15, 2021 23:19
-
-
Save gvisniuc/55c05b6b37d12b3cbb62d366afc04f4f to your computer and use it in GitHub Desktop.
Snowflake Deduplication Procedure
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
This procedure takes the full path of a Snowflake table (database, schema and table name) and deduplicates it using a rotation mechanism.
To minimize waste, we first check whether the table's row count equals its count of distinct rows.
If it does not, then given a table A, we create a deduplicated version A_DEDUP.
Table A is renamed to A_ROTATE.
A_DEDUP is renamed to A.
A_ROTATE is then dropped.
Each operation is wrapped in a transaction; if it fails, we roll back and restore the initial state.
Note: Make sure you take ownership into account when executing the stored procedure. The operations will copy the grants,
but will not retain ownership, as described in https://docs.snowflake.com/en/sql-reference/sql/create-table.html#label-create-table-copy-grants
*/
CREATE OR REPLACE PROCEDURE DEDUPLICATE(DATABASE varchar, SCHEMA varchar, "TABLE" varchar )
RETURNS string
LANGUAGE JAVASCRIPT
AS
$$
    /*
     * Removes exact duplicate rows from DATABASE.SCHEMA.TABLE by table rotation.
     *
     * Steps:
     *   1. Compare COUNT(*) with the count of DISTINCT rows; stop if they match.
     *   2. Build <table>_DEDUP from SELECT DISTINCT *.
     *   3. Rename <table> to <table>_ROTATE.
     *   4. Rename <table>_DEDUP to <table>.
     *   5. Drop <table>_ROTATE.
     *
     * Arguments:
     *   DATABASE, SCHEMA, TABLE - identifier parts, interpolated verbatim into
     *   the SQL text. They are not validated or quoted here, so only pass
     *   trusted identifiers. NOTE(review): a quoted table name would not
     *   compose with the _DEDUP / _ROTATE suffixes.
     *
     * Returns:
     *   A human-readable status string. Failures are reported through the
     *   return value (never thrown), prefixed with the phase that failed.
     *
     * NOTE: Snowflake documents that DDL statements (CREATE TABLE ... AS,
     * ALTER TABLE ... RENAME) run as their own auto-committed transactions,
     * so ROLLBACK is not expected to undo them. Recovery below therefore
     * relies on explicit compensating statements (rename back / drop).
     */

    // Runs a single statement between BEGIN and COMMIT. On failure it issues a
    // best-effort ROLLBACK and rethrows the ORIGINAL error, so a failing
    // ROLLBACK can never mask the real cause.
    function runInTransaction(sqlText) {
        try {
            snowflake.execute({sqlText: `BEGIN;`});
            snowflake.execute({sqlText: sqlText});
            snowflake.execute({sqlText: "COMMIT"});
        } catch (err) {
            try {
                snowflake.execute({sqlText: "ROLLBACK"});
            } catch (rollbackErr) {
                // Deliberately ignored: the original error is the useful one.
            }
            throw err;
        }
    }

    // Runs compensating statements in order and stops at the first failure, so
    // the remaining tables are left in place for manual recovery. Returns ""
    // when everything succeeded, otherwise a suffix describing the statement
    // that failed.
    function compensate(statements) {
        for (var i = 0; i < statements.length; i++) {
            try {
                snowflake.execute({sqlText: statements[i]});
            } catch (cleanupErr) {
                return ` Additionally, the recovery statement "${statements[i]}" failed: ${cleanupErr}. Manual intervention is required.`;
            }
        }
        return "";
    }

    // Declare the full path of the table
    var full_path = `${DATABASE}.${SCHEMA}.${TABLE}`;
    var uniqueResultCount;
    var resultCount;

    // Phase 1: row count check. Only this phase is reported as a
    // "row count check" failure.
    try {
        // Fetch the unique count of rows
        var uniqueResults = snowflake.createStatement({
            sqlText: `SELECT COUNT(*) FROM (SELECT DISTINCT * FROM ${full_path})`
        }).execute();
        uniqueResults.next();
        uniqueResultCount = uniqueResults.getColumnValue(1);

        // Fetch the current count of rows
        var countResults = snowflake.createStatement({
            sqlText: `SELECT COUNT(*) FROM ${full_path}`
        }).execute();
        countResults.next();
        resultCount = countResults.getColumnValue(1);
    } catch (err) {
        return "Failed during the row count check phase: " + err;
    }

    var resultDifference = resultCount - uniqueResultCount;

    // Nothing to do when every row is already unique (this also covers an
    // empty table, where both counts are 0).
    if ( !(resultDifference > 0) ) {
        return `Table has no duplicate rows. Initial row count is ${resultCount} and unique row count is ${uniqueResultCount}.`;
    }

    // Phase 2: create (or replace) the deduplicated copy.
    try {
        runInTransaction(`CREATE OR REPLACE TABLE ${full_path}_DEDUP AS ( SELECT DISTINCT * FROM ${full_path} ) COPY GRANTS`);
    } catch (err) {
        return "Failed during the creation of the deduplicated table: " + err;
    }

    // Phase 3: move the original table out of the way. A rename is used because
    // creating another copy would be more expensive. The table is unavailable
    // under its own name between this rename and the next one.
    try {
        runInTransaction(`ALTER TABLE ${full_path} RENAME TO ${full_path}_ROTATE`);
    } catch (err) {
        // The original table is still in place; only the _DEDUP copy needs removing.
        return "Failed during the renaming of the original table: " + err
            + compensate([`DROP TABLE IF EXISTS ${full_path}_DEDUP`]);
    }

    // Phase 4: promote the deduplicated copy to the original name.
    try {
        runInTransaction(`ALTER TABLE ${full_path}_DEDUP RENAME TO ${full_path}`);
    } catch (err) {
        // Restore the original table first, then discard the _DEDUP copy.
        return "Failed during the renaming of the rotated table: " + err
            + compensate([
                `ALTER TABLE ${full_path}_ROTATE RENAME TO ${full_path}`,
                `DROP TABLE IF EXISTS ${full_path}_DEDUP`
            ]);
    }

    // Phase 5: drop the _ROTATE table, which should exist at this point. The
    // deduplicated table is already live, so a failure here is reported as a
    // leftover table rather than as a failed deduplication.
    try {
        snowflake.execute({
            sqlText: `DROP TABLE ${full_path}_ROTATE;`
        });
    } catch (err) {
        return `Removed ${resultDifference} duplicate${resultDifference > 1 ? 's' : ''} from ${full_path}, but failed to drop the leftover table ${full_path}_ROTATE: ` + err;
    }

    return `Succesfully removed ${resultDifference} duplicate${resultDifference > 1 ? 's' : ''} from ${full_path}. Initial row count is ${resultCount} and unique row count is ${uniqueResultCount}.`;
$$;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment