Last active
April 4, 2025 07:00
-
-
Save joshgontijo/fe69e0b6013bb7c007f8e67f7a1742c0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package es.demo.esdemo; | |
| import jakarta.persistence.*; | |
| import java.sql.ResultSet; | |
| import java.util.Objects; | |
| @Entity | |
| @Table(name = "events") | |
| public class Event { | |
| @Id | |
| @GeneratedValue(strategy = GenerationType.IDENTITY) | |
| private long sequence; | |
| private String stream; | |
| private long version; | |
| private long timestamp; | |
| private String type; | |
| private byte[] data; | |
| public Event(long sequence, String stream, long version, long timestamp, String type, byte[] data) { | |
| this.sequence = sequence; | |
| this.stream = stream; | |
| this.version = version; | |
| this.timestamp = timestamp; | |
| this.type = type; | |
| this.data = data; | |
| } | |
| public Event() { | |
| } | |
| public static Event from(ResultSet rs) { | |
| try { | |
| return new Event( | |
| rs.getLong("sequence"), | |
| rs.getString("stream"), | |
| rs.getLong("version"), | |
| rs.getLong("timestamp"), | |
| rs.getString("event_type"), | |
| rs.getBytes("data")); | |
| } catch (Exception e) { | |
| throw new RuntimeException("Failed to read from database", e); | |
| } | |
| } | |
| public long sequence() { | |
| return sequence; | |
| } | |
| public String stream() { | |
| return stream; | |
| } | |
| public long version() { | |
| return version; | |
| } | |
| public long timestamp() { | |
| return timestamp; | |
| } | |
| public String type() { | |
| return type; | |
| } | |
| public byte[] data() { | |
| return data; | |
| } | |
| @Override | |
| public boolean equals(Object obj) { | |
| if (obj == this) return true; | |
| if (obj == null || obj.getClass() != this.getClass()) return false; | |
| var that = (Event) obj; | |
| return this.sequence == that.sequence && | |
| Objects.equals(this.stream, that.stream) && | |
| this.version == that.version && | |
| this.timestamp == that.timestamp && | |
| Objects.equals(this.type, that.type) && | |
| Objects.equals(this.data, that.data); | |
| } | |
| @Override | |
| public int hashCode() { | |
| return Objects.hash(sequence, stream, version, timestamp, type, data); | |
| } | |
| @Override | |
| public String toString() { | |
| return "Event[" + | |
| "sequence=" + sequence + ", " + | |
| "stream=" + stream + ", " + | |
| "version=" + version + ", " + | |
| "timestamp=" + timestamp + ", " + | |
| "type=" + type + ", " + | |
| "data=" + data + ']'; | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| CREATE TABLE IF NOT EXISTS events | |
| ( | |
| sequence INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, | |
| stream VARCHAR(500) NOT NULL, | |
| event_type VARCHAR(500) NOT NULL, | |
| version INTEGER NOT NULL, | |
| timestamp INTEGER NOT NULL, | |
| data BLOB | |
| ); | |
| create unique index events_stream_version_uindex | |
| on events (stream, version); | |
| create index events_stream_index | |
| on events (stream); | |
| create index events_event_type_index | |
| on events (event_type); | |
| create index events_timestamp_index | |
| on events (timestamp); | |
| create index events_version_index | |
| on events (version); | |
| create index events_sequence_index | |
| on events (sequence); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| private static final String VERSION = "SELECT MAX(version) as version FROM streams WHERE stream = ?"; | |
| static final String INSERT = "INSERT INTO streams (stream,version,timestamp,event_type,data) VALUES (?,?,?,?,?)"; | |
| private static final String GET = "SELECT sequence,stream,version,timestamp,event_type,data FROM streams WHERE stream = ? AND version >= ? LIMIT ?"; | |
| private static final String FIND_STREAMS = "SELECT distinct(stream) FROM streams WHERE stream LIKE"; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package org.example; | |
| import org.apache.commons.dbutils.QueryRunner; | |
| import javax.sql.DataSource; | |
| import java.io.Closeable; | |
| import java.sql.SQLException; | |
| import java.time.Duration; | |
| import java.util.ArrayDeque; | |
| import java.util.HashMap; | |
| import java.util.concurrent.ArrayBlockingQueue; | |
| import java.util.concurrent.BlockingQueue; | |
| import java.util.concurrent.CompletableFuture; | |
| import java.util.concurrent.TimeUnit; | |
| import static org.example.EventStore.INSERT; | |
| import static org.example.EventStore.NO_VERSION; | |
| public class Writer implements Closeable { | |
| private final Thread worker; | |
| private final Duration timeout; | |
| private final BlockingQueue<WriteTask> queue; | |
| private boolean stopped; | |
| private final QueryRunner db; | |
| Writer(DataSource ds, int batchSize, Duration timeout) { | |
| this.queue = new ArrayBlockingQueue<>(batchSize); | |
| this.timeout = timeout; | |
| this.db = new QueryRunner(ds); | |
| this.worker = startWorker(db, batchSize, queue); | |
| this.worker.setName("es-writer"); | |
| } | |
| public CompletableFuture<Void> enqueue(String stream, long expectedVersion, String type, byte[] data) { | |
| try { | |
| var task = new WriteTask(stream, expectedVersion, type, data); | |
| this.queue.put(task); | |
| return task.future; | |
| } catch (InterruptedException e) { | |
| Thread.currentThread().interrupt(); | |
| throw new RuntimeException(e); | |
| } | |
| } | |
| private Thread startWorker(QueryRunner db, int batchSize, BlockingQueue<WriteTask> queue) { | |
| var worker = new Thread(() -> { | |
| var cachedVersions = new HashMap<String, Long>(); | |
| var batchStart = System.currentTimeMillis(); | |
| var batch = new ArrayDeque<WriteTask>(queue.size()); | |
| while (!stopped) { | |
| try { | |
| var item = queue.poll(timeout.toMillis(), TimeUnit.MILLISECONDS); | |
| if (item != null) { | |
| var cachedVersion = cachedVersions.get(item.stream); | |
| long currVersion = cachedVersion == null ? EventStore.fetchVersion(db, item.stream) : cachedVersion; | |
| if (item.version != NO_VERSION && item.version != currVersion) { | |
| item.future.completeExceptionally(new VersionMismatch(currVersion, item.version)); | |
| } else { | |
| item.version = currVersion + 1; | |
| batch.add(item); | |
| cachedVersions.put(item.stream, item.version); | |
| } | |
| } | |
| if ((!batch.isEmpty() && (batchExpired(batchStart, timeout)) || batch.size() >= batchSize)) { | |
| //flush | |
| var flushStart = System.currentTimeMillis(); | |
| var size = batch.size(); | |
| var items = batch.stream().map(WriteTask::asParam).toArray(Object[][]::new); | |
| db.insertBatch(INSERT, rs -> null, items); | |
| WriteTask task; | |
| while ((task = batch.poll()) != null) { | |
| task.future.complete(null); | |
| } | |
| batchStart = System.currentTimeMillis(); | |
| System.out.println("Flushed " + size + " in " + (System.currentTimeMillis() - flushStart) + "ms"); | |
| } | |
| } catch (InterruptedException e) { | |
| Thread.currentThread().interrupt(); | |
| throw new RuntimeException(e); | |
| } catch (SQLException e) { | |
| throw new RuntimeException(e); | |
| } | |
| } | |
| }); | |
| worker.start(); | |
| return worker; | |
| } | |
| private static boolean batchExpired(long batchStart, Duration timeout) { | |
| return System.currentTimeMillis() - batchStart > timeout.toMillis(); | |
| } | |
| @Override | |
| public void close() { | |
| try { | |
| this.stopped = true; | |
| worker.join(); | |
| } catch (InterruptedException e) { | |
| Thread.currentThread().interrupt(); | |
| throw new RuntimeException(e); | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment