Skip to content

Instantly share code, notes, and snippets.

@joshgontijo
Created July 24, 2025 08:23
Show Gist options
  • Select an option

  • Save joshgontijo/d1ea9ad6016033ad2dfa106683b922c6 to your computer and use it in GitHub Desktop.

Select an option

Save joshgontijo/d1ea9ad6016033ad2dfa106683b922c6 to your computer and use it in GitHub Desktop.
package es.demo.esdemo.repository;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Marks a type (or a record component) as a domain event.
 *
 * <p>The annotation is retained at runtime so it can be discovered reflectively.
 */
@Target({ElementType.TYPE, ElementType.RECORD_COMPONENT})
@Retention(RetentionPolicy.RUNTIME)
public @interface DomainEvent {

    /**
     * Explicit event type name.
     *
     * @return the configured name, or the empty string (the default) when none was given
     */
    String value() default "";
}
package es.demo.esdemo.repository;
import jakarta.persistence.*;
import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.Objects;
/**
 * JPA entity mapped to the {@code events} table: one persisted event of a stream.
 *
 * <p>The class exposes record-style accessors ({@code uuid()}, {@code streamId()}, ...) and
 * fluent setters of the same name that return {@code this} so calls can be chained.
 */
@Entity
@Table(name = "events")
public class EventRecord {

    /** Primary key, generated as a UUID by the persistence provider. */
    @Id
    @GeneratedValue(strategy = GenerationType.UUID)
    private String uuid;

    // NOTE(review): @GeneratedValue is specified by JPA for primary-key attributes; confirm the
    // provider in use actually populates this non-id field from the database.
    @GeneratedValue
    private long sequence;

    private String streamId;

    private long version;

    /** Excluded from INSERT and UPDATE statements, so the value comes from the database. */
    @Column(insertable = false, updatable = false)
    private OffsetDateTime timestamp;

    @Column(name = "event_type")
    private String eventType;

    private byte[] data;

    private byte[] metadata;

    /**
     * Creates a fully specified record. {@code metadata} is left {@code null}; use
     * {@link #metadata(byte[])} to set it.
     */
    public EventRecord(String uuid, long sequence, String streamId, long version, OffsetDateTime timestamp, String eventType, byte[] data) {
        this.uuid = uuid;
        this.sequence = sequence;
        this.streamId = streamId;
        this.version = version;
        this.timestamp = timestamp;
        this.eventType = eventType;
        this.data = data;
    }

    /** No-arg constructor; populate the instance through the fluent setters. */
    public EventRecord() {
    }

    // ---- fluent setters -------------------------------------------------------------------

    public EventRecord uuid(String uuid) {
        this.uuid = uuid;
        return this;
    }

    public EventRecord sequence(long sequence) {
        this.sequence = sequence;
        return this;
    }

    public EventRecord streamId(String streamId) {
        this.streamId = streamId;
        return this;
    }

    public EventRecord version(long version) {
        this.version = version;
        return this;
    }

    public EventRecord timestamp(OffsetDateTime timestamp) {
        this.timestamp = timestamp;
        return this;
    }

    /**
     * Sets the event type name.
     *
     * <p>Without this setter an instance created through the no-arg constructor had no way to
     * receive an event type, unlike every other field.
     *
     * @param eventType the event type name
     * @return this record, for chaining
     */
    public EventRecord eventType(String eventType) {
        this.eventType = eventType;
        return this;
    }

    public EventRecord data(byte[] data) {
        this.data = data;
        return this;
    }

    public EventRecord metadata(byte[] metadata) {
        this.metadata = metadata;
        return this;
    }

    // ---- accessors ------------------------------------------------------------------------

    public String uuid() {
        return uuid;
    }

    public long sequence() {
        return sequence;
    }

    public String streamId() {
        return streamId;
    }

    public long version() {
        return version;
    }

    public OffsetDateTime timestamp() {
        return timestamp;
    }

    public String eventType() {
        return eventType;
    }

    /** Returns the stored payload array itself (no defensive copy is made). */
    public byte[] data() {
        return data;
    }

    /** Returns the stored metadata array itself (no defensive copy is made); may be {@code null}. */
    public byte[] metadata() {
        return metadata;
    }

    /**
     * Two records are equal when uuid, sequence, streamId, version, timestamp, eventType and the
     * content of {@code data} all match. {@code metadata} does not take part in the comparison.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (obj == null || getClass() != obj.getClass()) return false;
        EventRecord that = (EventRecord) obj;
        return sequence == that.sequence &&
                version == that.version &&
                Objects.equals(uuid, that.uuid) &&
                Objects.equals(streamId, that.streamId) &&
                Objects.equals(timestamp, that.timestamp) &&
                Objects.equals(eventType, that.eventType) &&
                Arrays.equals(data, that.data);
    }

    /** Hashes the same fields that {@link #equals(Object)} compares. */
    @Override
    public int hashCode() {
        int result = Objects.hash(uuid, sequence, streamId, version, timestamp, eventType);
        result = 31 * result + Arrays.hashCode(data);
        return result;
    }

    /** Renders the identifying fields; the binary payload and metadata are omitted. */
    @Override
    public String toString() {
        return "EventRecord{" +
                "uuid='" + uuid + '\'' +
                ", sequence=" + sequence +
                ", stream='" + streamId + '\'' +
                ", version=" + version +
                ", timestamp=" + timestamp +
                ", type='" + eventType + '\'' +
                '}';
    }
}
package es.demo.esdemo.repository;
import es.demo.esdemo.repo2.Version;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.data.jpa.repository.Query;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Optional;
import java.util.Set;
@Repository
public interface EventRepository extends JpaRepository<EventRecord, Long>, JpaSpecificationExecutor<EventRecord> {
Logger log = LoggerFactory.getLogger(EventRepository.class);
long INITIAL_VERSION = 0;
default long append(EventRecord event, Version expectedVersion) {
long streamVersion = version(event.streamId()).orElse(INITIAL_VERSION);
while (true) {
var expected = switch (expectedVersion) {
case Version.Any any -> streamVersion;
case Version.Expect(long version) -> {
if (streamVersion != version) {
throw new VersionMismatch(event.streamId(), streamVersion, version);
}
yield version;
}
};
try {
event.version(expected + 1);
this.save(event);
return event.version();
} catch (DataIntegrityViolationException e) {
// This exception is thrown when the expected version does not match the DB version
if (expectedVersion instanceof Version.Expect(long version)) {
//TODO: cannot possibly know the current version.
throw new VersionMismatch(event.streamId(), -1, version, e);
}
}
}
}
//Returns the current version of the stream, or empty if the stream does not exist
@Query("SELECT MAX(e.version) FROM EventRecord e WHERE e.streamId = ?1")
Optional<Long> version(String stream);
@Transactional(readOnly = true)
default List<EventRecord> query(Specification<EventRecord> query, Sort sort, int limit) {
return this.findAll(query, PageRequest.of(0, limit, sort))
.toList();
}
class EventQuery {
public static Specification<EventRecord> eventTypes(Set<String> eventTypes) {
return (root, query, criteriaBuilder) ->
root.get("event_type").in(eventTypes);
}
public static Specification<EventRecord> timestampBetween(OffsetDateTime start, OffsetDateTime end) {
return (root, query, criteriaBuilder) ->
criteriaBuilder.between(root.get("timestamp"), start, end);
}
public static Specification<EventRecord> version(long from) {
return version(from, Long.MAX_VALUE);
}
public static Specification<EventRecord> version(long from, long to) {
return (root, query, criteriaBuilder) ->
criteriaBuilder.between(root.get("version"), from, to);
}
public static Specification<EventRecord> sequence(long from) {
return (root, query, criteriaBuilder) ->
criteriaBuilder.ge(root.get("sequence"), from);
}
public static Specification<EventRecord> sequence(long from, long to) {
return (root, query, criteriaBuilder) ->
criteriaBuilder.between(root.get("sequence"), from, to);
}
public static Specification<EventRecord> stream(String streamId) {
return (root, query, criteriaBuilder) ->
criteriaBuilder.equal(root.get("stream_id"), streamId);
}
}
}
package es.demo.esdemo.repository;
import es.demo.esdemo.Json;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import static es.demo.esdemo.repository.TypedRepo.ReflectionMagic.MappedHandler;
import static es.demo.esdemo.repository.TypedRepo.ReflectionMagic.invoke;
import static es.demo.esdemo.repository.TypedRepo.ReflectionMagic.mapEventHandlers;
import static es.demo.esdemo.repository.TypedRepo.ReflectionMagic.newInstance;
import static es.demo.esdemo.repository.TypedRepo.ReflectionMagic.toSnakeCase;
/**
 * Typed repository for aggregates, allowing to handle events with specific methods in the aggregate class.
 * The aggregate class must have a public no-argument constructor, and its handler methods must take
 * exactly one parameter whose type is annotated with {@link DomainEvent}.
 *
 * <p>NOTE(review): this class calls {@code db.append(String, String, ..., int)},
 * {@code db.get(String, int, int)} and {@code event.type()}; confirm these exist on the
 * {@link EventRepository} and event type this class is meant to be compiled against.
 *
 * @param <T> the type of the aggregate
 */
public class TypedRepo<T extends Aggregate> {

    private static final Logger log = LoggerFactory.getLogger(TypedRepo.class);

    private final EventRepository db;
    private final Map<String, MappedHandler> handlers;
    private final Class<T> aggregateType;

    /**
     * @param db            the event store to read from and append to
     * @param aggregateType the aggregate class; its handler methods are discovered reflectively here
     */
    public TypedRepo(EventRepository db, Class<T> aggregateType) {
        this.db = db;
        this.handlers = mapEventHandlers(aggregateType);
        this.aggregateType = aggregateType;
    }

    /**
     * Serializes {@code event} to JSON and appends it to {@code stream}.
     *
     * @param stream          the target stream
     * @param event           the event; its class must be annotated with {@link DomainEvent}
     * @param expectedVersion the expected stream version, passed through to the event store
     * @return the result reported by the event store
     * @throws IllegalArgumentException if the event class is not annotated with {@link DomainEvent}
     */
    public WriteResult append(String stream, Object event, int expectedVersion) {
        var eventClass = event.getClass();
        var evType = eventClass.getAnnotation(DomainEvent.class);
        if (evType == null) {
            throw new IllegalArgumentException("Event class " + eventClass.getSimpleName() + " is not annotated with " + DomainEvent.class.getSimpleName());
        }
        var evTypeName = ReflectionMagic.eventTypeName(evType, eventClass);
        return db.append(stream, evTypeName, Json.toJson(event), expectedVersion);
    }

    /** Rebuilds the aggregate from the stream, starting at version 0 with no practical limit. */
    public T get(String stream) {
        return get(stream, 0, Integer.MAX_VALUE);
    }

    /**
     * Rebuilds the aggregate by creating a fresh instance and applying the stream's events to it.
     *
     * <p>Events without a matching handler are logged and skipped; they do not advance the
     * aggregate's {@code version}.
     *
     * @param stream                the stream to read
     * @param startVersionInclusive first version to read
     * @param limit                 maximum number of events to read
     * @return the aggregate after all handled events were applied
     */
    public T get(String stream, int startVersionInclusive, int limit) {
        var instance = newInstance(aggregateType);
        for (var event : db.get(stream, startVersionInclusive, limit)) {
            var handler = handlers.get(event.type());
            if (handler == null) {
                log.warn("No handler found for event type '{}' in aggregate {}, skipping", event.type(), aggregateType.getName());
                continue;
            }
            var eventData = Json.fromJson(event.data(), handler.eventType());
            invoke(instance, handler, eventData);
            instance.version = event.version();
        }
        return instance;
    }

    /** Reflection helpers for discovering and invoking event handlers. */
    static class ReflectionMagic {

        /**
         * Scans the methods declared by {@code type} and maps each event type name to the method
         * that handles it. A method qualifies when it has exactly one parameter whose type is
         * annotated with {@link DomainEvent}.
         */
        static Map<String, MappedHandler> mapEventHandlers(Class<?> type) {
            var handlers = new HashMap<String, MappedHandler>();
            for (var method : type.getDeclaredMethods()) {
                if (method.isSynthetic()) {
                    // Compiler-generated methods (lambda bodies, bridges) are never handlers.
                    continue;
                }
                var params = method.getParameterTypes();
                if (params.length != 1) {
                    log.info("Method '{}' in class '{}' is not a valid event handler, must have exactly one parameter", method.getName(), type.getName());
                    continue;
                }
                Class<?> firstParamType = params[0];
                var evType = firstParamType.getAnnotation(DomainEvent.class);
                if (evType == null) {
                    log.info("Method '{}' in class '{}' is not a valid event handler, parameter type '{}' is not annotated with @DomainEvent",
                            method.getName(), type.getName(), firstParamType.getName());
                    continue;
                }
                var evTypeName = eventTypeName(evType, firstParamType);
                // getDeclaredMethods() also returns non-public methods; make them invocable.
                if (!method.trySetAccessible()) {
                    log.warn("Handler '{}' in class '{}' could not be made accessible", method.getName(), type.getName());
                }
                var previous = handlers.put(evTypeName, new MappedHandler(evTypeName, method, firstParamType));
                if (previous != null) {
                    // getDeclaredMethods() has no defined order, so which handler wins is arbitrary.
                    log.warn("Event type '{}' has more than one handler in class '{}': '{}' replaces '{}'",
                            evTypeName, type.getName(), method.getName(), previous.method().getName());
                }
                log.info("Mapped event type '{}' to handler '{}#{}'", evTypeName, type.getName(), method.getName());
            }
            return handlers;
        }

        /**
         * Resolves the event type name: the annotation's value when it is not blank, otherwise
         * the simple name of {@code eventClass} converted by {@link #toSnakeCase(String)}.
         */
        static String eventTypeName(DomainEvent annotation, Class<?> eventClass) {
            var explicit = annotation.value();
            return explicit.isBlank() ? toSnakeCase(eventClass.getSimpleName()) : explicit;
        }

        /**
         * Converts a camel-case name to upper case with underscores, e.g. {@code OrderCreated}
         * becomes {@code ORDER_CREATED} and {@code HTTPRequest} becomes {@code HTTP_REQUEST}.
         *
         * <p>An underscore is inserted before an upper-case letter only when it is neither the
         * first nor the last character and the next character is not upper case, so a trailing
         * capital is not separated ({@code PlanB} becomes {@code PLANB}).
         */
        static String toSnakeCase(String input) {
            var result = new StringBuilder();
            var chars = input.toCharArray();
            for (int i = 0; i < chars.length; i++) {
                char c = chars[i];
                var upperCase = Character.toUpperCase(c);
                if (Character.isUpperCase(c) && i > 0 && (i < chars.length - 1 && !Character.isUpperCase(chars[i + 1]))) {
                    result.append("_");
                }
                result.append(upperCase);
            }
            return result.toString();
        }

        /**
         * Invokes {@code handler} on {@code instance} with {@code eventData}.
         *
         * @throws RuntimeException wrapping whatever the reflective call threw
         */
        static void invoke(Object instance, MappedHandler handler, Object eventData) {
            try {
                handler.method().invoke(instance, eventData);
            } catch (Exception e) {
                throw new RuntimeException("Failed to invoke handler '" + handler.method().getName()
                        + "' for event type '" + handler.typeName() + "'", e);
            }
        }

        /**
         * Instantiates {@code type} through its public no-argument constructor.
         *
         * @throws RuntimeException if the constructor is missing, not public, or fails
         */
        static <T> T newInstance(Class<T> type) {
            try {
                return type.getConstructor().newInstance();
            } catch (Exception e) {
                throw new RuntimeException("Aggregate type must have a default constructor", e);
            }
        }

        /** An event type name together with the method handling it and the event's Java type. */
        public record MappedHandler(String typeName, Method method, Class<?> eventType) {
        }
    }
}
-- auto-generated definition
-- Event store table mapped by the EventRecord entity.
create table events
(
    uuid       CHAR(36) PRIMARY KEY,
    -- NOTE(review): some engines (e.g. MySQL) require an AUTO_INCREMENT column to be declared
    -- as a key inside CREATE TABLE itself; confirm the unique index below is sufficient here.
    sequence   BIGINT AUTO_INCREMENT NOT NULL,
    stream_id  VARCHAR(500) NOT NULL,
    event_type VARCHAR(500) NOT NULL,
    -- BIGINT to match the entity's `long version` field (was INT).
    version    BIGINT NOT NULL,
    -- Filled in by the database; the entity maps this column as non-insertable/non-updatable.
    timestamp  DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL,
    data       BLOB NOT NULL,
    metadata   BLOB NULL
);

CREATE UNIQUE INDEX events_sequence_uindex
    ON events (sequence);

-- Enforces optimistic concurrency: at most one event per (stream, version).
CREATE UNIQUE INDEX events_stream_version_uindex
    ON events (stream_id, version);

-- NOTE(review): stream_id is already the leading column of events_stream_version_uindex,
-- so this index is likely redundant; confirm before dropping.
CREATE INDEX events_stream_index
    ON events (stream_id);

CREATE INDEX events_event_type_index
    ON events (event_type);

CREATE INDEX events_timestamp_index
    ON events (timestamp);

CREATE INDEX events_version_index
    ON events (version);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment