Last active
March 25, 2025 06:13
-
-
Save AlexMAS/5d4f0655b1e9ec2dd8b83716e99ae709 to your computer and use it in GitHub Desktop.
How to create a unique integer sequence in Apache Cassandra by using the Lamport Timestamps.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ru.mos.emias.simi.cdc.repositories; | |
import java.util.UUID; | |
import com.datastax.oss.driver.api.core.CqlSession; | |
import com.datastax.oss.driver.api.core.cql.PreparedStatement; | |
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.*; | |
import static com.datastax.oss.driver.api.querybuilder.condition.Condition.column; | |
public class CassandraSequenceGenerator { | |
// CREATE TABLE counters (name text PRIMARY KEY, node uuid, value bigint) | |
private static final String SEQUENCE_TABLE = "counters"; | |
private static final String NAME_COLUMN = "name"; | |
private static final String NODE_COLUMN = "node"; | |
private static final String VALUE_COLUMN = "value"; | |
private final CqlSession cassandraSession; | |
private final UUID node; | |
private final long[] values; | |
private final PreparedStatement initializeSequence; | |
private final PreparedStatement incrementSequence; | |
private final PreparedStatement selectSequence; | |
private volatile boolean initialized; | |
private volatile long maxValue; | |
private volatile int offset; | |
public CassandraSequenceGenerator(CqlSession session, String name) { | |
this(session, name, 0, 1_000); | |
} | |
public CassandraSequenceGenerator(CqlSession session, String name, long initValue, int prefetchCount) { | |
this.cassandraSession = session; | |
this.node = UUID.randomUUID(); | |
this.values = new long[prefetchCount]; | |
this.initialized = false; | |
this.maxValue = initValue; | |
this.offset = 0; | |
this.initializeSequence = session.prepare(insertInto(SEQUENCE_TABLE) | |
.value(NAME_COLUMN, literal(name)) | |
.value(NODE_COLUMN, literal(node)) | |
.value(VALUE_COLUMN, literal(initValue)) | |
.ifNotExists() | |
.build()); | |
this.incrementSequence = session.prepare(update(SEQUENCE_TABLE) | |
.setColumn(NODE_COLUMN, literal(node)) | |
.setColumn(VALUE_COLUMN, bindMarker()) | |
.whereColumn(NAME_COLUMN).isEqualTo(literal(name)) | |
.if_(column(VALUE_COLUMN).isEqualTo(bindMarker())) | |
.build()); | |
this.selectSequence = session.prepare(selectFrom(SEQUENCE_TABLE) | |
.column(NODE_COLUMN) | |
.column(VALUE_COLUMN) | |
.whereColumn(NAME_COLUMN).isEqualTo(literal(name)) | |
.build()); | |
} | |
public synchronized long nextValue() { | |
var index = offset; | |
if (index == 0) { | |
nextValues(); | |
} | |
if (++offset == values.length) { | |
offset = 0; | |
} | |
return values[index]; | |
} | |
private void nextValues() { | |
if (!initialized) { | |
cassandraSession.execute(initializeSequence.bind()); | |
initialized = true; | |
} | |
UUID actualNode; | |
long actualValue; | |
do { | |
cassandraSession.execute(incrementSequence.bind(maxValue + values.length, maxValue)); | |
var row = cassandraSession.execute(selectSequence.bind()).one(); | |
actualNode = row.getUuid(NODE_COLUMN); | |
actualValue = row.getLong(VALUE_COLUMN); | |
maxValue = actualValue; | |
} while (!node.equals(actualNode)); | |
for (var i = values.length - 1; i >= 0; --i) { | |
values[i] = actualValue--; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Usage example: