Skip to content

Instantly share code, notes, and snippets.

@AlexMAS
Last active March 25, 2025 06:13
Show Gist options
  • Save AlexMAS/5d4f0655b1e9ec2dd8b83716e99ae709 to your computer and use it in GitHub Desktop.
Save AlexMAS/5d4f0655b1e9ec2dd8b83716e99ae709 to your computer and use it in GitHub Desktop.
How to create a unique integer sequence in Apache Cassandra by using the Lamport Timestamps.
package ru.mos.emias.simi.cdc.repositories;
import java.util.UUID;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.*;
import static com.datastax.oss.driver.api.querybuilder.condition.Condition.column;
public class CassandraSequenceGenerator {
// CREATE TABLE counters (name text PRIMARY KEY, node uuid, value bigint)
private static final String SEQUENCE_TABLE = "counters";
private static final String NAME_COLUMN = "name";
private static final String NODE_COLUMN = "node";
private static final String VALUE_COLUMN = "value";
private final CqlSession cassandraSession;
private final UUID node;
private final long[] values;
private final PreparedStatement initializeSequence;
private final PreparedStatement incrementSequence;
private final PreparedStatement selectSequence;
private volatile boolean initialized;
private volatile long maxValue;
private volatile int offset;
public CassandraSequenceGenerator(CqlSession session, String name) {
this(session, name, 0, 1_000);
}
public CassandraSequenceGenerator(CqlSession session, String name, long initValue, int prefetchCount) {
this.cassandraSession = session;
this.node = UUID.randomUUID();
this.values = new long[prefetchCount];
this.initialized = false;
this.maxValue = initValue;
this.offset = 0;
this.initializeSequence = session.prepare(insertInto(SEQUENCE_TABLE)
.value(NAME_COLUMN, literal(name))
.value(NODE_COLUMN, literal(node))
.value(VALUE_COLUMN, literal(initValue))
.ifNotExists()
.build());
this.incrementSequence = session.prepare(update(SEQUENCE_TABLE)
.setColumn(NODE_COLUMN, literal(node))
.setColumn(VALUE_COLUMN, bindMarker())
.whereColumn(NAME_COLUMN).isEqualTo(literal(name))
.if_(column(VALUE_COLUMN).isEqualTo(bindMarker()))
.build());
this.selectSequence = session.prepare(selectFrom(SEQUENCE_TABLE)
.column(NODE_COLUMN)
.column(VALUE_COLUMN)
.whereColumn(NAME_COLUMN).isEqualTo(literal(name))
.build());
}
public synchronized long nextValue() {
var index = offset;
if (index == 0) {
nextValues();
}
if (++offset == values.length) {
offset = 0;
}
return values[index];
}
private void nextValues() {
if (!initialized) {
cassandraSession.execute(initializeSequence.bind());
initialized = true;
}
UUID actualNode;
long actualValue;
do {
cassandraSession.execute(incrementSequence.bind(maxValue + values.length, maxValue));
var row = cassandraSession.execute(selectSequence.bind()).one();
actualNode = row.getUuid(NODE_COLUMN);
actualValue = row.getLong(VALUE_COLUMN);
maxValue = actualValue;
} while (!node.equals(actualNode));
for (var i = values.length - 1; i >= 0; --i) {
values[i] = actualValue--;
}
}
}
@AlexMAS
Copy link
Author

AlexMAS commented Mar 25, 2025

Usage example:

CqlSession session;
String sequenceName
// ...

var sequence = new CassandraSequenceGenerator(session, sequenceName);
// ...

var value1 = sequence.nextValue();
var value2 = sequence.nextValue();
var value3 = sequence.nextValue();

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment