Skip to content

Instantly share code, notes, and snippets.

@heri16
Last active May 24, 2024 15:28
Show Gist options
  • Save heri16/8e4a6e7fa83d0ec44d2c35badfd0a954 to your computer and use it in GitHub Desktop.
Save heri16/8e4a6e7fa83d0ec44d2c35badfd0a954 to your computer and use it in GitHub Desktop.
Secure Number Masking for Postgres (Scrambling DB Primary Keys or sensitive data using NIST FF1 Format Preserving Encryption)

What

This is open-source code that lets you secure or mask numbers (within Postgresql DB) for use as unique IDs that are 6-digits or more.

This is what they look like:

https://example.com/order/053124

Sample

Why

Many applications require numerical IDs or unique numbers. Some Examples:

  • Credit Card numbers
  • National ID Card numbers or Social Security numbers
  • Phone numbers
  • Prepaid topup voucher codes
  • Shopping coupon codes
  • User ID numbers
  • Order numbers
  • Invoice numbers
  • Transaction numbers

Using incrementing numbers for any of the above examples may expose your application to various security problems. Your IDs might expose more information than you might realize. For instance in a web shop when you make an order you will probably be redirected to a success site with your order_id found as a query parameter (or similar):

https://myshop.com/account/orders?orderid=7865

From this you can probably estimate how many orders they processed. But it gets worse. If you make another other one, let's say 14 days later, and it gets the ID 7921 you can deduce that they receive about 4 orders a day.

Typically, one could use gen_random_number() to mask such order numbers, but the result is not collision-resistant, meaning generated numbers are not guaranteed to be truly unique. Prepending a numerical timestamp could be one way to avoid collision (just like UUID), but that would make the ID way too long for use as short numerical codes (e.g. for input over Phone Numeric Keypad).

Password hashing KDF functions such as Argon2id, scrypt, bcrpyt do not generate unique numbers, which means their output cannot be used for numerical IDs.

Some may propose using Non-cryptographic Hash Function such as FNV or Murmur that can produce short 32-bit numbers. However, because the inputs are inherently small numbers, hackers can use Rainbow Tables to crack such hashes. Also because they are not keyed, more modern hash functions that better resist Collision attacks should be preferred.

SipHash is one such Non-cryptographic Hash Function that is used by Bitcoin for short transaction IDs. It protects against Denial-of-Service attacks via Hash Flooding. However, it produces 64-bit numbers (e.g. 18_446744_073709_551615) that are too long.

How it works

In cryptography, Format-preserving encryption (FPE), refers to encrypting in such a way that the output (the ciphertext) is in the same format as the input (the plaintext). It is able to take in unique inputs (numbers) and produce unique outputs (numbers).

NIST's FF1 standard is the most popular FPE standard as of this writing (and is secure as long as the tweak is suitably selected or not user-determined or provided by the user).

A 128-bit or 256-bit secret key (derived from your Postgres DB root keys) is used to convert a unique int4/int8 number to a secure numerical text (also unique). If the number is less than 6 digits, it is padded to 6 digits (e.g. 003124) to protect against FPE's security weakness on small domains.

NIST Test vectors are provided to ensure full compliance with the NIST FPE-FF1 Standard. Parts of this code is based on FPE-FF1 noble-ciphers implementation by the renowned Paul Miler.

Performance

Performance is good enough. If alphanumeric IDs are ok for your use-case, you should consider using the simpler gen_short_code() alternative here instead.

Usage

A. Generated Column

If you would like to store the generated code, use Postgres' generated-column:

-- Create a table
CREATE TABLE profile (
    id int4 PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
    public_id text GENERATED ALWAYS AS (gen_mask_code(id, gen_mask_secret('_profile'))) STORED,
    ...
);

-- (optional) add an index on the generated column to speed up the query
CREATE INDEX profile_public_id_idx ON profile (public_id text_pattern_ops) WITH (fillfactor = 50);

B. Computed Column

If you don't want to store the generated short codes on-disk, use Postgres' computed-column:

-- Create a table
CREATE TABLE profile (
    id int4 PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
    ...
);

-- Create a computed field
CREATE FUNCTION public_id(profile)
RETURNS text AS $$
  SELECT gen_mask_code($1.id, gen_mask_secret('_profile'));
$$ LANGUAGE SQL;

-- Query the computed field
SELECT *, profile.public_id FROM profile LIMIT 100;

-- (optional) add an index on the computed field to speed up the query
CREATE INDEX profile_public_id_idx ON profile (public_id(profile) text_pattern_ops) WITH (fillfactor = 50);

Tested on

  • Supabase
  • Postgres v15
  • AWS Relational Database Service (RDS)

Inspired By

CREATE OR REPLACE FUNCTION real_mod(a anycompatiblenonarray, b anycompatiblenonarray)
RETURNS anycompatiblenonarray AS $$
SELECT mod(b + mod(a, b), b);
$$ LANGUAGE sql IMMUTABLE STRICT PARALLEL SAFE;
CREATE OR REPLACE FUNCTION to_bytea(value anycompatiblenonarray)
RETURNS bytea AS $$
WITH RECURSIVE builder AS (
SELECT
0 AS iteration,
value AS current_val,
(value % 256)::integer AS byte_val,
(CASE WHEN value = 0 THEN '\x00'::bytea ELSE null::bytea END) AS byte_acc
UNION ALL
SELECT
iteration + 1 AS iteration,
((current_val - byte_val) / 256) AS current_val,
(((current_val - byte_val) / 256) % 256)::integer AS byte_val,
set_byte('\x00'::bytea, 0, byte_val) AS byte_acc
FROM builder
WHERE current_val > 0
)
SELECT string_agg(byte_acc, null::bytea ORDER BY iteration DESC) FROM builder;
$$ LANGUAGE sql IMMUTABLE STRICT PARALLEL SAFE;
CREATE OR REPLACE FUNCTION to_numeric(data bytea, radix integer DEFAULT 256)
RETURNS numeric AS $$
WITH RECURSIVE builder AS (
SELECT
0::numeric AS acc, -- Start with zero, using numeric for arbitrary precision
0 AS idx -- Start index for the data byte-array
UNION ALL
SELECT
acc * radix + get_byte(data, idx) AS acc, -- Perform the arithmetic
idx + 1 AS idx -- Increment the index for the input byte-array
FROM builder
WHERE idx <= data.length
)
SELECT acc FROM builder LIMIT 1 OFFSET data.length;
$$ LANGUAGE sql IMMUTABLE STRICT PARALLEL SAFE;
CREATE OR REPLACE FUNCTION zero_bits(bytes bytea)
RETURNS bytea AS $$
SELECT decode(repeat('00', bytes.length), 'hex');
$$ LANGUAGE sql IMMUTABLE STRICT PARALLEL SAFE;
-- Enable the "pgcrypto" extension
CREATE EXTENSION IF NOT EXISTS pgcrypto WITH SCHEMA extensions;
DROP DOMAIN IF EXISTS uint2 CASCADE;
CREATE DOMAIN uint2 AS int4 CHECK(VALUE >= 0 AND VALUE < 65536);
DROP DOMAIN IF EXISTS uint4 CASCADE;
CREATE DOMAIN uint4 AS int8 CHECK(VALUE >= 0 AND VALUE < 4294967296);
DROP TYPE IF EXISTS ff1_round_state CASCADE;
CREATE TYPE ff1_round_state AS (
PQ bytea,
radix uint2,
u integer,
v integer,
b integer,
d integer,
key bytea
);
CREATE OR REPLACE FUNCTION NUM_radix(radix uint2, data integer[])
RETURNS numeric AS $$
DECLARE
acc numeric = 0;
val integer;
BEGIN
-- Loop through each element in the data array
FOREACH val IN ARRAY data LOOP
acc = acc * radix + val; -- Perform the arithmetic in the specified radix
END LOOP;
RETURN acc;
END;
$$ LANGUAGE plpgsql IMMUTABLE STRICT PARALLEL SAFE;
CREATE OR REPLACE FUNCTION numeric_to_bytes_BE(value numeric, len integer)
RETURNS bytea AS $$
SELECT decode(repeat('00', len - b.length ), 'hex') || b FROM to_bytea(value) b;
$$ LANGUAGE sql IMMUTABLE STRICT PARALLEL SAFE;
CREATE OR REPLACE FUNCTION bytes_to_numeric_BE(bytes bytea)
RETURNS numeric AS $$
SELECT to_numeric(bytes, 256);
$$ LANGUAGE sql IMMUTABLE STRICT PARALLEL SAFE;
CREATE OR REPLACE FUNCTION ff1_get_round(x integer[], radix uint2, key bytea, tweak bytea DEFAULT '')
RETURNS ff1_round_state AS $$
DECLARE
n integer = array_length(x, 1);
t integer = length(tweak);
min_len uint4;
max_len uint4;
u integer;
v integer;
b integer;
d integer;
padding integer;
round ff1_round_state;
BEGIN
-- The minimum domain size for FF1 in Draft SP 800-38G Revision 1 is one million
-- radix^minlen ≥ 1000000
min_len = ceil(ln(1000000) / ln(radix)::numeric);
max_len = 2 ^ 32 - 1;
-- 2 ≤ minlen ≤ maxlen < 2^32
IF 2 > min_len OR min_len > max_len OR max_len >= 2 ^ 32 THEN
RAISE 'Invalid radix: 2 ≤ minLen ≤ maxLen < 2^32' USING ERRCODE = 'invalid_radix';
END IF;
IF n < min_len OR n > max_len THEN
RAISE 'xLen is outside minLen..maxLen bounds' USING ERRCODE = 'outside_bounds';
END IF;
-- let u = floor(n / 2).
u = trunc(n / 2);
-- let v = n – u.
v = n - u;
-- let b = ceil( ceil(v * LOG(radix)) / 8).
b = ceil(ceil(v * log(2, radix)) / 8::numeric);
-- let d = 4 * [b/4] + 4.
d = 4 * ceil(b / 4::numeric) + 4;
-- let P = [1]1 || [2]1 || [1]1 || [radix]3 || [10]1 || [u mod 256]1 || [n]4 || [t]4.
-- let Q = T || [0](−t−b−1) mod 16 || [i]1 || [NUMradix(B)]b.
-- let PQ = P || Q.
padding = real_mod(-t - b - 1, 16);
round.PQ = '\x01020100'::bytea || int2send(radix::int2) || '\x0aff'::bytea || int4send(n) || int4send(t) || tweak || decode(repeat('00', padding + 1 + b), 'hex');
round.PQ = set_byte(round.PQ, 7, u);
-- Return round vectors
round.radix = radix;
round.u = u;
round.v = v;
round.b = b;
round.d = d;
round.key = key;
RETURN round;
END;
$$ LANGUAGE plpgsql IMMUTABLE STRICT PARALLEL SAFE;
CREATE OR REPLACE FUNCTION ff1_do_round(
INOUT round ff1_round_state,
INOUT A integer[],
INOUT B integer[],
i integer,
decrypt boolean DEFAULT false)
RETURNS RECORD AS $$
DECLARE
BLOCK_LEN integer = 16;
PQ bytea = round.PQ;
radix uint2 = round.radix;
R bytea;
S bytea;
BEGIN
-- let Q = ... || [i]1 || [NUMradix(B)]b.
PQ = set_byte(PQ, length(PQ) - round.b - 1, i);
IF round.b != 0 THEN
PQ = overlay(PQ PLACING numeric_to_bytes_be(NUM_radix(radix, B), round.b) from (length(PQ) - round.b + 1));
END IF;
round.PQ = PQ;
-- let R = PRF(P || Q).
R = encrypt(PQ, round.key, 'aes-cbc/pad:none');
R = substring(r FROM length(R) - BLOCK_LEN + 1 for BLOCK_LEN);
-- let S be the first d bytes of the following string of ⎡d/16⎤ blocks:
-- R || CIPHK(R ⊕[1]16) || CIPHK(R ⊕[2]16) ...CIPHK(R ⊕[⎡d / 16⎤ – 1]16).
-- SELECT R || string_agg(encrypt(xor_bits(numeric_to_bytes_BE(j, 16), R), round.key, 'aes-ecb/pad:none'), null::bytea) INTO STRICT S FROM generate_series(0, ceil(round.d / 16::numeric) - 1) AS j;
-- Optimized for speed:
S := R; -- Start `S` with a copy of `R`
DECLARE
j integer = 1; -- Start loop with `j` counter as the number 1
d integer = round.d;
block bytea;
BEGIN
WHILE length(S) < d LOOP
block = numeric_to_bytes_BE(j::numeric, 16); -- Convert counter to 16-byte, big-endian format
-- XOR each byte of the block with `R`
FOR k IN 0..(BLOCK_LEN - 1) LOOP
block = set_byte(block, k, get_byte(block, k) # get_byte(R, k));
END LOOP;
S = S || encrypt(block, round.key, 'aes-ecb/pad:none'); -- Encrypt the block and append to `S`
j = j + 1; -- Increment the counter
END LOOP;
END;
DECLARE
y numeric;
c numeric;
m integer;
BEGIN
-- Convert the first d bytes of `S` to a numeric (big-endian)
-- let y = NUM(S).
y = bytes_to_numeric_BE(substring(S FROM 1 FOR round.d));
-- Reset S to zero
S = zero_bits(S);
-- If decrypt is true, negate y
IF decrypt THEN
y = -y;
END IF;
-- if i is even, let m = u; else, let m = v.
m = CASE WHEN i % 2 = 0 THEN round.u ELSE round.v END;
-- let c = (NUMradix (A) + y) mod radix^m
c = real_mod(NUM_radix(radix, A) + y, radix::numeric ^ m::numeric);
-- Reset A to zero
FOR i IN array_lower(A, 1)..array_upper(A, 1) LOOP
A[i] := 0; -- Zero each element
END LOOP;
-- let A = B.
A = B;
-- let B = C = STR(radix, m, c)
B = array_fill(0::integer, ARRAY[m]);
FOR i IN REVERSE m..1 LOOP
B[i] := (c % radix)::integer;
c = trunc(c / radix);
END LOOP;
END;
END;
$$ LANGUAGE plpgsql IMMUTABLE STRICT PARALLEL SAFE;
CREATE OR REPLACE FUNCTION ff1_encrypt(
x integer[],
radix integer,
key bytea,
tweak bytea DEFAULT '')
RETURNS integer[] AS $$
DECLARE
round ff1_round_state = ff1_get_round(x, radix, key, tweak);
A integer[] = x[1:round.u];
B integer[] = x[round.u+1:];
BEGIN
-- Loop to perform 10 rounds of the FF1 algorithm
FOR i IN 0..9 LOOP
SELECT out.A, out.B INTO STRICT A, B FROM ff1_do_round(round, A, B, i, false) AS out;
END LOOP;
RETURN A || B; -- Return the concatenated result of A and B
END;
$$ LANGUAGE plpgsql IMMUTABLE STRICT PARALLEL SAFE;
CREATE OR REPLACE FUNCTION ff1_decrypt(
x integer[],
radix integer,
key bytea,
tweak bytea DEFAULT '')
RETURNS integer[] AS $$
DECLARE
-- The FF1.Decrypt algorithm is similar to the FF1.Encrypt algorithm;
round ff1_round_state = ff1_get_round(x, radix, key, tweak);
A integer[] = x[round.u+1:];
B integer[] = x[1:round.u];
BEGIN
-- Loop to perform 10 rounds of the FF1 algorithm
FOR i IN REVERSE 9..0 LOOP
SELECT out.A, out.B INTO STRICT A, B FROM ff1_do_round(round, A, B, i, true) AS out;
END LOOP;
RETURN B || A; -- Return the concatenated result of B and A
END;
$$ LANGUAGE plpgsql IMMUTABLE STRICT PARALLEL SAFE;
-- NIST Test Vectors for FF1 (AES-128)
SELECT (ff1_encrypt(ARRAY[0,1,2,3,4,5,6,7,8,9], 10, '\x2B7E151628AED2A6ABF7158809CF4F3C'::bytea) = ARRAY[2,4,3,3,4,7,7,4,8,4]) AS ok;
SELECT (ff1_decrypt(ARRAY[2,4,3,3,4,7,7,4,8,4], 10, '\x2B7E151628AED2A6ABF7158809CF4F3C'::bytea) = ARRAY[0,1,2,3,4,5,6,7,8,9]) AS ok;
SELECT (ff1_encrypt(ARRAY[0,1,2,3,4,5,6,7,8,9], 10, '\x2B7E151628AED2A6ABF7158809CF4F3C'::bytea, '\x39383736353433323130'::bytea) = ARRAY[6,1,2,4,2,0,0,7,7,3]) AS ok;
SELECT (ff1_decrypt(ARRAY[6,1,2,4,2,0,0,7,7,3], 10, '\x2B7E151628AED2A6ABF7158809CF4F3C'::bytea, '\x39383736353433323130'::bytea) = ARRAY[0,1,2,3,4,5,6,7,8,9]) AS ok;
SELECT (ff1_encrypt(ARRAY[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18], 36, '\x2B7E151628AED2A6ABF7158809CF4F3C'::bytea, '\x3737373770717273373737'::bytea) = ARRAY[10,9,29,31,4,0,22,21,21,9,20,13,30,5,0,9,14,30,22]) AS ok;
SELECT (ff1_decrypt(ARRAY[10,9,29,31,4,0,22,21,21,9,20,13,30,5,0,9,14,30,22], 36, '\x2B7E151628AED2A6ABF7158809CF4F3C'::bytea, '\x3737373770717273373737'::bytea) = ARRAY[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18]) AS ok;
-- NIST Test Vectors for FF1 (AES-256)
SELECT (ff1_encrypt(ARRAY[0,1,2,3,4,5,6,7,8,9], 10, '\x2B7E151628AED2A6ABF7158809CF4F3CEF4359D8D580AA4F7F036D6F04FC6A94'::bytea) = ARRAY[6,6,5,7,6,6,7,0,0,9]) AS ok;
SELECT (ff1_decrypt(ARRAY[6,6,5,7,6,6,7,0,0,9], 10, '\x2B7E151628AED2A6ABF7158809CF4F3CEF4359D8D580AA4F7F036D6F04FC6A94'::bytea) = ARRAY[0,1,2,3,4,5,6,7,8,9]) AS ok;
SELECT (ff1_encrypt(ARRAY[0,1,2,3,4,5,6,7,8,9], 10, '\x2B7E151628AED2A6ABF7158809CF4F3CEF4359D8D580AA4F7F036D6F04FC6A94'::bytea, '\x39383736353433323130'::bytea) = ARRAY[1,0,0,1,6,2,3,4,6,3]) AS ok;
SELECT (ff1_decrypt(ARRAY[1,0,0,1,6,2,3,4,6,3], 10, '\x2B7E151628AED2A6ABF7158809CF4F3CEF4359D8D580AA4F7F036D6F04FC6A94'::bytea, '\x39383736353433323130'::bytea) = ARRAY[0,1,2,3,4,5,6,7,8,9]) AS ok;
SELECT (ff1_encrypt(ARRAY[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18], 36, '\x2B7E151628AED2A6ABF7158809CF4F3CEF4359D8D580AA4F7F036D6F04FC6A94'::bytea, '\x3737373770717273373737'::bytea) = ARRAY[33,28,8,10,0,10,35,17,2,10,31,34,10,21,34,35,30,32,13]) AS ok;
SELECT (ff1_decrypt(ARRAY[33,28,8,10,0,10,35,17,2,10,31,34,10,21,34,35,30,32,13], 36, '\x2B7E151628AED2A6ABF7158809CF4F3CEF4359D8D580AA4F7F036D6F04FC6A94'::bytea, '\x3737373770717273373737'::bytea) = ARRAY[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18]) AS ok;
-- Enable the "pgsodium" extension
CREATE EXTENSION IF NOT EXISTS pgsodium;
-- Use a server-managed secret key (with key-rotation)
CREATE OR REPLACE FUNCTION gen_mask_secret(char(8), int8 DEFAULT 1, integer DEFAULT 16)
RETURNS bytea AS $$
-- gen_mask_secret(key_realm, rotate_idx, key_length)
-- key_length: `32` for 256-bit key
SELECT pgsodium.derive_key($2, $3, $1::bytea);
$$ LANGUAGE sql IMMUTABLE STRICT PARALLEL SAFE;
CREATE OR REPLACE FUNCTION gen_mask_code(numeric, bytea, integer DEFAULT 6, varchar(512) DEFAULT '')
RETURNS text AS $$
-- gen_mask_code(id, key_bytes, min_char_length, tweak_or_seed)
WITH t1(num_text) AS (
VALUES (trunc($1)::text)
), t2(num_text) AS (
SELECT (CASE WHEN length(num_text) < $3 THEN lpad(num_text, $3, '0') ELSE num_text END) FROM t1
)
SELECT array_to_string(ff1_encrypt(regexp_split_to_array(num_text, '')::integer[], 10, $2, $4::bytea), '') FROM t2 LIMIT 1;
$$ LANGUAGE sql IMMUTABLE STRICT PARALLEL SAFE;
-- Create a table
DROP TABLE org_invite CASCADE;
CREATE TABLE IF NOT EXISTS org_invite (
id int4 PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Create a computed field
CREATE OR REPLACE FUNCTION code(org_invite)
RETURNS text AS $$
SELECT gen_mask_code($1.id, gen_mask_secret('_invite_'), 6);
$$ LANGUAGE sql IMMUTABLE STRICT PARALLEL SAFE
SECURITY DEFINER SET search_path = 'extensions', 'public';
-- Generate 5,000 records
INSERT INTO org_invite (created_at) SELECT CURRENT_TIMESTAMP FROM generate_series(1, 5000);
SELECT *, org_invite.code FROM org_invite LIMIT 100;
SELECT *, org_invite.code FROM org_invite ORDER BY id DESC LIMIT 100;
-- (optional) add an index on the computed field to speed up queries
DROP INDEX IF EXISTS org_invite_code_idx;
CREATE INDEX org_invite_code_idx ON org_invite (code(org_invite) text_pattern_ops) WITH (fillfactor = 50);
-- Creating the index may take some time (as a full table scan is needed)
-- Query within 5,000 records
SELECT *, org_invite.code FROM org_invite WHERE org_invite.code LIKE '000%';
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment