Skip to content

Instantly share code, notes, and snippets.

@rodydavis
Last active March 17, 2025 01:04
CRDTs in SQLite with just custom extensions
#include <sqlite3ext.h>
SQLITE_EXTENSION_INIT1
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
// Runs one or more SQL statements through sqlite3_exec() without a row
// callback. On failure the SQLite error text is written to stderr and then
// released. Returns the sqlite3_exec() result code unchanged.
static int execute_sql(sqlite3 *db, const char *sql) {
    char *failure_text = NULL;
    const int status = sqlite3_exec(db, sql, NULL, NULL, &failure_text);
    if (status == SQLITE_OK) {
        return status;
    }
    fprintf(stderr, "SQL error: %s\n", failure_text);
    sqlite3_free(failure_text);
    return status;
}
// Reports whether `table` has a column named exactly `column`.
//
// The check walks the rows of `PRAGMA table_info(<table>)` and compares the
// "name" field (result column 1) case-sensitively with strcmp().
// Returns false when the column is absent, when the statement text cannot be
// allocated, or when the statement cannot be prepared (a message is written
// to stderr in the latter two cases).
static bool column_exists(sqlite3 *db, const char *table, const char *column) {
    sqlite3_stmt *stmt = NULL;
    bool found = false;

    // %Q renders the table name as a properly escaped SQL string literal.
    char *info_sql = sqlite3_mprintf("PRAGMA table_info(%Q)", table);
    if (info_sql == NULL) {
        fprintf(stderr, "Failed to prepare statement: out of memory\n");
        return false;
    }

    const int rc = sqlite3_prepare_v2(db, info_sql, -1, &stmt, NULL);
    // The SQL text is no longer needed once the statement has been compiled.
    sqlite3_free(info_sql);
    if (rc != SQLITE_OK) {
        fprintf(stderr, "Failed to prepare statement: %s\n", sqlite3_errmsg(db));
        return false;
    }

    // The statement has no bind parameters, so nothing is bound here.
    while (!found && sqlite3_step(stmt) == SQLITE_ROW) {
        const unsigned char *col_name = sqlite3_column_text(stmt, 1);
        if (col_name != NULL && strcmp((const char *)col_name, column) == 0) {
            found = true;
        }
    }

    sqlite3_finalize(stmt);
    return found;
}
// C equivalent of _crdtDropTriggers.
//
// Drops every trigger attached to `table` whose name starts with
// "crdt_insert_<table>", "crdt_update_<table>" or "crdt_delete_<table>".
//
// The matching names are first gathered into a batch of DROP statements; the
// batch is executed only after the catalog query has been finalized, so the
// schema is not modified while sqlite_master is still being read. All SQL is
// built with sqlite3's %Q / %w escapes instead of fixed-size buffers, so long
// or unusual table names are neither truncated nor able to alter the SQL.
// Failures are reported on stderr; the function itself returns nothing.
static void crdtDropTriggersC(sqlite3 *db, const char *table) {
    static const char *const knownTriggers[] = {
        "crdt_insert_",
        "crdt_update_",
        "crdt_delete_"
    };
    const size_t numKnownTriggers = sizeof(knownTriggers) / sizeof(knownTriggers[0]);
    const size_t table_len = strlen(table);

    char *select_sql = sqlite3_mprintf(
        "SELECT name FROM sqlite_master WHERE type = 'trigger' AND tbl_name = %Q;",
        table);
    if (select_sql == NULL) {
        fprintf(stderr, "Failed to prepare statement: out of memory\n");
        return;
    }

    sqlite3_stmt *stmt = NULL;
    const int rc = sqlite3_prepare_v2(db, select_sql, -1, &stmt, NULL);
    sqlite3_free(select_sql);
    if (rc != SQLITE_OK) {
        fprintf(stderr, "Failed to prepare statement: %s\n", sqlite3_errmsg(db));
        return;
    }

    sqlite3_str *drops = sqlite3_str_new(db);
    while (sqlite3_step(stmt) == SQLITE_ROW) {
        const char *trigger_name = (const char *)sqlite3_column_text(stmt, 0);
        if (trigger_name == NULL) {
            continue;
        }
        for (size_t i = 0; i < numKnownTriggers; ++i) {
            const size_t prefix_len = strlen(knownTriggers[i]);
            // Compare "<prefix><table>" piecewise; the second strncmp is only
            // reached once the first prefix_len characters are known to exist.
            if (strncmp(trigger_name, knownTriggers[i], prefix_len) == 0 &&
                strncmp(trigger_name + prefix_len, table, table_len) == 0) {
                sqlite3_str_appendf(drops, "DROP TRIGGER IF EXISTS \"%w\";", trigger_name);
                break;
            }
        }
    }
    sqlite3_finalize(stmt);

    // sqlite3_str_finish() yields NULL when nothing was appended (or on OOM).
    char *drop_sql = sqlite3_str_finish(drops);
    if (drop_sql != NULL) {
        execute_sql(db, drop_sql);
        sqlite3_free(drop_sql);
    }
}
// Returns true when `name` is one of the bookkeeping columns that the CRDT
// layer itself maintains (and that must not be copied as ordinary data).
static bool crdtIsBookkeepingColumn(const char *name) {
    return strcmp(name, "hlc") == 0 || strcmp(name, "modified") == 0 ||
           strcmp(name, "is_deleted") == 0 || strcmp(name, "node_id") == 0;
}

// Executes `sql` and releases it with sqlite3_free(). `sql` may be NULL (a
// failed sqlite3_mprintf), in which case only a diagnostic is printed.
static void crdtExecAndFree(sqlite3 *db, char *sql) {
    if (sql == NULL) {
        fprintf(stderr, "SQL error: out of memory while building statement\n");
        return;
    }
    execute_sql(db, sql);
    sqlite3_free(sql);
}

// Adds `column` to `table` with the given column definition unless the column
// already exists. Takes ownership of `definition` (an sqlite3_mprintf result,
// possibly NULL) and always frees it.
static void crdtAddColumnIfMissing(sqlite3 *db, const char *table,
                                   const char *column, char *definition) {
    if (definition == NULL) {
        fprintf(stderr, "SQL error: out of memory while building statement\n");
        return;
    }
    if (!column_exists(db, table, column)) {
        crdtExecAndFree(db, sqlite3_mprintf("ALTER TABLE \"%w\" ADD COLUMN \"%w\" %s;",
                                            table, column, definition));
    }
    sqlite3_free(definition);
}

// C equivalent of _crdtTableAndTriggers.
//
// Upgrades `table` in place so it can be used as a CRDT table:
//   1. reads `PRAGMA table_info` to find the primary-key columns (required)
//      and the ordinary data columns;
//   2. adds the "hlc", "modified", "is_deleted" and "node_id" columns when
//      they are missing, using `nodeId` in their defaults;
//   3. drops any previously created crdt_* triggers for the table;
//   4. creates the crdt_insert_/crdt_update_/crdt_delete_ triggers.
//
// All SQL is assembled with sqlite3's dynamic string helpers and the %w / %Q
// escapes, so there are no fixed-size buffers to overflow or truncate and
// identifiers / literals are quoted. The delete trigger addresses the row
// through OLD.<pk>: NEW is not available inside a DELETE trigger.
//
// Errors are reported on stderr (directly or through execute_sql); as before,
// a failing step does not stop the later ones and nothing is returned.
static void crdtTableAndTriggersC(sqlite3 *db, const char *table, const char *nodeId) {
    char *where_new = NULL;    // "<pk> = NEW.<pk> AND ..." for INSERT/UPDATE triggers
    char *where_old = NULL;    // "<pk> = OLD.<pk> AND ..." for the DELETE trigger
    char *assignments = NULL;  // "<col> = NEW.<col>, ..." for every data column
    bool has_primary_key = false;
    bool has_assignment = false;

    // Get table info to find primary keys and data columns.
    char *info_sql = sqlite3_mprintf("PRAGMA table_info(%Q);", table);
    if (info_sql == NULL) {
        fprintf(stderr, "Failed to prepare statement: out of memory\n");
        return;
    }
    sqlite3_stmt *stmt = NULL;
    const int rc = sqlite3_prepare_v2(db, info_sql, -1, &stmt, NULL);
    sqlite3_free(info_sql);
    if (rc != SQLITE_OK) {
        fprintf(stderr, "Failed to prepare statement: %s\n", sqlite3_errmsg(db));
        return;
    }

    sqlite3_str *where_new_sb = sqlite3_str_new(db);
    sqlite3_str *where_old_sb = sqlite3_str_new(db);
    sqlite3_str *assign_sb = sqlite3_str_new(db);
    while (sqlite3_step(stmt) == SQLITE_ROW) {
        const char *col_name = (const char *)sqlite3_column_text(stmt, 1); // 'name'
        if (col_name == NULL) {
            continue;
        }
        if (sqlite3_column_int(stmt, 5) > 0) { // 'pk' is non-zero for key columns
            if (has_primary_key) {
                sqlite3_str_appendall(where_new_sb, " AND ");
                sqlite3_str_appendall(where_old_sb, " AND ");
            }
            sqlite3_str_appendf(where_new_sb, "\"%w\" = NEW.\"%w\"", col_name, col_name);
            sqlite3_str_appendf(where_old_sb, "\"%w\" = OLD.\"%w\"", col_name, col_name);
            has_primary_key = true;
        }
        // The bookkeeping columns are excluded by name, so collecting the data
        // columns before the ALTER TABLE statements below gives the same list
        // as collecting them afterwards.
        if (!crdtIsBookkeepingColumn(col_name)) {
            if (has_assignment) {
                sqlite3_str_appendall(assign_sb, ", ");
            }
            sqlite3_str_appendf(assign_sb, "\"%w\" = NEW.\"%w\"", col_name, col_name);
            has_assignment = true;
        }
    }
    sqlite3_finalize(stmt);

    // Always finish all three builders so none of them leaks.
    where_new = sqlite3_str_finish(where_new_sb);
    where_old = sqlite3_str_finish(where_old_sb);
    assignments = sqlite3_str_finish(assign_sb);

    if (!has_primary_key) {
        fprintf(stderr, "Table %s must have a primary key\n", table);
        goto cleanup;
    }
    if (where_new == NULL || where_old == NULL || (has_assignment && assignments == NULL)) {
        fprintf(stderr, "SQL error: out of memory while building statement\n");
        goto cleanup;
    }

    // Check and add the bookkeeping columns if they don't exist.
    crdtAddColumnIfMissing(db, table, "hlc",
        sqlite3_mprintf("TEXT NOT NULL DEFAULT (hlc_now(%Q))", nodeId));
    crdtAddColumnIfMissing(db, table, "modified",
        sqlite3_mprintf("TEXT NOT NULL DEFAULT (hlc_now(%Q))", nodeId));
    crdtAddColumnIfMissing(db, table, "is_deleted",
        sqlite3_mprintf("BOOLEAN NOT NULL DEFAULT (FALSE)"));
    crdtAddColumnIfMissing(db, table, "node_id",
        sqlite3_mprintf("TEXT NOT NULL DEFAULT (%Q)", nodeId));

    // Drop existing CRDT triggers before recreating them.
    crdtDropTriggersC(db, table);

    // BEFORE INSERT: when the key already exists, turn the insert into an
    // update of the stored row (only if the incoming hlc is newer) and then
    // suppress the original insert with RAISE(IGNORE).
    crdtExecAndFree(db, sqlite3_mprintf(
        "CREATE TRIGGER \"crdt_insert_%w\" BEFORE INSERT ON \"%w\" "
        "WHEN EXISTS (SELECT 1 FROM \"%w\" WHERE %s) "
        "BEGIN UPDATE \"%w\" SET %s%shlc = NEW.hlc, modified = NEW.modified, is_deleted = FALSE "
        "WHERE %s AND hlc_compare(NEW.hlc, hlc) > 0; SELECT RAISE(IGNORE); END;",
        table, table, table, where_new, table,
        has_assignment ? assignments : "", has_assignment ? ", " : "",
        where_new));

    // AFTER UPDATE: bump the "modified" clock of the updated row.
    crdtExecAndFree(db, sqlite3_mprintf(
        "CREATE TRIGGER \"crdt_update_%w\" AFTER UPDATE ON \"%w\" "
        "BEGIN UPDATE \"%w\" SET modified = hlc_increment(NEW.modified) "
        "WHERE %s AND hlc_compare(NEW.hlc, hlc) > 0; END;",
        table, table, table, where_new));

    // BEFORE DELETE: mark the row as deleted instead of removing it. The row
    // being deleted is only reachable through OLD in a DELETE trigger.
    crdtExecAndFree(db, sqlite3_mprintf(
        "CREATE TRIGGER \"crdt_delete_%w\" BEFORE DELETE ON \"%w\" "
        "BEGIN UPDATE \"%w\" SET is_deleted = TRUE WHERE %s; SELECT RAISE(IGNORE); END;",
        table, table, table, where_old));

cleanup:
    sqlite3_free(where_new);
    sqlite3_free(where_old);
    sqlite3_free(assignments);
}
// SQL function crdt(table_name, node_id).
//
// Validates that exactly two non-NULL arguments were supplied, then upgrades
// the named table through crdtTableAndTriggersC() on the connection that is
// running the function. The SQL result is always the integer 0 once the
// arguments pass validation; argument problems are reported as SQL errors.
static void sqlite3_upgrade_table_to_crdt(sqlite3_context *context, int argc, sqlite3_value **argv) {
    if (argc != 2) {
        sqlite3_result_error(context, "crdt requires exactly two arguments (table_name, node_id)", -1);
        return;
    }

    const unsigned char *table_text = sqlite3_value_text(argv[0]);
    const unsigned char *node_text = sqlite3_value_text(argv[1]);
    if (!table_text || !node_text) {
        sqlite3_result_error(context, "table_name and node_id arguments must be text values", -1);
        return;
    }

    crdtTableAndTriggersC(sqlite3_context_db_handle(context),
                          (char *)table_text,
                          (char *)node_text);
    sqlite3_result_int(context, 0);
}
// SQL function crdt_reset(table_name).
//
// Validates that exactly one non-NULL argument was supplied, then removes the
// CRDT triggers of that table via crdtDropTriggersC(). Yields the integer 0;
// argument problems are reported as SQL errors.
static void sqlite3_drop_triggers(sqlite3_context *context, int argc, sqlite3_value **argv) {
    if (argc != 1) {
        sqlite3_result_error(context, "crdt_reset requires exactly one argument (table_name)", -1);
        return;
    }

    const unsigned char *table_text = sqlite3_value_text(argv[0]);
    if (!table_text) {
        sqlite3_result_error(context, "table_name argument must be a text value", -1);
        return;
    }

    crdtDropTriggersC(sqlite3_context_db_handle(context), (char *)table_text);
    sqlite3_result_int(context, 0);
}
// Extension entry point for the "crdt" extension.
//
// Registers two SQL functions on `db`:
//   crdt(table_name, node_id) - upgrade a table to a CRDT table
//   crdt_reset(table_name)    - drop the CRDT triggers of a table
//
// Both functions change the database schema, so they are registered without
// SQLITE_DETERMINISTIC / SQLITE_INNOCUOUS (which promise a side-effect-free
// function) and with SQLITE_DIRECTONLY, which restricts them to top-level SQL
// and keeps them from being invoked out of triggers, views or schema
// expressions.
//
// Returns SQLITE_OK on success or the first sqlite3_create_function() error.
#ifdef _WIN32
__declspec(dllexport)
#endif
int sqlite3_crdt_init(
    sqlite3 *db,
    char **pzErrMsg,
    const sqlite3_api_routines *pApi
){
    int rc = SQLITE_OK;
    SQLITE_EXTENSION_INIT2(pApi);
    (void)pzErrMsg; /* Unused parameter */
    const int flags = SQLITE_UTF8 | SQLITE_DIRECTONLY;

    // Upgrade table to crdt
    rc = sqlite3_create_function(db, "crdt", 2, flags, NULL,
                                 sqlite3_upgrade_table_to_crdt, NULL, NULL);
    if (rc != SQLITE_OK) return rc;

    // Drop triggers function crdt_reset
    rc = sqlite3_create_function(db, "crdt_reset", 1, flags, NULL,
                                 sqlite3_drop_triggers, NULL, NULL);
    if (rc != SQLITE_OK) return rc;

    return SQLITE_OK;
}
#include <sqlite3ext.h>
SQLITE_EXTENSION_INIT1
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <sys/time.h>
#include <inttypes.h>
#include <errno.h>
#include <assert.h>
// Largest value the HLC counter may hold; the functions in this file compare
// counters against it, and hlc_str() prints the counter as 4 hex digits.
#define MAX_COUNTER 0xFFFF
// Size in bytes of the nodeId buffer inside Hlc, terminating NUL included.
// Node ids whose strlen() is >= this value are rejected by hlc_create().
#define MAX_NODE_ID_LENGTH 64
// A time span expressed in milliseconds.
typedef int64_t Duration;
// Threshold (in milliseconds) that hlc_increment() and hlc_merge() compare
// clock differences against before giving up with a drift error.
const Duration MAX_DRIFT = 60000; // 1 minute in milliseconds
// A hybrid logical clock value: a timestamp, a counter and the id of the node
// that produced it.
typedef struct {
int64_t dateTime; // UTC milliseconds since epoch
unsigned short counter; // compared after dateTime by hlc_compareTo()
char nodeId[MAX_NODE_ID_LENGTH]; // NUL-terminated node identifier
} Hlc;
// Reads the wall clock with gettimeofday() and converts the result to
// milliseconds since the Unix epoch (sub-millisecond digits are discarded).
static int64_t getCurrentUtcMillis() {
    struct timeval now;
    gettimeofday(&now, NULL);

    const int64_t whole_seconds_ms = (int64_t)now.tv_sec * 1000;
    const int64_t fraction_ms = (int64_t)now.tv_usec / 1000;
    return whole_seconds_ms + fraction_ms;
}
// Helper function to convert struct tm to UTC milliseconds since epoch
#ifdef _WIN32
#include <windows.h>
#else
#include <sys/time.h>
#include <stddef.h>
#endif
// Converts a broken-down time that is already expressed in UTC into
// milliseconds since the Unix epoch.
//
// The fields are interpreted as UTC directly (timegm() on POSIX, _mkgmtime()
// on Windows). No local-time conversion such as mktime() is applied first:
// that would reinterpret - and possibly rewrite - the fields according to the
// process time zone and tm_isdst before the UTC conversion sees them.
//
// `tm` may be normalized by the conversion routine.
// Returns -1 when the time cannot be represented.
static int64_t tmToUtcMillis(struct tm *tm) {
#ifdef _WIN32
    const time_t utc_t = _mkgmtime(tm);
#else
    const time_t utc_t = timegm(tm);
#endif
    if (utc_t == (time_t)-1) {
        return -1; // Indicate error
    }
    return (int64_t)utc_t * 1000;
}
// Converts a timestamp that starts with "YYYY-MM-DDTHH:MM:SS" into UTC
// milliseconds since the Unix epoch.
//
// Only the leading date and time are parsed; whatever follows them
// (fractional seconds, "Z", a numeric offset) is ignored and the fields are
// always treated as UTC. The struct tm is zero-initialized first because
// strptime() fills in only the fields named by the format, and tm_isdst is
// pinned to 0 so no daylight-saving adjustment can be applied.
//
// Returns -1 when `iso8601` is NULL, does not start with a valid timestamp,
// or cannot be converted.
static int64_t iso8601ToUtcMillis(const char *iso8601) {
    if (iso8601 == NULL) {
        return -1;
    }

    struct tm tm;
    memset(&tm, 0, sizeof(tm));
    // strptime() succeeds as soon as the format is consumed, so one attempt
    // with the common prefix covers every accepted spelling.
    if (strptime(iso8601, "%Y-%m-%dT%H:%M:%S", &tm) == NULL) {
        return -1;
    }
    tm.tm_isdst = 0;
    return tmToUtcMillis(&tm);
}
// Constructor: Hlc(DateTime dateTime, int counter, String nodeId)
static Hlc* hlc_create(int64_t dateTimeMillis, unsigned short counter, const char* nodeId) {
if (counter > MAX_COUNTER || nodeId == NULL || strlen(nodeId) >= MAX_NODE_ID_LENGTH) {
return NULL; // Indicate error with NULL
}
Hlc* hlc = (Hlc*)malloc(sizeof(Hlc));
if (hlc == NULL) {
return NULL;
}
hlc->dateTime = dateTimeMillis;
hlc->counter = counter;
strncpy(hlc->nodeId, nodeId, MAX_NODE_ID_LENGTH - 1);
hlc->nodeId[MAX_NODE_ID_LENGTH - 1] = '\0';
return hlc;
}
// Constructor: Hlc.zero(String nodeId)
//
// Creates the earliest clock value for `nodeId`: the Unix epoch
// (1970-01-01T00:00:00 UTC) with counter 0. dateTime is stored as UTC
// milliseconds since the epoch, so the epoch itself is simply 0; building it
// through a struct tm is unnecessary and is easy to get wrong (tm_year counts
// years since 1900, so 1970 is 70, not 70 - 1900).
//
// Returns NULL under the same conditions as hlc_create(); release the result
// with hlc_free().
static Hlc* hlc_zero(const char* nodeId) {
    const int64_t epochMillis = 0;
    return hlc_create(epochMillis, 0, nodeId);
}
// Constructor: Hlc.fromDate(DateTime dateTime, String nodeId)
//
// Builds a clock value for the given instant with the counter starting at 0.
// Fails (NULL) exactly when hlc_create() fails.
static Hlc* hlc_fromDate(int64_t dateTimeMillis, const char* nodeId) {
    const unsigned short initialCounter = 0;
    Hlc *fresh = hlc_create(dateTimeMillis, initialCounter, nodeId);
    return fresh;
}
// Constructor: Hlc.now(String nodeId)
//
// Builds a clock value stamped with the current wall-clock time and a zero
// counter. Fails (NULL) exactly when hlc_fromDate() fails.
static Hlc* hlc_now(const char* nodeId) {
    return hlc_fromDate(getCurrentUtcMillis(), nodeId);
}
// Constructor: Hlc.parse(String timestamp)
//
// Parses text of the form "<date-time>-<hex counter>-<node id>".
//
// The split points are located from the last ':' of the string: the first
// '-' after it ends the date-time, the next '-' ends the counter, and the
// remainder is the node id (which may itself contain '-').
//
// Returns a malloc'ed Hlc (release with hlc_free()) or NULL when the text is
// NULL, a separator is missing, the counter is empty, is not entirely
// hexadecimal or exceeds MAX_COUNTER, the date-time cannot be converted, the
// node id is too long, or memory runs out.
static Hlc* hlc_parse(const char* timestamp) {
    Hlc *parsed = NULL;
    char *dateTimeStr = NULL;
    char *counterStr = NULL;
    char *counterEnd = NULL;
    unsigned long counter_ul = 0;
    int64_t dateTime = 0;

    if (timestamp == NULL) {
        return NULL;
    }
    const char *lastColon = strrchr(timestamp, ':');
    if (lastColon == NULL) {
        return NULL;
    }
    const char *counterDash = strchr(lastColon, '-');
    if (counterDash == NULL) {
        return NULL;
    }
    const char *nodeIdDash = strchr(counterDash + 1, '-');
    if (nodeIdDash == NULL || nodeIdDash == counterDash + 1) {
        return NULL; // missing separator or empty counter
    }
    const char *nodeId = nodeIdDash + 1;

    const size_t dateTimeLen = (size_t)(counterDash - timestamp);
    const size_t counterLen = (size_t)(nodeIdDash - counterDash - 1);

    // NUL-terminated copies of the two leading fields for strptime/strtoul.
    dateTimeStr = (char *)malloc(dateTimeLen + 1);
    counterStr = (char *)malloc(counterLen + 1);
    if (dateTimeStr == NULL || counterStr == NULL) {
        goto cleanup;
    }
    memcpy(dateTimeStr, timestamp, dateTimeLen);
    dateTimeStr[dateTimeLen] = '\0';
    memcpy(counterStr, counterDash + 1, counterLen);
    counterStr[counterLen] = '\0';

    dateTime = iso8601ToUtcMillis(dateTimeStr);
    if (dateTime == -1) {
        goto cleanup;
    }

    // errno must be cleared first: strtoul() only sets it on failure, so a
    // stale ERANGE from earlier code would otherwise reject valid input.
    errno = 0;
    counter_ul = strtoul(counterStr, &counterEnd, 16);
    if (errno == ERANGE || counterEnd == counterStr || *counterEnd != '\0' ||
        counter_ul > MAX_COUNTER) {
        goto cleanup;
    }

    // hlc_create() validates the node id length.
    parsed = hlc_create(dateTime, (unsigned short)counter_ul, nodeId);

cleanup:
    free(dateTimeStr);
    free(counterStr);
    return parsed;
}
// Method: apply({DateTime? dateTime, int? counter, String? nodeId})
//
// Returns a newly malloc'ed copy of `hlc` in which selected fields are
// replaced. A field is kept from `hlc` when its argument carries the "not
// given" marker: -1 for dateTimeMillis, (unsigned short)-1 for counter, NULL
// for nodeId. Returns NULL when `hlc` is NULL, the resulting counter exceeds
// MAX_COUNTER, the resulting node id does not fit, or allocation fails.
static Hlc* hlc_apply(const Hlc* hlc, int64_t dateTimeMillis, unsigned short counter, const char* nodeId) {
    if (!hlc) {
        return NULL;
    }

    int64_t chosenDateTime = hlc->dateTime;
    if (dateTimeMillis != -1) {
        chosenDateTime = dateTimeMillis;
    }

    unsigned short chosenCounter = hlc->counter;
    if (counter != (unsigned short)-1) {
        chosenCounter = counter;
    }

    const char *chosenNodeId = hlc->nodeId;
    if (nodeId != NULL) {
        chosenNodeId = nodeId;
    }

    if (chosenCounter > MAX_COUNTER) {
        return NULL;
    }
    const size_t idLen = strlen(chosenNodeId);
    if (idLen >= MAX_NODE_ID_LENGTH) {
        return NULL;
    }

    Hlc *copy = (Hlc *)malloc(sizeof(Hlc));
    if (!copy) {
        return NULL;
    }
    copy->dateTime = chosenDateTime;
    copy->counter = chosenCounter;
    // Zero-fill, then copy: leaves the id NUL-terminated and NUL-padded.
    memset(copy->nodeId, 0, MAX_NODE_ID_LENGTH);
    memcpy(copy->nodeId, chosenNodeId, idLen);
    return copy;
}
// Method: increment({DateTime? wallTime})
//
// Produces the next clock value after `hlc` for a local event.
//   - The new time is the later of the wall clock and hlc->dateTime
//     (pass -1 as wallTimeMillis to read the current time).
//   - If the time did not advance, the counter is incremented; otherwise it
//     restarts at 0.
//
// Returns a malloc'ed Hlc (release with hlc_free()), or NULL when `hlc` is
// NULL, the clock is more than MAX_DRIFT ahead of the wall clock, the counter
// would exceed MAX_COUNTER, or allocation fails.
static Hlc* hlc_increment(const Hlc* hlc, int64_t wallTimeMillis) {
    if (hlc == NULL) {
        return NULL;
    }
    int64_t currentWallTime = (wallTimeMillis != -1) ? wallTimeMillis : getCurrentUtcMillis();
    int64_t dateTimeNew = (currentWallTime > hlc->dateTime) ? currentWallTime : hlc->dateTime;
    // Computed in a type wider than the stored counter: incrementing an
    // unsigned short holding MAX_COUNTER would wrap to 0 and the overflow
    // check below could never trigger.
    unsigned long counterNew = (dateTimeNew == hlc->dateTime) ? (unsigned long)hlc->counter + 1UL : 0UL;
    if (dateTimeNew - currentWallTime > MAX_DRIFT) {
        return NULL; // Clock drift
    }
    if (counterNew > MAX_COUNTER) {
        return NULL; // Overflow
    }
    Hlc* newHlc = (Hlc*)malloc(sizeof(Hlc));
    if (newHlc == NULL) {
        return NULL;
    }
    newHlc->dateTime = dateTimeNew;
    newHlc->counter = (unsigned short)counterNew;
    strncpy(newHlc->nodeId, hlc->nodeId, MAX_NODE_ID_LENGTH - 1);
    newHlc->nodeId[MAX_NODE_ID_LENGTH - 1] = '\0';
    return newHlc;
}
// Method: merge(Hlc remote, {DateTime? wallTime})
//
// Combines the local clock with one received from another node and returns a
// newly malloc'ed result that always carries the local node id.
//   - If the remote clock is not ahead of the local one, a copy of the local
//     clock is returned.
//   - Otherwise the result takes the later of the wall clock and the remote
//     time; the remote counter is kept only when the remote time is used.
// Pass -1 as wallTimeMillis to read the current time.
//
// Returns NULL when either input is NULL, when the remote clock is ahead but
// has the same node id as the local one, when the remote time is more than
// MAX_DRIFT ahead of the wall clock, or when copying fails.
static Hlc* hlc_merge(const Hlc* local, const Hlc* remote, int64_t wallTimeMillis) {
    if (!local || !remote) {
        return NULL;
    }

    int64_t wallNow = wallTimeMillis;
    if (wallTimeMillis == -1) {
        wallNow = getCurrentUtcMillis();
    }

    const int remoteEarlier = remote->dateTime < local->dateTime;
    const int sameTimeNotAhead =
        remote->dateTime == local->dateTime && remote->counter <= local->counter;
    if (remoteEarlier || sameTimeNotAhead) {
        return hlc_apply(local, -1, (unsigned short)-1, NULL); // Return a copy
    }

    if (strcmp(local->nodeId, remote->nodeId) == 0) {
        return NULL; // Duplicate node
    }
    if (remote->dateTime - wallNow > MAX_DRIFT) {
        return NULL; // Remote clock drift
    }

    // Remote clock relabelled with the local node id.
    Hlc *relabelled = hlc_apply(remote, -1, (unsigned short)-1, local->nodeId);
    if (!relabelled) {
        return NULL;
    }

    int64_t mergedTime;
    unsigned short mergedCounter;
    if (wallNow > remote->dateTime) {
        mergedTime = wallNow;
        mergedCounter = 0;
    } else {
        mergedTime = remote->dateTime;
        mergedCounter = remote->counter;
    }

    Hlc *result = hlc_apply(relabelled, mergedTime, mergedCounter, local->nodeId);
    free(relabelled);
    return result;
}
// Method: toString()
//
// Formats `hlc` as "<YYYY-MM-DDTHH:MM:SS>-<counter as 4 upper-case hex
// digits>-<node id>". The date-time is the UTC rendering of dateTime with the
// millisecond part dropped.
//
// The string is allocated by SQLite (sqlite3_mprintf) because every caller in
// this file hands it to sqlite3_result_text() with sqlite3_free as the
// destructor; memory obtained from malloc() must not be released that way.
// Callers that keep the string must release it with sqlite3_free().
//
// Returns NULL when `hlc` is NULL, the time cannot be converted or formatted,
// or memory runs out.
static char* hlc_str(const Hlc* hlc) {
    if (hlc == NULL) {
        return NULL;
    }
    struct tm tm;
    const time_t t = (time_t)(hlc->dateTime / 1000);
#ifdef _WIN32
    if (gmtime_s(&tm, &t) != 0) {
        return NULL;
    }
#else
    if (gmtime_r(&t, &tm) == NULL) {
        return NULL;
    }
#endif
    char dateTimeStr[32];
    // strftime() returns 0 when the output does not fit; the buffer contents
    // are then unspecified and must not be used.
    if (strftime(dateTimeStr, sizeof(dateTimeStr), "%Y-%m-%dT%H:%M:%S", &tm) == 0) {
        return NULL;
    }
    return sqlite3_mprintf("%s-%04X-%s", dateTimeStr, (unsigned int)hlc->counter, hlc->nodeId);
}
// Method: compareTo(Hlc other)
//
// Orders two clock values by dateTime, then by counter, then by node id
// (strcmp). For differing dateTime or counter the result is -1 or 1; for a
// node-id tie-break it is whatever strcmp() returns. Two NULL-containing
// inputs compare as 0.
static int hlc_compareTo(const Hlc* hlc1, const Hlc* hlc2) {
    if (!hlc1 || !hlc2) {
        return 0; // Or handle error appropriately
    }
    if (hlc1->dateTime != hlc2->dateTime) {
        return (hlc1->dateTime < hlc2->dateTime) ? -1 : 1;
    }
    if (hlc1->counter != hlc2->counter) {
        return (hlc1->counter < hlc2->counter) ? -1 : 1;
    }
    return strcmp(hlc1->nodeId, hlc2->nodeId);
}
// Releases an Hlc obtained from one of the constructors in this file.
// Passing NULL is allowed and does nothing (free(NULL) is a no-op).
static void hlc_free(Hlc* hlc) {
    free(hlc);
}
// --- SQLite Function Implementations ---
// SQL function hlc_now(node_id): returns the text form of a fresh clock value
// for the given node. Wrong arity, a NULL argument, or a failure to build or
// format the clock is reported as an SQL error.
static void sqlite_hlc_now(sqlite3_context *context, int argc, sqlite3_value **argv) {
    if (argc != 1) {
        sqlite3_result_error(context, "hlc_now requires exactly one argument (node_id)", -1);
        return;
    }

    const char *node = (const char *)sqlite3_value_text(argv[0]);
    if (!node) {
        sqlite3_result_error(context, "node_id argument must be a text value", -1);
        return;
    }

    Hlc *stamp = hlc_now(node);
    if (!stamp) {
        sqlite3_result_error(context, "Failed to create HLC", -1);
        return;
    }

    char *text = hlc_str(stamp);
    hlc_free(stamp);
    if (text) {
        // SQLite takes over the string and releases it with sqlite3_free.
        sqlite3_result_text(context, text, -1, sqlite3_free);
        return;
    }
    sqlite3_result_error(context, "Failed to convert HLC to string", -1);
}
// SQL function hlc_parse(timestamp): parses an HLC string and returns it
// re-rendered through hlc_str(). Wrong arity, a NULL argument, unparsable
// text, or a formatting failure is reported as an SQL error.
static void sqlite_hlc_parse(sqlite3_context *context, int argc, sqlite3_value **argv) {
    if (argc != 1) {
        sqlite3_result_error(context, "hlc_parse requires exactly one argument (timestamp)", -1);
        return;
    }

    const char *input = (const char *)sqlite3_value_text(argv[0]);
    if (!input) {
        sqlite3_result_error(context, "timestamp argument must be a text value", -1);
        return;
    }

    Hlc *decoded = hlc_parse(input);
    if (!decoded) {
        sqlite3_result_error(context, "Failed to parse HLC string", -1);
        return;
    }

    char *text = hlc_str(decoded);
    hlc_free(decoded);
    if (text) {
        // SQLite takes over the string and releases it with sqlite3_free.
        sqlite3_result_text(context, text, -1, sqlite3_free);
        return;
    }
    sqlite3_result_error(context, "Failed to convert parsed HLC to string", -1);
}
// SQL function hlc_increment(hlc_text): parses the given HLC string, advances
// it with hlc_increment() against the current wall clock and returns the new
// value as text.
//
// The zero-argument form is recognised but not implemented and always yields
// an SQL error. It is rejected before argv is touched: with argc == 0 there
// is no argv[0] to read.
static void sqlite_hlc_increment(sqlite3_context *context, int argc, sqlite3_value **argv) {
    if (argc != 1 && argc != 0) {
        sqlite3_result_error(context, "hlc_increment requires zero or one argument (hlc_text)", -1);
        return;
    }
    if (argc == 0) {
        sqlite3_result_error(context, "hlc_increment without argument needs the context of the current node ID, which is not yet implemented in this example.", -1);
        return;
    }

    const unsigned char *hlcText = sqlite3_value_text(argv[0]);
    if (hlcText == NULL) {
        sqlite3_result_error(context, "hlc_text argument must be a text value", -1);
        return;
    }

    Hlc* hlc = hlc_parse((const char*)hlcText);
    if (hlc == NULL) {
        sqlite3_result_error(context, "Invalid HLC text provided", -1);
        return;
    }

    Hlc* incrementedHlc = hlc_increment(hlc, -1);
    hlc_free(hlc);
    if (incrementedHlc == NULL) {
        sqlite3_result_error(context, "Failed to increment HLC (potential overflow or drift)", -1);
        return;
    }

    char* incrementedHlcStr = hlc_str(incrementedHlc);
    hlc_free(incrementedHlc);
    if (incrementedHlcStr == NULL) {
        sqlite3_result_error(context, "Failed to convert incremented HLC to string", -1);
        return;
    }
    // SQLite takes over the string and releases it with sqlite3_free.
    sqlite3_result_text(context, incrementedHlcStr, -1, sqlite3_free);
}
// SQL function hlc_merge(local_hlc_text, remote_hlc_text): parses both HLC
// strings, merges them with hlc_merge() against the current wall clock and
// returns the merged value as text. Wrong arity, NULL arguments, unparsable
// input, a rejected merge, or a formatting failure is reported as an SQL
// error.
static void sqlite_hlc_merge(sqlite3_context *context, int argc, sqlite3_value **argv) {
    if (argc != 2) {
        sqlite3_result_error(context, "hlc_merge requires exactly two arguments (local_hlc_text, remote_hlc_text)", -1);
        return;
    }

    const char *localText = (const char *)sqlite3_value_text(argv[0]);
    const char *remoteText = (const char *)sqlite3_value_text(argv[1]);
    if (!localText || !remoteText) {
        sqlite3_result_error(context, "HLC text arguments must be text values", -1);
        return;
    }

    Hlc *mine = hlc_parse(localText);
    Hlc *theirs = hlc_parse(remoteText);
    if (!mine || !theirs) {
        sqlite3_result_error(context, "Invalid HLC text provided for merging", -1);
        hlc_free(mine);
        hlc_free(theirs);
        return;
    }

    Hlc *combined = hlc_merge(mine, theirs, -1);
    hlc_free(mine);
    hlc_free(theirs);
    if (!combined) {
        sqlite3_result_error(context, "Failed to merge HLCs (potential duplicate node or drift)", -1);
        return;
    }

    char *text = hlc_str(combined);
    hlc_free(combined);
    if (text) {
        // SQLite takes over the string and releases it with sqlite3_free.
        sqlite3_result_text(context, text, -1, sqlite3_free);
        return;
    }
    sqlite3_result_error(context, "Failed to convert merged HLC to string", -1);
}
// SQL function hlc_str(hlc_text): returns its argument as text, unchanged.
//
// The pointer returned by sqlite3_value_text() refers to storage owned by the
// argument value, which SQLite may release or reuse once this function
// returns. The result is therefore passed with SQLITE_TRANSIENT so SQLite
// takes its own copy; SQLITE_STATIC would promise that the bytes never change.
static void sqlite_hlc_str(sqlite3_context *context, int argc, sqlite3_value **argv) {
    if (argc != 1) {
        sqlite3_result_error(context, "hlc_str requires exactly one argument (hlc_text)", -1);
        return;
    }
    const unsigned char *hlcText = sqlite3_value_text(argv[0]);
    if (hlcText == NULL) {
        sqlite3_result_error(context, "hlc_text argument must be a text value", -1);
        return;
    }
    sqlite3_result_text(context, (const char*)hlcText, -1, SQLITE_TRANSIENT);
}
// SQL function hlc_compare(hlc_text1, hlc_text2): parses both HLC strings and
// returns the integer produced by hlc_compareTo() (negative, zero or
// positive). Wrong arity, NULL arguments, or unparsable input is reported as
// an SQL error.
static void sqlite_hlc_compare(sqlite3_context *context, int argc, sqlite3_value **argv) {
    if (argc != 2) {
        sqlite3_result_error(context, "hlc_compare requires exactly two arguments (hlc_text1, hlc_text2)", -1);
        return;
    }

    const char *leftText = (const char *)sqlite3_value_text(argv[0]);
    const char *rightText = (const char *)sqlite3_value_text(argv[1]);
    if (!leftText || !rightText) {
        sqlite3_result_error(context, "HLC text arguments must be text values", -1);
        return;
    }

    Hlc *left = hlc_parse(leftText);
    Hlc *right = hlc_parse(rightText);
    if (!left || !right) {
        sqlite3_result_error(context, "Invalid HLC text provided for comparison", -1);
        hlc_free(left);
        hlc_free(right);
        return;
    }

    const int ordering = hlc_compareTo(left, right);
    hlc_free(left);
    hlc_free(right);
    sqlite3_result_int(context, ordering);
}
// Extension entry point for the "hlc" extension.
//
// Registers hlc_now, hlc_parse, hlc_increment, hlc_merge, hlc_str and
// hlc_compare on `db`, in that order, stopping at the first failure.
// Returns SQLITE_OK or the failing sqlite3_create_function() code.
#ifdef _WIN32
__declspec(dllexport)
#endif
int sqlite3_hlc_init(
    sqlite3 *db,
    char **pzErrMsg,
    const sqlite3_api_routines *pApi
){
    // One row per SQL function: name, argument count, flags, implementation.
    static const struct {
        const char *zName;
        int nArg;
        int flags;
        void (*xFunc)(sqlite3_context *, int, sqlite3_value **);
    } functions[] = {
        { "hlc_now",       1, SQLITE_UTF8 | SQLITE_INNOCUOUS,                        sqlite_hlc_now },
        { "hlc_parse",     1, SQLITE_UTF8 | SQLITE_DETERMINISTIC | SQLITE_INNOCUOUS, sqlite_hlc_parse },
        { "hlc_increment", 1, SQLITE_UTF8 | SQLITE_INNOCUOUS,                        sqlite_hlc_increment },
        { "hlc_merge",     2, SQLITE_UTF8 | SQLITE_INNOCUOUS,                        sqlite_hlc_merge },
        { "hlc_str",       1, SQLITE_UTF8 | SQLITE_DETERMINISTIC | SQLITE_INNOCUOUS, sqlite_hlc_str },
        { "hlc_compare",   2, SQLITE_UTF8 | SQLITE_DETERMINISTIC | SQLITE_INNOCUOUS, sqlite_hlc_compare },
    };
    int rc = SQLITE_OK;
    SQLITE_EXTENSION_INIT2(pApi);
    (void)pzErrMsg; /* Unused parameter */

    for (size_t i = 0; i < sizeof(functions) / sizeof(functions[0]); ++i) {
        rc = sqlite3_create_function(db, functions[i].zName, functions[i].nArg,
                                     functions[i].flags, NULL, functions[i].xFunc,
                                     NULL, NULL);
        if (rc != SQLITE_OK) return rc;
    }
    return SQLITE_OK;
}
/*
** 2019-10-23
**
** The author disclaims copyright to this source code. In place of
** a legal notice, here is a blessing:
**
** May you do good and not evil.
** May you find forgiveness for yourself and forgive others.
** May you share freely, never taking more than you give.
**
******************************************************************************
**
** This SQLite extension implements functions that handle RFC-4122 UUIDs.
** Three SQL functions are implemented:
**
** uuid() - generate a version 4 UUID as a string
** uuid_str(X) - convert a UUID X into a well-formed UUID string
** uuid_blob(X) - convert a UUID X into a 16-byte blob
**
** The output from uuid() and uuid_str(X) are always well-formed RFC-4122
** UUID strings in this format:
**
** xxxxxxxx-xxxx-Mxxx-Nxxx-xxxxxxxxxxxx
**
** All of the 'x', 'M', and 'N' values are lower-case hexadecimal digits.
** The M digit indicates the "version". For uuid()-generated UUIDs, the
** version is always "4" (a random UUID). The upper three bits of N digit
** are the "variant". This library only supports variant 1 (indicated
** by values of N between '8' and 'b') as those are overwhelmingly the most
** common. Other variants are for legacy compatibility only.
**
** The output of uuid_blob(X) is always a 16-byte blob. The UUID input
** string is converted in network byte order (big-endian) in accordance
** with RFC-4122 specifications for variant-1 UUIDs. Note that network
** byte order is *always* used, even if the input self-identifies as a
** variant-2 UUID.
**
** The input X to the uuid_str() and uuid_blob() functions can be either
** a string or a BLOB. If it is a BLOB it must be exactly 16 bytes in
** length or else a NULL is returned. If the input is a string it must
** consist of 32 hexadecimal digits, upper or lower case, optionally
** surrounded by {...} and with optional "-" characters interposed in the
** middle. The flexibility of input is inspired by the PostgreSQL
** implementation of UUID functions that accept in all of the following
** formats:
**
** A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A11
** {a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11}
** a0eebc999c0b4ef8bb6d6bb9bd380a11
** a0ee-bc99-9c0b-4ef8-bb6d-6bb9-bd38-0a11
** {a0eebc99-9c0b4ef8-bb6d6bb9-bd380a11}
**
** If any of the above inputs are passed into uuid_str(), the output will
** always be in the canonical RFC-4122 format:
**
** a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11
**
** If the X input string has too few or too many digits or contains
** stray characters other than {, }, or -, then NULL is returned.
*/
#include "sqlite3ext.h"
SQLITE_EXTENSION_INIT1
#include <assert.h>
#include <string.h>
#include <ctype.h>
#if !defined(SQLITE_ASCII) && !defined(SQLITE_EBCDIC)
# define SQLITE_ASCII 1
#endif
/*
** Map one hexadecimal character onto its numeric value.
** The argument must already be a valid hex digit (0..9, a..f, A..F);
** this is asserted, not checked at run time.
**
** The low four bits of a decimal digit are its value. For letters a
** correction of 9 is added first, selected by a character-set specific
** bit test (SQLITE_ASCII or SQLITE_EBCDIC).
*/
static unsigned char sqlite3UuidHexToInt(int h){
  assert( (h>='0' && h<='9') || (h>='a' && h<='f') || (h>='A' && h<='F') );
#ifdef SQLITE_ASCII
  if( (h>>6) & 1 ){
    h += 9;
  }
#endif
#ifdef SQLITE_EBCDIC
  if( (~(h>>4)) & 1 ){
    h += 9;
  }
#endif
  return (unsigned char)(h & 0xf);
}
/*
** Render a 16-byte UUID blob as canonical lower-case text
** (8-4-4-4-12 hex digits separated by '-'). zStr must have room for
** at least 37 bytes; the output is zero-terminated.
*/
static void sqlite3UuidBlobToStr(
  const unsigned char *aBlob,   /* Input blob */
  unsigned char *zStr           /* Write the answer here */
){
  static const char zHex[] = "0123456789abcdef";
  /* Bit i of the mask is set when a '-' precedes byte i (bytes 4,6,8,10) */
  unsigned int dashMask = 0x550;
  unsigned char *zOut = zStr;
  int iByte;
  for(iByte=0; iByte<16; iByte++){
    const unsigned char c = aBlob[iByte];
    if( dashMask & 1 ){
      *zOut++ = '-';
    }
    *zOut++ = zHex[c>>4];
    *zOut++ = zHex[c&0xf];
    dashMask >>= 1;
  }
  *zOut = 0;
}
/*
** Parse the zero-terminated text zStr into a 16-byte binary UUID.
** An optional leading '{' and trailing '}' are accepted, as is a single
** '-' in front of any byte (pair of hex digits).
** Returns 0 on success and non-zero when the text is not a UUID.
*/
static int sqlite3UuidStrToBlob(
  const unsigned char *zStr,    /* Input string */
  unsigned char *aBlob          /* Write results here */
){
  const unsigned char *z = zStr;
  int nByte = 0;
  if( *z=='{' ) z++;
  while( nByte<16 ){
    if( *z=='-' ) z++;
    if( !isxdigit(z[0]) || !isxdigit(z[1]) ){
      return 1;
    }
    aBlob[nByte] = (sqlite3UuidHexToInt(z[0])<<4)
                      + sqlite3UuidHexToInt(z[1]);
    nByte++;
    z += 2;
  }
  if( *z=='}' ) z++;
  /* Anything left over after the optional '}' makes the input invalid */
  return *z!=0;
}
/*
** Obtain the 16-byte UUID blob represented by pIn.
**   TEXT input is parsed into pBuf, and pBuf is returned.
**   BLOB input of exactly 16 bytes is returned directly.
** Any other input (wrong type, wrong length, unparsable text) yields NULL.
*/
static const unsigned char *sqlite3UuidInputToBlob(
  sqlite3_value *pIn,       /* Input text */
  unsigned char *pBuf       /* output buffer */
){
  const int eType = sqlite3_value_type(pIn);
  if( eType==SQLITE_TEXT ){
    const unsigned char *zText = sqlite3_value_text(pIn);
    if( sqlite3UuidStrToBlob(zText, pBuf)!=0 ){
      return 0;
    }
    return pBuf;
  }
  if( eType==SQLITE_BLOB ){
    if( sqlite3_value_bytes(pIn)!=16 ){
      return 0;
    }
    return sqlite3_value_blob(pIn);
  }
  return 0;
}
/*
** Implementation of uuid(): 16 random bytes from sqlite3_randomness(),
** stamped with version 4 (high nibble of byte 6) and variant 1 (top two
** bits of byte 8), returned as 36 characters of canonical text.
*/
static void sqlite3UuidFunc(
  sqlite3_context *context,
  int argc,
  sqlite3_value **argv
){
  unsigned char aRand[16];
  unsigned char zText[37];
  (void)argc;
  (void)argv;
  sqlite3_randomness(16, aRand);
  aRand[6] = (aRand[6] & 0x0f) | 0x40;
  aRand[8] = (aRand[8] & 0x3f) | 0x80;
  sqlite3UuidBlobToStr(aRand, zText);
  sqlite3_result_text(context, (char*)zText, 36, SQLITE_TRANSIENT);
}
/*
** Implementation of uuid_str(X): convert X to a UUID blob and return its
** canonical 36-character text. When X is not a usable UUID no result is
** set, so the SQL result is NULL.
*/
static void sqlite3UuidStrFunc(
  sqlite3_context *context,
  int argc,
  sqlite3_value **argv
){
  unsigned char aScratch[16];
  unsigned char zText[37];
  const unsigned char *pUuid;
  (void)argc;
  pUuid = sqlite3UuidInputToBlob(argv[0], aScratch);
  if( pUuid!=0 ){
    sqlite3UuidBlobToStr(pUuid, zText);
    sqlite3_result_text(context, (char*)zText, 36, SQLITE_TRANSIENT);
  }
}
/*
** Implementation of uuid_blob(X): convert X to a UUID and return it as a
** 16-byte blob. When X is not a usable UUID no result is set, so the SQL
** result is NULL.
*/
static void sqlite3UuidBlobFunc(
  sqlite3_context *context,
  int argc,
  sqlite3_value **argv
){
  unsigned char aScratch[16];
  const unsigned char *pUuid;
  (void)argc;
  pUuid = sqlite3UuidInputToBlob(argv[0], aScratch);
  if( pUuid!=0 ){
    sqlite3_result_blob(context, pUuid, 16, SQLITE_TRANSIENT);
  }
}
/*
** Extension entry point for the "uuid" extension. Registers uuid(),
** uuid_str(X) and uuid_blob(X) in that order and stops at the first
** registration that fails. Returns SQLITE_OK or that failure code.
*/
#ifdef _WIN32
__declspec(dllexport)
#endif
int sqlite3_uuid_init(
  sqlite3 *db,
  char **pzErrMsg,
  const sqlite3_api_routines *pApi
){
  int rc = SQLITE_OK;
  SQLITE_EXTENSION_INIT2(pApi);
  (void)pzErrMsg;  /* Unused parameter */
  rc = sqlite3_create_function(db, "uuid", 0, SQLITE_UTF8|SQLITE_INNOCUOUS, 0,
                               sqlite3UuidFunc, 0, 0);
  if( rc!=SQLITE_OK ){
    return rc;
  }
  rc = sqlite3_create_function(db, "uuid_str", 1,
                       SQLITE_UTF8|SQLITE_INNOCUOUS|SQLITE_DETERMINISTIC,
                       0, sqlite3UuidStrFunc, 0, 0);
  if( rc!=SQLITE_OK ){
    return rc;
  }
  return sqlite3_create_function(db, "uuid_blob", 1,
                       SQLITE_UTF8|SQLITE_INNOCUOUS|SQLITE_DETERMINISTIC,
                       0, sqlite3UuidBlobFunc, 0, 0);
}
@rodydavis
Copy link
Author

rodydavis commented Mar 15, 2025

First load the extensions (CLI for example):

sqlite3
.load uuid
.load hlc
.load crdt

Create a table:

CREATE TABLE artists (
    id INTEGER NOT NULL PRIMARY KEY,
    name TEXT NOT NULL DEFAULT ''
);

Upgrade to CRDT table:

SELECT crdt('artists', uuid());

Add a new row:

INSERT INTO artists (id, name)
VALUES (1, 'The Beatles');

Return the rows with the correct hlc timestamps:

SELECT * FROM artists
WHERE is_deleted = FALSE;

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment