Skip to content

Instantly share code, notes, and snippets.

@joshgontijo
Last active April 4, 2025 07:00
Show Gist options
  • Select an option

  • Save joshgontijo/fe69e0b6013bb7c007f8e67f7a1742c0 to your computer and use it in GitHub Desktop.

Select an option

Save joshgontijo/fe69e0b6013bb7c007f8e67f7a1742c0 to your computer and use it in GitHub Desktop.
package es.demo.esdemo;
import jakarta.persistence.*;
import java.sql.ResultSet;
import java.util.Objects;
/**
 * A single persisted event: one row of the {@code events} table.
 *
 * <p>Instances are created either by JPA (through the no-arg constructor) or
 * from a raw JDBC {@link ResultSet} via {@link #from(ResultSet)}.
 *
 * <p>Equality, hashing and {@code toString} treat {@link #data()} by content,
 * not by array identity, so two events loaded separately from the same row
 * compare equal.
 */
@Entity
@Table(name = "events")
public class Event {

    /** Database-generated, monotonically assigned primary key. */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private long sequence;

    /** Name of the stream this event belongs to. */
    private String stream;

    /** Version of the event within its stream. */
    private long version;

    /** Event timestamp as stored in the {@code timestamp} column. */
    private long timestamp;

    /**
     * Event type. The column is called {@code event_type} in the table
     * definition and in {@link #from(ResultSet)}, so the mapping must be
     * explicit; the JPA default would look for a column named {@code type}.
     */
    @Column(name = "event_type")
    private String type;

    /** Raw event payload; may be {@code null} (the column is nullable). */
    private byte[] data;

    /**
     * Creates a fully populated event.
     *
     * @param sequence  primary key of the row
     * @param stream    stream name
     * @param version   version within the stream
     * @param timestamp event timestamp
     * @param type      event type
     * @param data      raw payload, stored by reference (not copied)
     */
    public Event(long sequence, String stream, long version, long timestamp, String type, byte[] data) {
        this.sequence = sequence;
        this.stream = stream;
        this.version = version;
        this.timestamp = timestamp;
        this.type = type;
        this.data = data;
    }

    /** No-arg constructor required by JPA. */
    public Event() {
    }

    /**
     * Builds an event from the current row of {@code rs}.
     *
     * @param rs result set positioned on a row exposing the columns
     *           {@code sequence, stream, version, timestamp, event_type, data}
     * @return the event read from the current row
     * @throws RuntimeException if any column cannot be read; the original
     *                          exception is preserved as the cause
     */
    public static Event from(ResultSet rs) {
        try {
            return new Event(
                    rs.getLong("sequence"),
                    rs.getString("stream"),
                    rs.getLong("version"),
                    rs.getLong("timestamp"),
                    rs.getString("event_type"),
                    rs.getBytes("data"));
        } catch (Exception e) {
            throw new RuntimeException("Failed to read from database", e);
        }
    }

    /** @return the primary key of the row */
    public long sequence() {
        return sequence;
    }

    /** @return the stream name */
    public String stream() {
        return stream;
    }

    /** @return the version within the stream */
    public long version() {
        return version;
    }

    /** @return the event timestamp */
    public long timestamp() {
        return timestamp;
    }

    /** @return the event type */
    public String type() {
        return type;
    }

    /** @return the payload array itself (not a copy); callers must not mutate it */
    public byte[] data() {
        return data;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (obj == null || obj.getClass() != this.getClass()) {
            return false;
        }
        var that = (Event) obj;
        return this.sequence == that.sequence
                && this.version == that.version
                && this.timestamp == that.timestamp
                && Objects.equals(this.stream, that.stream)
                && Objects.equals(this.type, that.type)
                // Arrays must be compared element-wise; Objects.equals would
                // only compare the two array references.
                && java.util.Arrays.equals(this.data, that.data);
    }

    @Override
    public int hashCode() {
        // Content-based hash for the payload keeps hashCode consistent with equals.
        return 31 * Objects.hash(sequence, stream, version, timestamp, type)
                + java.util.Arrays.hashCode(data);
    }

    @Override
    public String toString() {
        return "Event["
                + "sequence=" + sequence + ", "
                + "stream=" + stream + ", "
                + "version=" + version + ", "
                + "timestamp=" + timestamp + ", "
                + "type=" + type + ", "
                // Print the bytes rather than the array's identity hash ("[B@1a2b3c").
                + "data=" + java.util.Arrays.toString(data) + ']';
    }
}
-- Append-only event log. One row per event; (stream, version) identifies an
-- event within its stream and sequence gives a global insertion order.
--
-- sequence, version and timestamp are BIGINT (64-bit) because the Java side
-- reads and writes them as long. A 32-bit INTEGER cannot hold an
-- epoch-millisecond timestamp and caps the log at ~2.1 billion rows.
CREATE TABLE IF NOT EXISTS events
(
    sequence   BIGINT       NOT NULL AUTO_INCREMENT PRIMARY KEY,
    stream     VARCHAR(500) NOT NULL,
    event_type VARCHAR(500) NOT NULL,
    version    BIGINT       NOT NULL,
    timestamp  BIGINT       NOT NULL,
    data       BLOB
);
-- Enforces one event per (stream, version). Because stream is the leading
-- column, this index also serves lookups by stream alone, so no separate
-- single-column index on stream is created.
create unique index events_stream_version_uindex
    on events (stream, version);

-- Secondary lookups by event type, timestamp and version.
create index events_event_type_index
    on events (event_type);
create index events_timestamp_index
    on events (timestamp);
create index events_version_index
    on events (version);

-- No index on sequence: it is the PRIMARY KEY and is therefore already indexed.
// NOTE(review): these statements target a table named "streams", while the DDL
// and the JPA entity in this gist use "events" — confirm which name is correct.

/** Highest stored version of a stream; {@code MAX} yields SQL NULL when the stream has no rows. */
private static final String VERSION = "SELECT MAX(version) as version FROM streams WHERE stream = ?";

/** Inserts one event; parameters: stream, version, timestamp, event_type, data. */
static final String INSERT = "INSERT INTO streams (stream,version,timestamp,event_type,data) VALUES (?,?,?,?,?)";

/**
 * Reads up to {@code LIMIT} events of a stream starting at a given version.
 * The explicit {@code ORDER BY version} is required: without it the database
 * may return any matching rows in any order, so a limited read could skip
 * events or deliver them out of order.
 */
private static final String GET = "SELECT sequence,stream,version,timestamp,event_type,data FROM streams WHERE stream = ? AND version >= ? ORDER BY version LIMIT ?";

/**
 * Prefix of the stream-name search; the LIKE operand is supplied by the caller.
 * NOTE(review): the operand should be appended as a {@code ?} placeholder and
 * bound, never concatenated from user input — verify at the call site.
 */
private static final String FIND_STREAMS = "SELECT distinct(stream) FROM streams WHERE stream LIKE";
package org.example;
import org.apache.commons.dbutils.QueryRunner;
import javax.sql.DataSource;
import java.io.Closeable;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.example.EventStore.INSERT;
import static org.example.EventStore.NO_VERSION;
/**
 * Single-threaded, batching writer for the event store.
 *
 * <p>Producers call {@link #enqueue} from any thread. One worker thread takes
 * tasks from a bounded queue, assigns each the next version of its stream
 * (rejecting tasks whose expected version does not match), and writes them
 * with a single batch insert once the batch is full or has expired.
 *
 * <p>Every future returned by {@link #enqueue} is eventually completed:
 * normally after a successful flush, or exceptionally on a version mismatch,
 * a database error, worker interruption, or shutdown.
 */
public class Writer implements Closeable {

    private final Thread worker;
    private final Duration timeout;
    private final BlockingQueue<WriteTask> queue;
    // Written by close(), read by the worker and by producers. volatile is
    // required so the write becomes visible to the other threads.
    private volatile boolean stopped;
    private final QueryRunner db;

    /**
     * Creates the writer and starts its worker thread.
     *
     * @param ds        data source used for version lookups and batch inserts
     * @param batchSize maximum number of tasks per batch; also the queue capacity
     * @param timeout   maximum time to wait for a task, and batch expiry interval
     */
    Writer(DataSource ds, int batchSize, Duration timeout) {
        this.queue = new ArrayBlockingQueue<>(batchSize);
        // Fail here rather than with an NPE inside the worker thread.
        this.timeout = java.util.Objects.requireNonNull(timeout, "timeout");
        this.db = new QueryRunner(ds);
        this.worker = startWorker(db, batchSize, queue);
    }

    /**
     * Queues an event for writing, blocking while the queue is full.
     *
     * @param stream          target stream
     * @param expectedVersion version the caller expects the stream to be at, or
     *                        {@code NO_VERSION} to skip the check
     * @param type            event type
     * @param data            event payload
     * @return a future completed when the event has been flushed, or completed
     *         exceptionally if the write is rejected or fails, or if the writer
     *         is closed
     * @throws RuntimeException if the calling thread is interrupted while waiting
     */
    public CompletableFuture<Void> enqueue(String stream, long expectedVersion, String type, byte[] data) {
        if (stopped) {
            return CompletableFuture.failedFuture(new IllegalStateException("Writer is closed"));
        }
        try {
            var task = new WriteTask(stream, expectedVersion, type, data);
            this.queue.put(task);
            // close() may have raced with the put. If the worker has not taken
            // the task, withdraw it so the future does not hang forever.
            if (stopped && this.queue.remove(task)) {
                task.future.completeExceptionally(new IllegalStateException("Writer is closed"));
            }
            return task.future;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Creates, names and starts the worker thread. */
    private Thread startWorker(QueryRunner db, int batchSize, BlockingQueue<WriteTask> queue) {
        var worker = new Thread(() -> runWorker(db, batchSize, queue), "es-writer");
        worker.start();
        return worker;
    }

    /**
     * Worker loop: stage incoming tasks and flush when the batch is full or
     * expired. On {@link #close()} it persists everything already accepted;
     * on interruption it fails everything still pending.
     */
    private void runWorker(QueryRunner db, int batchSize, BlockingQueue<WriteTask> queue) {
        // Last version handed out per stream. Entries are never evicted, so
        // the map grows with the number of distinct streams written.
        var cachedVersions = new HashMap<String, Long>();
        var batch = new ArrayDeque<WriteTask>(batchSize);
        var batchStart = System.currentTimeMillis();
        Throwable abortCause = new IllegalStateException("Writer is closed");
        try {
            while (!stopped) {
                var item = queue.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
                if (item != null) {
                    stage(db, item, batch, cachedVersions);
                }
                if (!batch.isEmpty() && (batch.size() >= batchSize || batchExpired(batchStart, timeout))) {
                    flush(db, batch, cachedVersions);
                    batchStart = System.currentTimeMillis();
                }
            }
            // Graceful shutdown: write whatever producers managed to enqueue.
            WriteTask pending;
            while ((pending = queue.poll()) != null) {
                stage(db, pending, batch, cachedVersions);
                if (batch.size() >= batchSize) {
                    flush(db, batch, cachedVersions);
                }
            }
            flush(db, batch, cachedVersions);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            abortCause = e;
        } finally {
            // Reject new producers before draining so nothing slips in afterwards.
            stopped = true;
            failAll(batch, queue, abortCause);
        }
    }

    /**
     * Checks the task's expected version and, if it matches, assigns the next
     * version and adds the task to the batch. A failed check or lookup
     * completes the task's future exceptionally.
     */
    private static void stage(QueryRunner db, WriteTask item, ArrayDeque<WriteTask> batch,
                              HashMap<String, Long> cachedVersions) {
        try {
            var cachedVersion = cachedVersions.get(item.stream);
            long currVersion = cachedVersion == null ? EventStore.fetchVersion(db, item.stream) : cachedVersion;
            if (item.version != NO_VERSION && item.version != currVersion) {
                item.future.completeExceptionally(new VersionMismatch(currVersion, item.version));
                return;
            }
            item.version = currVersion + 1;
            batch.add(item);
            cachedVersions.put(item.stream, item.version);
        } catch (Exception e) {
            // A failed version lookup must fail only this task, not kill the worker.
            item.future.completeExceptionally(e);
        }
    }

    /**
     * Writes the batch with one batch insert and completes every task's future.
     * On failure all futures in the batch fail with the cause, and the version
     * cache is cleared because it already counted the unwritten events.
     */
    private static void flush(QueryRunner db, ArrayDeque<WriteTask> batch, HashMap<String, Long> cachedVersions) {
        if (batch.isEmpty()) {
            return;
        }
        var flushStart = System.currentTimeMillis();
        var size = batch.size();
        Exception failure = null;
        try {
            var items = batch.stream().map(WriteTask::asParam).toArray(Object[][]::new);
            db.insertBatch(INSERT, rs -> null, items);
        } catch (SQLException | RuntimeException e) {
            failure = e;
            // Force a fresh version lookup for every stream on the next write.
            cachedVersions.clear();
        }
        WriteTask task;
        while ((task = batch.poll()) != null) {
            if (failure == null) {
                task.future.complete(null);
            } else {
                task.future.completeExceptionally(failure);
            }
        }
        if (failure == null) {
            System.out.println("Flushed " + size + " in " + (System.currentTimeMillis() - flushStart) + "ms");
        }
    }

    /** Completes every staged and queued task exceptionally with {@code cause}. */
    private static void failAll(ArrayDeque<WriteTask> batch, BlockingQueue<WriteTask> queue, Throwable cause) {
        WriteTask task;
        while ((task = batch.poll()) != null) {
            task.future.completeExceptionally(cause);
        }
        while ((task = queue.poll()) != null) {
            task.future.completeExceptionally(cause);
        }
    }

    /** @return {@code true} if more than {@code timeout} has elapsed since {@code batchStart} */
    private static boolean batchExpired(long batchStart, Duration timeout) {
        return System.currentTimeMillis() - batchStart > timeout.toMillis();
    }

    /**
     * Stops accepting work and waits for the worker to flush what was already
     * accepted. May block for up to the poll timeout plus the final flush.
     * Safe to call more than once.
     *
     * @throws RuntimeException if the calling thread is interrupted while waiting
     */
    @Override
    public void close() {
        try {
            this.stopped = true;
            // Joining from the worker itself (e.g. inside a future callback)
            // would deadlock.
            if (Thread.currentThread() != worker) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment