Last active
March 15, 2025 08:07
-
-
Save rodydavis/b96549194d9a11a4e7a344ecaa2ea11d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import 'dart:ffi'; | |
import 'dart:typed_data'; | |
import 'package:sqlite3/sqlite3.dart'; | |
import 'package:sqlite3/src/ffi/implementation.dart'; | |
import 'package:uuid/uuid.dart'; | |
import 'hlc.dart'; | |
void main() { | |
final lib = DynamicLibrary.open('cr-sqlite/core/dist/sqlite3'); | |
final sqlite3 = FfiSqlite3(lib); | |
// final sqlite3 = sqlite3Native; | |
print('Using sqlite3 ${sqlite3.version}'); | |
// Create a new in-memory database. To use a database backed by a file, you | |
// can replace this with sqlite3.open(yourFilePath). | |
final db = sqlite3.openInMemory() as DB; | |
final db2 = sqlite3.openInMemory() as DB; | |
// Create a table and insert some data | |
db.init(); | |
db2.init(); | |
var session = db.createSession(); | |
print('session: ${session.runtimeType}'); | |
// Prepare a statement to run it multiple times: | |
final stmt = db.prepare('INSERT INTO artists (name) VALUES (?)'); | |
stmt | |
..execute(['The Beatles']) | |
..execute(['Led Zeppelin']) | |
..execute(['The Who']) | |
..execute(['Nirvana']); | |
// Dispose a statement when you don't need it anymore to clean up resources. | |
stmt.dispose(); | |
var result = db.select('SELECT * FROM artists'); | |
for (final row in result) { | |
print('a: Artist[id: ${row['id']}, name: ${row['name']}]'); | |
} | |
var changeset = session.changeset(); // or patchset | |
print('changeset: ${changeset.lengthInBytes} bytes'); | |
// apply changes | |
db2.apply(changeset); | |
session.close(); | |
// query the database using a simple select statement | |
result = db2.select('SELECT * FROM artists'); | |
for (final row in result) { | |
print('b: Artist[id: ${row['id']}, name: ${row['name']}]'); | |
} | |
// You can run select statements with PreparedStatement.select, or directly | |
// on the database: | |
ResultSet resultSet = db.select('SELECT * FROM artists WHERE name LIKE ?', [ | |
'The %', | |
]); | |
// Get one | |
final Row row = resultSet.first; | |
print('x: Artist[id: ${row['id']}, name: ${row['name']}]'); | |
// delete one row | |
db.execute('DELETE FROM artists WHERE name = ?', [row['name']]); | |
resultSet = db.select( | |
'SELECT * FROM artists WHERE name LIKE ? AND id_deleted = FALSE', | |
['The %'], | |
); | |
// You can iterate on the result set in multiple ways to retrieve Row objects | |
// one by one. | |
for (final Row row in resultSet) { | |
print('c: Artist[id: ${row['id']}, name: ${row['name']}]'); | |
} | |
// Don't forget to dispose the database to avoid memory leaks | |
db2.close(); | |
db.close(); | |
} | |
extension type DB(Database db) implements Database { | |
void init() { | |
db.createFunction( | |
functionName: 'createId', | |
argumentCount: const AllowedArgumentCount(0), | |
// could use sqlite extension for uuid.c | |
function: (_) => Uuid().v4(), | |
deterministic: false, | |
directOnly: false, | |
); | |
db.createFunction( | |
functionName: 'createHlc', | |
argumentCount: const AllowedArgumentCount(1), | |
function: (args) => Hlc.now(args.first.toString()).toString(), | |
deterministic: false, | |
directOnly: false, | |
); | |
db.createFunction( | |
functionName: 'nodeId', | |
argumentCount: const AllowedArgumentCount(1), | |
function: (args) => Hlc.parse(args.first.toString()).nodeId, | |
deterministic: true, | |
directOnly: false, | |
); | |
db.createFunction( | |
functionName: 'parseHlc', | |
argumentCount: const AllowedArgumentCount(1), | |
function: (args) => Hlc.parse(args.first.toString()), | |
deterministic: true, | |
directOnly: false, | |
); | |
db.createFunction( | |
functionName: 'incrementHlc', | |
argumentCount: const AllowedArgumentCount(1), | |
function: | |
(args) => Hlc.parse(args.first.toString()).increment().toString(), | |
deterministic: true, | |
directOnly: false, | |
); | |
db.createFunction( | |
functionName: 'compareHlc', | |
argumentCount: const AllowedArgumentCount(2), | |
function: (args) { | |
final [a, b] = args; | |
final hlcA = Hlc.parse(a.toString()); | |
final hlcB = Hlc.parse(b.toString()); | |
return hlcA.compareTo(hlcB); | |
}, | |
deterministic: true, | |
directOnly: false, | |
); | |
db.createFunction( | |
functionName: 'crdt', | |
argumentCount: const AllowedArgumentCount(1), | |
function: (args) => _crdtTableAndTriggers(args.first.toString()), | |
deterministic: false, | |
directOnly: false, | |
); | |
db.execute(''' | |
CREATE TABLE artists ( | |
id INTEGER NOT NULL PRIMARY KEY, | |
name TEXT NOT NULL DEFAULT '' | |
); | |
'''); | |
db.execute(''' | |
CREATE TABLE kv ( | |
key TEXT NOT NULL PRIMARY KEY, | |
value TEXT NOT NULL DEFAULT '' | |
); | |
'''); | |
db.execute( | |
"INSERT OR IGNORE INTO kv (key, value) VALUES ('node_id', createId());", | |
); | |
db.execute('SELECT crdt("artists");'); | |
} | |
String get nodeId { | |
return db | |
.select("SELECT value FROM kv WHERE key = 'node_id';") | |
.first['value']; | |
} | |
void _crdtTableAndTriggers(String table) { | |
// Get table info | |
final result = db.select('PRAGMA table_info($table);'); | |
final tableKeys = result | |
.where((e) => e['pk'] > 0) | |
.map((e) => e['name'] as String); | |
/// Check if table exists | |
if (result.isEmpty) { | |
throw MissingTableException(table); | |
} | |
// Check if table has columns [hlc, modified, node_id, id_deleted] | |
final columns = result.map((e) => e['name']).toList(); | |
if (!columns.contains('hlc')) { | |
db.execute( | |
"ALTER TABLE $table ADD COLUMN \"hlc\" TEXT NOT NULL DEFAULT (createHlc('$nodeId'));", | |
); | |
} | |
if (!columns.contains('modified')) { | |
db.execute( | |
"ALTER TABLE $table ADD COLUMN \"modified\" TEXT NOT NULL DEFAULT (createHlc('$nodeId'));", | |
); | |
} | |
if (!columns.contains('id_deleted')) { | |
db.execute( | |
'ALTER TABLE $table ADD COLUMN "id_deleted" BOOLEAN NOT NULL DEFAULT (FALSE);', | |
); | |
} | |
if (!columns.contains('node_id')) { | |
db.execute( | |
"ALTER TABLE $table ADD COLUMN \"node_id\" TEXT NOT NULL DEFAULT ('$nodeId');", | |
); | |
} | |
// Drop triggers if they exist | |
_crdtDropTriggers(table); | |
// trigger to get the hlc and compare it for insert | |
final beforeInsertTrigger = 'crdt_insert_$table'; | |
{ | |
final sb = StringBuffer(); | |
final columns = tableKeys.toList(); | |
const crdtColumns = ['hlc', 'modified', 'id_deleted']; | |
final columnsWithoutCrdt = columns.where((e) => !crdtColumns.contains(e)); | |
sb.$('CREATE TRIGGER $beforeInsertTrigger'); | |
sb.$(' BEFORE INSERT ON $table'); | |
sb.$( | |
' WHEN EXISTS (SELECT * FROM $table WHERE ${tableKeys.map((e) => '$e = NEW.$e').join(' AND ')})', | |
); | |
sb.$(' BEGIN'); | |
sb.$(' UPDATE $table'); | |
sb.$(' SET '); | |
for (final column in columnsWithoutCrdt) { | |
sb.$(' $column = NEW.$column,'); | |
} | |
sb.$(" hlc = NEW.hlc,"); | |
sb.$(" modified = NEW.modified,"); | |
sb.$(' id_deleted = FALSE'); | |
sb.$(' WHERE ${tableKeys.map((e) => '$e = NEW.$e').join(' AND ')}'); | |
sb.$(' AND compareHlc(NEW.hlc, hlc) > 0;'); | |
sb.$(' SELECT raise(IGNORE);'); | |
sb.$(' END;'); | |
db.execute(sb.toString()); | |
} | |
// Create before update trigger to set modified | |
final afterUpdateTrigger = 'crdt_update_$table'; | |
{ | |
final sb = StringBuffer(); | |
sb.$('CREATE TRIGGER $afterUpdateTrigger'); | |
sb.$(' AFTER UPDATE ON $table'); | |
sb.$(' BEGIN'); | |
sb.$(' UPDATE $table'); | |
sb.$(" SET modified = incrementHlc(NEW.modified)"); | |
sb.$(' WHERE ${tableKeys.map((e) => '$e = NEW.$e').join(' AND ')}'); | |
sb.$(' AND compareHlc(NEW.hlc, hlc) > 0;'); | |
sb.$(' END;'); | |
db.execute(sb.toString()); | |
} | |
// trigger to set is_deleted to false instead of deleting | |
final beforeDeleteTrigger = 'crdt_delete_$table'; | |
{ | |
final sb = StringBuffer(); | |
sb.$('CREATE TRIGGER $beforeDeleteTrigger'); | |
sb.$(' BEFORE DELETE ON $table'); | |
sb.$(' BEGIN'); | |
sb.$(' UPDATE $table'); | |
sb.$(' SET id_deleted = TRUE'); | |
sb.$(' WHERE ${tableKeys.map((e) => '$e = OLD.$e').join(' AND ')};'); | |
sb.$(' SELECT raise(IGNORE);'); | |
sb.$(' END;'); | |
db.execute(sb.toString()); | |
} | |
} | |
void _crdtDropTriggers(String table) { | |
final triggers = db.select( | |
"SELECT name FROM sqlite_master WHERE type = 'trigger' AND tbl_name = '$table';", | |
); | |
final knownTriggers = [ | |
'crdt_insert_$table', | |
'crdt_update_$table', | |
'crdt_delete_$table', | |
]; | |
if (triggers.any((e) => knownTriggers.contains(e['name']))) { | |
db.execute('DROP TRIGGER IF EXISTS crdt_insert_$table;'); | |
} | |
} | |
void apply(Uint8List bytes) { | |
db.applyChangeset(bytes); | |
} | |
void close() { | |
db.dispose(); | |
} | |
} | |
class MissingTableException implements Exception { | |
final String table; | |
MissingTableException(this.table); | |
@override | |
String toString() => 'Table $table does not exist'; | |
} | |
extension on StringBuffer { | |
void $([String val = '']) => writeln(val); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const _maxCounter = 0xFFFF; | |
const _maxDrift = Duration(minutes: 1); | |
/// A Hybrid Logical Clock implementation. | |
/// This class trades time precision for a guaranteed monotonically increasing | |
/// clock in distributed systems. | |
/// Inspiration: https://cse.buffalo.edu/tech-reports/2014-04.pdf | |
class Hlc implements Comparable<Hlc> { | |
final DateTime dateTime; | |
final int counter; | |
final String nodeId; | |
Hlc(DateTime dateTime, this.counter, this.nodeId) | |
: dateTime = dateTime.toUtc(), | |
assert(counter <= _maxCounter); | |
/// Instantiates an Hlc at the beginning of time and space: January 1, 1970. | |
Hlc.zero(String nodeId) : this(DateTime.utc(1970), 0, nodeId); | |
/// Instantiates an Hlc at [dateTime] with logical counter zero. | |
Hlc.fromDate(DateTime dateTime, String nodeId) : this(dateTime, 0, nodeId); | |
/// Instantiates an Hlc using the wall clock. | |
Hlc.now(String nodeId) : this.fromDate(DateTime.now(), nodeId); | |
/// Parse an HLC string in the format `ISO8601 date-counter-node id`. | |
factory Hlc.parse(String timestamp) { | |
final counterDash = timestamp.indexOf('-', timestamp.lastIndexOf(':')); | |
final nodeIdDash = timestamp.indexOf('-', counterDash + 1); | |
final dateTime = DateTime.parse(timestamp.substring(0, counterDash)); | |
final counter = int.parse( | |
timestamp.substring(counterDash + 1, nodeIdDash), | |
radix: 16, | |
); | |
final nodeId = timestamp.substring(nodeIdDash + 1); | |
return Hlc(dateTime, counter, nodeId); | |
} | |
/// Create a copy of this object applying the optional properties. | |
Hlc apply({DateTime? dateTime, int? counter, String? nodeId}) => Hlc( | |
dateTime ?? this.dateTime, | |
counter ?? this.counter, | |
nodeId ?? this.nodeId, | |
); | |
/// Increments the current timestamp for transmission to another system. | |
/// The local wall time will be used if [wallTime] isn't supplied. | |
Hlc increment({DateTime? wallTime}) { | |
// Retrieve the local wall time if millis is null | |
wallTime = (wallTime ?? DateTime.now()).toUtc(); | |
// Calculate the next time and counter | |
// * ensure that the logical time never goes backward | |
// * increment the counter if time does not advance | |
final dateTimeNew = wallTime.isAfter(dateTime) ? wallTime : dateTime; | |
final counterNew = dateTimeNew == dateTime ? counter + 1 : 0; | |
// Check the result for drift and counter overflow | |
if (dateTimeNew.difference(wallTime) > _maxDrift) { | |
throw ClockDriftException(dateTimeNew, wallTime); | |
} | |
if (counterNew > _maxCounter) { | |
throw OverflowException(counterNew); | |
} | |
return Hlc(dateTimeNew, counterNew, nodeId); | |
} | |
/// Compares and validates a timestamp from a remote system with the local | |
/// timestamp to preserve monotonicity. | |
/// Local wall time will be used if [wallTime] isn't supplied. | |
Hlc merge(Hlc remote, {DateTime? wallTime}) { | |
// Retrieve the local wall time if millis is null | |
wallTime = (wallTime ?? DateTime.now()).toUtc(); | |
// No need to do any more work if our date + counter is same or higher | |
if (remote.dateTime.isBefore(dateTime) || | |
(remote.dateTime.isAtSameMomentAs(dateTime) && | |
remote.counter <= counter)) | |
return this; | |
// Assert the node id | |
if (nodeId == remote.nodeId) { | |
throw DuplicateNodeException(nodeId); | |
} | |
// Assert the remote clock drift | |
if (remote.dateTime.difference(wallTime) > _maxDrift) { | |
throw ClockDriftException(remote.dateTime, wallTime); | |
} | |
return remote.apply(nodeId: nodeId); | |
} | |
/// Convenience method for easy json encoding. | |
String toJson() => toString(); | |
@override | |
String toString() => | |
'${dateTime.toIso8601String()}' | |
'-${counter.toRadixString(16).toUpperCase().padLeft(4, '0')}' | |
'-$nodeId'; | |
@override | |
int get hashCode => toString().hashCode; | |
@override | |
bool operator ==(other) => other is Hlc && compareTo(other) == 0; | |
bool operator <(other) => other is Hlc && compareTo(other) < 0; | |
bool operator <=(other) => this < other || this == other; | |
bool operator >(other) => other is Hlc && compareTo(other) > 0; | |
bool operator >=(other) => this > other || this == other; | |
@override | |
int compareTo(Hlc other) => | |
dateTime.isAtSameMomentAs(other.dateTime) | |
? counter == other.counter | |
? nodeId.compareTo(other.nodeId) | |
: counter - other.counter | |
: dateTime.compareTo(other.dateTime); | |
} | |
class ClockDriftException implements Exception { | |
final Duration drift; | |
ClockDriftException(DateTime dateTime, DateTime wallTime) | |
: drift = dateTime.difference(wallTime); | |
@override | |
String toString() => 'Clock drift of $drift ms exceeds maximum ($_maxDrift)'; | |
} | |
class OverflowException implements Exception { | |
final int counter; | |
OverflowException(this.counter); | |
@override | |
String toString() => 'Timestamp counter overflow: $counter'; | |
} | |
class DuplicateNodeException implements Exception { | |
final String nodeId; | |
DuplicateNodeException(this.nodeId); | |
@override | |
String toString() => 'Duplicate node: $nodeId'; | |
} | |
extension StringHlcX on String { | |
Hlc get toHlc => Hlc.parse(this); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment