Skip to content

Instantly share code, notes, and snippets.

@rodydavis
Last active March 15, 2025 08:07
Show Gist options
  • Save rodydavis/b96549194d9a11a4e7a344ecaa2ea11d to your computer and use it in GitHub Desktop.
Save rodydavis/b96549194d9a11a4e7a344ecaa2ea11d to your computer and use it in GitHub Desktop.
import 'dart:ffi';
import 'dart:typed_data';
import 'package:sqlite3/sqlite3.dart';
import 'package:sqlite3/src/ffi/implementation.dart';
import 'package:uuid/uuid.dart';
import 'hlc.dart';
/// Demo entry point.
///
/// Opens two in-memory databases, records the inserts made on the first one
/// in a session, replays the resulting changeset on the second one, and then
/// exercises the soft-delete trigger on the first database.
void main() {
  // Load the SQLite build shipped with cr-sqlite instead of the bundled one.
  // (The stock build would be `sqlite3Native`.)
  final engine = FfiSqlite3(
    DynamicLibrary.open('cr-sqlite/core/dist/sqlite3'),
  );
  print('Using sqlite3 ${engine.version}');

  // Two independent in-memory databases. For a file-backed database use
  // `open(yourFilePath)` instead.
  final primary = engine.openInMemory() as DB;
  final replica = engine.openInMemory() as DB;

  // Install the SQL functions, schema and triggers on both sides.
  primary.init();
  replica.init();

  // Start recording changes made on the primary database.
  final session = primary.createSession();
  print('session: ${session.runtimeType}');

  // One prepared statement, executed once per artist.
  final insertArtist = primary.prepare('INSERT INTO artists (name) VALUES (?)');
  for (final name in ['The Beatles', 'Led Zeppelin', 'The Who', 'Nirvana']) {
    insertArtist.execute([name]);
  }
  // Release the statement as soon as it is no longer needed.
  insertArtist.dispose();

  final primaryRows = primary.select('SELECT * FROM artists');
  for (final row in primaryRows) {
    print('a: Artist[id: ${row['id']}, name: ${row['name']}]');
  }

  // Serialize the recorded changes (a patchset would work as well).
  final changeset = session.changeset();
  print('changeset: ${changeset.lengthInBytes} bytes');

  // Replay the changes on the replica, then stop recording.
  replica.apply(changeset);
  session.close();

  // The replica should now list the same artists.
  final replicaRows = replica.select('SELECT * FROM artists');
  for (final row in replicaRows) {
    print('b: Artist[id: ${row['id']}, name: ${row['name']}]');
  }

  // Parameterized select issued directly on the database.
  var matches = primary.select('SELECT * FROM artists WHERE name LIKE ?', [
    'The %',
  ]);

  // Take the first match ...
  final row = matches.first;
  print('x: Artist[id: ${row['id']}, name: ${row['name']}]');

  // ... and delete it. The delete trigger flags the row instead of removing it.
  primary.execute('DELETE FROM artists WHERE name = ?', [row['name']]);

  matches = primary.select(
    'SELECT * FROM artists WHERE name LIKE ? AND id_deleted = FALSE',
    ['The %'],
  );
  // A result set can be iterated row by row.
  for (final row in matches) {
    print('c: Artist[id: ${row['id']}, name: ${row['name']}]');
  }

  // Close both databases to avoid leaking native resources.
  replica.close();
  primary.close();
}
/// A zero-cost wrapper around a sqlite3 [Database] that installs the HLC
/// helper SQL functions and the CRDT bookkeeping columns and triggers used by
/// this demo.
extension type DB(Database db) implements Database {
  /// Registers the SQL helper functions, creates the demo schema (`artists`
  /// and `kv`), stores a node id in `kv`, and enables CRDT bookkeeping on
  /// `artists`.
  ///
  /// All functions are registered with `directOnly: false` because they are
  /// invoked from column defaults and triggers, not only from top-level SQL.
  void init() {
    db.createFunction(
      functionName: 'createId',
      argumentCount: const AllowedArgumentCount(0),
      // could use sqlite extension for uuid.c
      function: (_) => Uuid().v4(),
      deterministic: false,
      directOnly: false,
    );
    db.createFunction(
      functionName: 'createHlc',
      argumentCount: const AllowedArgumentCount(1),
      function: (args) => Hlc.now(args.first.toString()).toString(),
      deterministic: false,
      directOnly: false,
    );
    db.createFunction(
      functionName: 'nodeId',
      argumentCount: const AllowedArgumentCount(1),
      function: (args) => Hlc.parse(args.first.toString()).nodeId,
      deterministic: true,
      directOnly: false,
    );
    db.createFunction(
      functionName: 'parseHlc',
      argumentCount: const AllowedArgumentCount(1),
      // A user function may only return SQL-compatible values, so hand back
      // the canonical string form rather than the Hlc object itself.
      function: (args) => Hlc.parse(args.first.toString()).toString(),
      deterministic: true,
      directOnly: false,
    );
    db.createFunction(
      functionName: 'incrementHlc',
      argumentCount: const AllowedArgumentCount(1),
      function:
          (args) => Hlc.parse(args.first.toString()).increment().toString(),
      // Hlc.increment() reads the wall clock, so the result is not a pure
      // function of the argument.
      deterministic: false,
      directOnly: false,
    );
    db.createFunction(
      functionName: 'compareHlc',
      argumentCount: const AllowedArgumentCount(2),
      function: (args) {
        final [a, b] = args;
        final hlcA = Hlc.parse(a.toString());
        final hlcB = Hlc.parse(b.toString());
        return hlcA.compareTo(hlcB);
      },
      deterministic: true,
      directOnly: false,
    );
    db.createFunction(
      functionName: 'crdt',
      argumentCount: const AllowedArgumentCount(1),
      function: (args) => _crdtTableAndTriggers(args.first.toString()),
      deterministic: false,
      directOnly: false,
    );
    // IF NOT EXISTS keeps init() re-runnable, matching the INSERT OR IGNORE
    // and the column-existence checks used further down.
    db.execute('''
CREATE TABLE IF NOT EXISTS artists (
id INTEGER NOT NULL PRIMARY KEY,
name TEXT NOT NULL DEFAULT ''
);
''');
    db.execute('''
CREATE TABLE IF NOT EXISTS kv (
key TEXT NOT NULL PRIMARY KEY,
value TEXT NOT NULL DEFAULT ''
);
''');
    db.execute(
      "INSERT OR IGNORE INTO kv (key, value) VALUES ('node_id', createId());",
    );
    // Single quotes: a double-quoted token is an identifier in SQL and is only
    // treated as a string by a legacy fallback.
    db.execute("SELECT crdt('artists');");
  }

  /// The node id stored under the `node_id` key of the `kv` table.
  ///
  /// Throws a [StateError] if the row is missing (i.e. [init] has not run).
  String get nodeId {
    return db
            .select("SELECT value FROM kv WHERE key = 'node_id';")
            .first['value']
        as String;
  }

  /// Adds the CRDT columns (`hlc`, `modified`, `id_deleted`, `node_id`) to
  /// [table] when missing and (re)creates its insert/update/delete triggers.
  ///
  /// Throws a [MissingTableException] when [table] does not exist.
  ///
  /// NOTE(review): [table] is interpolated into SQL unquoted; only call this
  /// with trusted table names.
  void _crdtTableAndTriggers(String table) {
    final info = db.select('PRAGMA table_info($table);');
    // PRAGMA table_info returns no rows for an unknown table.
    if (info.isEmpty) {
      throw MissingTableException(table);
    }
    // Primary-key columns identify the row inside every trigger.
    final tableKeys = [
      for (final column in info)
        if ((column['pk'] as int) > 0) column['name'] as String,
    ];
    final existing = {for (final column in info) column['name'] as String};
    final node = nodeId;

    if (!existing.contains('hlc')) {
      db.execute(
        "ALTER TABLE $table ADD COLUMN \"hlc\" TEXT NOT NULL DEFAULT (createHlc('$node'));",
      );
    }
    if (!existing.contains('modified')) {
      db.execute(
        "ALTER TABLE $table ADD COLUMN \"modified\" TEXT NOT NULL DEFAULT (createHlc('$node'));",
      );
    }
    if (!existing.contains('id_deleted')) {
      db.execute(
        'ALTER TABLE $table ADD COLUMN "id_deleted" BOOLEAN NOT NULL DEFAULT (FALSE);',
      );
    }
    if (!existing.contains('node_id')) {
      db.execute(
        "ALTER TABLE $table ADD COLUMN \"node_id\" TEXT NOT NULL DEFAULT ('$node');",
      );
    }

    // Drop triggers if they exist so they can be recreated below.
    _crdtDropTriggers(table);

    // Columns copied from the incoming row when an insert collides with an
    // existing key. Re-read the table so columns added above are included;
    // key columns are skipped (they already match) and the three CRDT columns
    // are assigned explicitly in the trigger body.
    const crdtColumns = ['hlc', 'modified', 'id_deleted'];
    final dataColumns = [
      for (final column in db.select('PRAGMA table_info($table);'))
        if (!crdtColumns.contains(column['name']) &&
            !tableKeys.contains(column['name']))
          column['name'] as String,
    ];
    final matchNew = tableKeys.map((e) => '$e = NEW.$e').join(' AND ');
    final matchOld = tableKeys.map((e) => '$e = OLD.$e').join(' AND ');

    // Insert trigger: when the key already exists, turn the INSERT into an
    // UPDATE that is applied only if the incoming HLC is greater, then skip
    // the original INSERT via raise(IGNORE).
    final beforeInsertTrigger = 'crdt_insert_$table';
    {
      final sb = StringBuffer();
      sb.$('CREATE TRIGGER $beforeInsertTrigger');
      sb.$(' BEFORE INSERT ON $table');
      sb.$(' WHEN EXISTS (SELECT * FROM $table WHERE $matchNew)');
      sb.$(' BEGIN');
      sb.$(' UPDATE $table');
      sb.$(' SET ');
      for (final column in dataColumns) {
        sb.$(' $column = NEW.$column,');
      }
      sb.$(" hlc = NEW.hlc,");
      sb.$(" modified = NEW.modified,");
      sb.$(' id_deleted = FALSE');
      sb.$(' WHERE $matchNew');
      sb.$(' AND compareHlc(NEW.hlc, hlc) > 0;');
      sb.$(' SELECT raise(IGNORE);');
      sb.$(' END;');
      db.execute(sb.toString());
    }

    // After-update trigger that bumps `modified`.
    // NOTE(review): after the update the stored hlc equals NEW.hlc, so the
    // compareHlc(...) > 0 guard looks like it can never match — confirm
    // whether OLD.hlc was intended.
    final afterUpdateTrigger = 'crdt_update_$table';
    {
      final sb = StringBuffer();
      sb.$('CREATE TRIGGER $afterUpdateTrigger');
      sb.$(' AFTER UPDATE ON $table');
      sb.$(' BEGIN');
      sb.$(' UPDATE $table');
      sb.$(" SET modified = incrementHlc(NEW.modified)");
      sb.$(' WHERE $matchNew');
      sb.$(' AND compareHlc(NEW.hlc, hlc) > 0;');
      sb.$(' END;');
      db.execute(sb.toString());
    }

    // Delete trigger: flag the row with id_deleted = TRUE instead of removing
    // it, then skip the original DELETE via raise(IGNORE).
    final beforeDeleteTrigger = 'crdt_delete_$table';
    {
      final sb = StringBuffer();
      sb.$('CREATE TRIGGER $beforeDeleteTrigger');
      sb.$(' BEFORE DELETE ON $table');
      sb.$(' BEGIN');
      sb.$(' UPDATE $table');
      sb.$(' SET id_deleted = TRUE');
      sb.$(' WHERE $matchOld;');
      sb.$(' SELECT raise(IGNORE);');
      sb.$(' END;');
      db.execute(sb.toString());
    }
  }

  /// Drops all three CRDT triggers of [table], ignoring any that are absent.
  void _crdtDropTriggers(String table) {
    final knownTriggers = [
      'crdt_insert_$table',
      'crdt_update_$table',
      'crdt_delete_$table',
    ];
    // DROP ... IF EXISTS makes a prior lookup in sqlite_master unnecessary.
    for (final trigger in knownTriggers) {
      db.execute('DROP TRIGGER IF EXISTS $trigger;');
    }
  }

  /// Applies the changeset [bytes] (as produced by a session) to this
  /// database.
  void apply(Uint8List bytes) {
    db.applyChangeset(bytes);
  }

  /// Disposes the wrapped database.
  void close() {
    db.dispose();
  }
}
/// Signals that CRDT setup was requested for a table that has no columns
/// according to `PRAGMA table_info`, i.e. a table that does not exist.
class MissingTableException implements Exception {
  /// Name of the table that could not be found.
  final String table;

  MissingTableException(this.table);

  @override
  String toString() {
    return 'Table $table does not exist';
  }
}
/// Shorthand used when assembling multi-line SQL statements.
extension on StringBuffer {
  /// Appends [val] followed by a line break; with no argument it appends an
  /// empty line.
  void $([String val = '']) {
    writeln(val);
  }
}
/// Largest logical counter value an [Hlc] may carry; checked by the [Hlc]
/// constructor assertion and by [Hlc.increment].
const _maxCounter = 0xFFFF;
/// Largest amount by which a timestamp may run ahead of the local wall clock
/// before [Hlc.increment] or [Hlc.merge] rejects it.
const _maxDrift = Duration(minutes: 1);
/// A Hybrid Logical Clock timestamp.
///
/// Pairs a UTC wall-clock [dateTime] with a logical [counter] and the
/// [nodeId] of the issuing node, giving up some time precision in exchange
/// for a clock that keeps increasing across a distributed system.
/// Inspiration: https://cse.buffalo.edu/tech-reports/2014-04.pdf
class Hlc implements Comparable<Hlc> {
  /// Wall-clock component, always stored in UTC.
  final DateTime dateTime;

  /// Logical counter that orders events sharing the same [dateTime].
  final int counter;

  /// Identifier of the node that issued this timestamp.
  final String nodeId;

  /// Creates a timestamp; [dateTime] is converted to UTC.
  Hlc(DateTime dateTime, this.counter, this.nodeId)
    : dateTime = dateTime.toUtc(),
      assert(counter <= _maxCounter);

  /// Creates the earliest timestamp for [nodeId]: January 1, 1970, counter 0.
  Hlc.zero(String nodeId) : this(DateTime.utc(1970), 0, nodeId);

  /// Creates a timestamp at [dateTime] with the counter reset to zero.
  Hlc.fromDate(DateTime dateTime, String nodeId) : this(dateTime, 0, nodeId);

  /// Creates a timestamp from the current wall clock.
  Hlc.now(String nodeId) : this.fromDate(DateTime.now(), nodeId);

  /// Parses a string of the form `ISO8601 date-counter-node id`, where the
  /// counter is hexadecimal.
  factory Hlc.parse(String timestamp) {
    // The date itself contains dashes, so start looking after its last colon.
    final lastColon = timestamp.lastIndexOf(':');
    final counterDash = timestamp.indexOf('-', lastColon);
    final nodeIdDash = timestamp.indexOf('-', counterDash + 1);
    return Hlc(
      DateTime.parse(timestamp.substring(0, counterDash)),
      int.parse(timestamp.substring(counterDash + 1, nodeIdDash), radix: 16),
      timestamp.substring(nodeIdDash + 1),
    );
  }

  /// Returns a copy with any supplied field replacing the current one.
  Hlc apply({DateTime? dateTime, int? counter, String? nodeId}) {
    return Hlc(
      dateTime ?? this.dateTime,
      counter ?? this.counter,
      nodeId ?? this.nodeId,
    );
  }

  /// Returns the next timestamp to send to another system.
  ///
  /// Uses the local wall clock unless [wallTime] is given. Throws a
  /// [ClockDriftException] when the result is too far ahead of the wall
  /// clock and an [OverflowException] when the counter would exceed its
  /// maximum.
  Hlc increment({DateTime? wallTime}) {
    final wall = (wallTime ?? DateTime.now()).toUtc();

    // Logical time never moves backwards: adopt the wall clock only when it
    // is ahead; otherwise keep the current time and bump the counter.
    final clockAdvanced = wall.isAfter(dateTime);
    final nextTime = clockAdvanced ? wall : dateTime;
    final nextCounter = clockAdvanced ? 0 : counter + 1;

    if (nextTime.difference(wall) > _maxDrift) {
      throw ClockDriftException(nextTime, wall);
    }
    if (nextCounter > _maxCounter) {
      throw OverflowException(nextCounter);
    }
    return Hlc(nextTime, nextCounter, nodeId);
  }

  /// Merges a timestamp received from a [remote] system into this one so the
  /// local clock stays monotonic.
  ///
  /// Uses the local wall clock unless [wallTime] is given. Throws a
  /// [DuplicateNodeException] when a newer [remote] carries this node's id
  /// and a [ClockDriftException] when [remote] is too far ahead of the wall
  /// clock.
  Hlc merge(Hlc remote, {DateTime? wallTime}) {
    final wall = (wallTime ?? DateTime.now()).toUtc();

    // Nothing to do when the local date + counter is already the same or
    // higher.
    final sameMoment = remote.dateTime.isAtSameMomentAs(dateTime);
    final remoteNotNewer =
        remote.dateTime.isBefore(dateTime) ||
        (sameMoment && remote.counter <= counter);
    if (remoteNotNewer) {
      return this;
    }

    if (nodeId == remote.nodeId) {
      throw DuplicateNodeException(nodeId);
    }
    if (remote.dateTime.difference(wall) > _maxDrift) {
      throw ClockDriftException(remote.dateTime, wall);
    }
    // Adopt the remote time and counter but keep the local node id.
    return Hlc(remote.dateTime, remote.counter, nodeId);
  }

  /// Returns the string form, for easy JSON encoding.
  String toJson() => toString();

  @override
  String toString() {
    final stamp = dateTime.toIso8601String();
    final hexCounter = counter.toRadixString(16).toUpperCase().padLeft(4, '0');
    return '$stamp-$hexCounter-$nodeId';
  }

  @override
  int get hashCode => toString().hashCode;

  @override
  bool operator ==(other) {
    return other is Hlc && compareTo(other) == 0;
  }

  bool operator <(other) {
    return other is Hlc && compareTo(other) < 0;
  }

  bool operator <=(other) {
    return other is Hlc && compareTo(other) <= 0;
  }

  bool operator >(other) {
    return other is Hlc && compareTo(other) > 0;
  }

  bool operator >=(other) {
    return other is Hlc && compareTo(other) >= 0;
  }

  /// Orders by [dateTime], then [counter], then [nodeId].
  @override
  int compareTo(Hlc other) {
    if (!dateTime.isAtSameMomentAs(other.dateTime)) {
      return dateTime.compareTo(other.dateTime);
    }
    if (counter != other.counter) {
      return counter - other.counter;
    }
    return nodeId.compareTo(other.nodeId);
  }
}
/// Thrown when a timestamp is further ahead of the wall clock than the
/// permitted maximum drift.
class ClockDriftException implements Exception {
  /// How far the offending timestamp is ahead of the wall clock.
  final Duration drift;

  /// Records the drift between [dateTime] and [wallTime].
  ClockDriftException(DateTime dateTime, DateTime wallTime)
    : drift = dateTime.difference(wallTime);

  // Report both values in milliseconds; interpolating a Duration directly
  // prints `h:mm:ss.ffffff`, which contradicts the "ms" unit in the message.
  @override
  String toString() =>
      'Clock drift of ${drift.inMilliseconds} ms exceeds maximum '
      '(${_maxDrift.inMilliseconds} ms)';
}
/// Thrown when incrementing a timestamp would push its logical counter past
/// the allowed maximum.
class OverflowException implements Exception {
  /// The counter value that exceeded the limit.
  final int counter;

  OverflowException(this.counter);

  @override
  String toString() {
    return 'Timestamp counter overflow: $counter';
  }
}
/// Thrown when a newer remote timestamp carries the same node id as the
/// local clock.
class DuplicateNodeException implements Exception {
  /// The node id shared by both timestamps.
  final String nodeId;

  DuplicateNodeException(this.nodeId);

  @override
  String toString() {
    return 'Duplicate node: $nodeId';
  }
}
/// Convenience conversion from a serialized timestamp to an [Hlc].
extension StringHlcX on String {
  /// This string parsed with [Hlc.parse].
  Hlc get toHlc {
    return Hlc.parse(this);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment