Created
July 24, 2025 08:23
-
-
Save joshgontijo/d1ea9ad6016033ad2dfa106683b922c6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package es.demo.esdemo.repository; | |
| import java.lang.annotation.ElementType; | |
| import java.lang.annotation.Retention; | |
| import java.lang.annotation.RetentionPolicy; | |
| import java.lang.annotation.Target; | |
/**
 * Marks a type as a domain event and optionally carries an explicit event type name.
 *
 * <p>The annotation is retained at runtime so it can be read reflectively, and it may be
 * placed on types and on record components.
 */
@Target({ElementType.TYPE, ElementType.RECORD_COMPONENT})
@Retention(RetentionPolicy.RUNTIME)
public @interface DomainEvent {

    /**
     * Explicit event type name.
     *
     * @return the configured name, or the empty string when none was given
     */
    String value() default "";
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package es.demo.esdemo.repository; | |
| import jakarta.persistence.*; | |
| import java.time.OffsetDateTime; | |
| import java.util.Arrays; | |
| import java.util.Objects; | |
| @Entity | |
| @Table(name = "events") | |
| public class EventRecord { | |
| @Id | |
| @GeneratedValue(strategy = GenerationType.UUID) | |
| private String uuid; | |
| @GeneratedValue | |
| private long sequence; | |
| private String streamId; | |
| private long version; | |
| @Column(insertable = false, updatable = false) | |
| private OffsetDateTime timestamp; | |
| @Column(name = "event_type") | |
| private String eventType; | |
| private byte[] data; | |
| private byte[] metadata; | |
| public EventRecord(String uuid, long sequence, String streamId, long version, OffsetDateTime timestamp, String eventType, byte[] data) { | |
| this.uuid = uuid; | |
| this.sequence = sequence; | |
| this.streamId = streamId; | |
| this.version = version; | |
| this.timestamp = timestamp; | |
| this.eventType = eventType; | |
| this.data = data; | |
| } | |
| public EventRecord() { | |
| } | |
| public EventRecord uuid(String uuid) { | |
| this.uuid = uuid; | |
| return this; | |
| } | |
| public EventRecord sequence(long sequence) { | |
| this.sequence = sequence; | |
| return this; | |
| } | |
| public EventRecord streamId(String streamId) { | |
| this.streamId = streamId; | |
| return this; | |
| } | |
| public EventRecord version(long version) { | |
| this.version = version; | |
| return this; | |
| } | |
| public EventRecord timestamp(OffsetDateTime timestamp) { | |
| this.timestamp = timestamp; | |
| return this; | |
| } | |
| public EventRecord data(byte[] data) { | |
| this.data = data; | |
| return this; | |
| } | |
| public EventRecord metadata(byte[] metadata) { | |
| this.metadata = metadata; | |
| return this; | |
| } | |
| public String uuid() { | |
| return uuid; | |
| } | |
| public long sequence() { | |
| return sequence; | |
| } | |
| public String streamId() { | |
| return streamId; | |
| } | |
| public long version() { | |
| return version; | |
| } | |
| public OffsetDateTime timestamp() { | |
| return timestamp; | |
| } | |
| public String eventType() { | |
| return eventType; | |
| } | |
| public byte[] data() { | |
| return data; | |
| } | |
| public byte[] metadata() { | |
| return metadata; | |
| } | |
| @Override | |
| public boolean equals(Object obj) { | |
| if (this == obj) return true; | |
| if (obj == null || getClass() != obj.getClass()) return false; | |
| EventRecord that = (EventRecord) obj; | |
| return sequence == that.sequence && | |
| version == that.version && | |
| Objects.equals(uuid, that.uuid) && | |
| Objects.equals(streamId, that.streamId) && | |
| Objects.equals(timestamp, that.timestamp) && | |
| Objects.equals(eventType, that.eventType) && | |
| Arrays.equals(data, that.data); | |
| } | |
| @Override | |
| public int hashCode() { | |
| int result = Objects.hash(uuid, sequence, streamId, version, timestamp, eventType); | |
| result = 31 * result + Arrays.hashCode(data); | |
| return result; | |
| } | |
| @Override | |
| public String toString() { | |
| return "EventRecord{" + | |
| "uuid='" + uuid + '\'' + | |
| ", sequence=" + sequence + | |
| ", stream='" + streamId + '\'' + | |
| ", version=" + version + | |
| ", timestamp=" + timestamp + | |
| ", type='" + eventType + '\'' + | |
| '}'; | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package es.demo.esdemo.repository; | |
| import es.demo.esdemo.repo2.Version; | |
| import org.slf4j.Logger; | |
| import org.slf4j.LoggerFactory; | |
| import org.springframework.dao.DataIntegrityViolationException; | |
| import org.springframework.data.domain.PageRequest; | |
| import org.springframework.data.domain.Sort; | |
| import org.springframework.data.jpa.domain.Specification; | |
| import org.springframework.data.jpa.repository.JpaRepository; | |
| import org.springframework.data.jpa.repository.JpaSpecificationExecutor; | |
| import org.springframework.data.jpa.repository.Query; | |
| import org.springframework.stereotype.Repository; | |
| import org.springframework.transaction.annotation.Transactional; | |
| import java.time.OffsetDateTime; | |
| import java.util.List; | |
| import java.util.Optional; | |
| import java.util.Set; | |
| @Repository | |
| public interface EventRepository extends JpaRepository<EventRecord, Long>, JpaSpecificationExecutor<EventRecord> { | |
| Logger log = LoggerFactory.getLogger(EventRepository.class); | |
| long INITIAL_VERSION = 0; | |
| default long append(EventRecord event, Version expectedVersion) { | |
| long streamVersion = version(event.streamId()).orElse(INITIAL_VERSION); | |
| while (true) { | |
| var expected = switch (expectedVersion) { | |
| case Version.Any any -> streamVersion; | |
| case Version.Expect(long version) -> { | |
| if (streamVersion != version) { | |
| throw new VersionMismatch(event.streamId(), streamVersion, version); | |
| } | |
| yield version; | |
| } | |
| }; | |
| try { | |
| event.version(expected + 1); | |
| this.save(event); | |
| return event.version(); | |
| } catch (DataIntegrityViolationException e) { | |
| // This exception is thrown when the expected version does not match the DB version | |
| if (expectedVersion instanceof Version.Expect(long version)) { | |
| //TODO: cannot possibly know the current version. | |
| throw new VersionMismatch(event.streamId(), -1, version, e); | |
| } | |
| } | |
| } | |
| } | |
| //Returns the current version of the stream, or empty if the stream does not exist | |
| @Query("SELECT MAX(e.version) FROM EventRecord e WHERE e.streamId = ?1") | |
| Optional<Long> version(String stream); | |
| @Transactional(readOnly = true) | |
| default List<EventRecord> query(Specification<EventRecord> query, Sort sort, int limit) { | |
| return this.findAll(query, PageRequest.of(0, limit, sort)) | |
| .toList(); | |
| } | |
| class EventQuery { | |
| public static Specification<EventRecord> eventTypes(Set<String> eventTypes) { | |
| return (root, query, criteriaBuilder) -> | |
| root.get("event_type").in(eventTypes); | |
| } | |
| public static Specification<EventRecord> timestampBetween(OffsetDateTime start, OffsetDateTime end) { | |
| return (root, query, criteriaBuilder) -> | |
| criteriaBuilder.between(root.get("timestamp"), start, end); | |
| } | |
| public static Specification<EventRecord> version(long from) { | |
| return version(from, Long.MAX_VALUE); | |
| } | |
| public static Specification<EventRecord> version(long from, long to) { | |
| return (root, query, criteriaBuilder) -> | |
| criteriaBuilder.between(root.get("version"), from, to); | |
| } | |
| public static Specification<EventRecord> sequence(long from) { | |
| return (root, query, criteriaBuilder) -> | |
| criteriaBuilder.ge(root.get("sequence"), from); | |
| } | |
| public static Specification<EventRecord> sequence(long from, long to) { | |
| return (root, query, criteriaBuilder) -> | |
| criteriaBuilder.between(root.get("sequence"), from, to); | |
| } | |
| public static Specification<EventRecord> stream(String streamId) { | |
| return (root, query, criteriaBuilder) -> | |
| criteriaBuilder.equal(root.get("stream_id"), streamId); | |
| } | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package es.demo.esdemo.repository; | |
| import es.demo.esdemo.Json; | |
| import org.slf4j.Logger; | |
| import org.slf4j.LoggerFactory; | |
| import java.lang.reflect.Method; | |
| import java.util.HashMap; | |
| import java.util.Map; | |
| import static es.demo.esdemo.repository.TypedRepo.ReflectionMagic.MappedHandler; | |
| import static es.demo.esdemo.repository.TypedRepo.ReflectionMagic.invoke; | |
| import static es.demo.esdemo.repository.TypedRepo.ReflectionMagic.mapEventHandlers; | |
| import static es.demo.esdemo.repository.TypedRepo.ReflectionMagic.newInstance; | |
| import static es.demo.esdemo.repository.TypedRepo.ReflectionMagic.toSnakeCase; | |
| /** | |
| * Typed repository for aggregates, allowing to handle events with specific methods in the aggregate class. | |
| * The aggregate class must have a default constructor and methods annotated with @EventType. | |
| * | |
| * @param <T> the type of the aggregate | |
| */ | |
| public class TypedRepo<T extends Aggregate> { | |
| private static final Logger log = LoggerFactory.getLogger(TypedRepo.class); | |
| private final EventRepository db; | |
| private final Map<String, MappedHandler> handlers; | |
| private final Class<T> aggregateType; | |
| public TypedRepo(EventRepository db, Class<T> aggregateType) { | |
| this.db = db; | |
| this.handlers = mapEventHandlers(aggregateType); | |
| this.aggregateType = aggregateType; | |
| } | |
| public WriteResult append(String stream, Object event, int expectedVersion) { | |
| var className = event.getClass().getSimpleName(); | |
| var evType = event.getClass().getAnnotation(DomainEvent.class); | |
| if (evType == null) { | |
| throw new IllegalArgumentException("Event class " + className + " is not annotated with " + DomainEvent.class.getSimpleName()); | |
| } | |
| var evTypeName = evType.value() != null && !evType.value().isBlank() ? evType.value() : toSnakeCase(className); | |
| return db.append(stream, evTypeName, Json.toJson(event), expectedVersion); | |
| } | |
| public T get(String stream) { | |
| return get(stream, 0, Integer.MAX_VALUE); | |
| } | |
| public T get(String stream, int startVersionInclusive, int limit) { | |
| var instance = newInstance(aggregateType); | |
| for (var event : db.get(stream, startVersionInclusive, limit)) { | |
| var handler = handlers.get(event.type()); | |
| if (handler == null) { | |
| log.warn("No handler found for event type '{}' in aggregate {}, skipping", event.type(), aggregateType.getName()); | |
| continue; | |
| } | |
| var eventData = Json.fromJson(event.data(), handler.eventType); | |
| invoke(instance, handler, eventData); | |
| instance.version = event.version(); | |
| } | |
| return instance; | |
| } | |
| static class ReflectionMagic { | |
| static Map<String, MappedHandler> mapEventHandlers(Class<?> type) { | |
| var handlers = new HashMap<String, MappedHandler>(); | |
| for (var method : type.getDeclaredMethods()) { | |
| var params = method.getParameterTypes(); | |
| if (params.length != 1) { | |
| log.info("Method '{}' in class '{}' is not a valid event handler, must have exactly one parameter", method.getName(), type.getName()); | |
| continue; | |
| } | |
| Class<?> firstParamType = params[0]; | |
| var evType = firstParamType.getAnnotation(DomainEvent.class); | |
| if (evType == null) { | |
| log.info("Method '{}' in class '{}' is not a valid event handler, parameter type '{}' is not annotated with @EventType", | |
| method.getName(), type.getName(), firstParamType.getName()); | |
| continue; | |
| } | |
| var evTypeName = evType.value() != null && !evType.value().isBlank() ? evType.value() : toSnakeCase(firstParamType.getSimpleName()); | |
| handlers.put(evTypeName, new MappedHandler(evTypeName, method, firstParamType)); | |
| log.info("Mapped event type '{}' to handler '{}'", evTypeName, type.getName()); | |
| } | |
| return handlers; | |
| } | |
| static String toSnakeCase(String input) { | |
| var result = new StringBuilder(); | |
| var chars = input.toCharArray(); | |
| for (int i = 0; i < chars.length; i++) { | |
| char c = chars[i]; | |
| var upperCase = Character.toUpperCase(c); | |
| if (Character.isUpperCase(c) && i > 0 && (i < chars.length - 1 && !Character.isUpperCase(chars[i + 1]))) { | |
| result.append("_"); | |
| } | |
| result.append(upperCase); | |
| } | |
| return result.toString(); | |
| } | |
| static void invoke(Object instance, MappedHandler handler, Object eventData) { | |
| try { | |
| handler.method.invoke(instance, eventData); | |
| } catch (Exception e) { | |
| throw new RuntimeException(e); | |
| } | |
| } | |
| static <T> T newInstance(Class<T> type) { | |
| try { | |
| return type.getConstructor().newInstance(); | |
| } catch (Exception e) { | |
| throw new RuntimeException("Aggregate type must have a default constuctor", e); | |
| } | |
| } | |
| public record MappedHandler(String typeName, Method method, Class<?> eventType) { | |
| } | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Schema for the event store table.
-- Each row is one event; (stream_id, version) is unique and sequence is unique across the table.
create table events
(
    uuid       CHAR(36) PRIMARY KEY,
    sequence   BIGINT AUTO_INCREMENT NOT NULL,
    stream_id  VARCHAR(500) NOT NULL,
    event_type VARCHAR(500) NOT NULL,
    -- BIGINT to match the entity's long version field.
    version    BIGINT NOT NULL,
    timestamp  DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL,
    data       BLOB NOT NULL,
    metadata   BLOB NULL,
    -- Declared inline (instead of a later CREATE UNIQUE INDEX) because MySQL/MariaDB require an
    -- AUTO_INCREMENT column to be part of a key when the table is created.
    CONSTRAINT events_sequence_uindex UNIQUE (sequence)
);

CREATE UNIQUE INDEX events_stream_version_uindex
    ON events (stream_id, version);

-- NOTE(review): stream_id is already the leading column of events_stream_version_uindex,
-- so this index may be redundant.
CREATE INDEX events_stream_index
    ON events (stream_id);

CREATE INDEX events_event_type_index
    ON events (event_type);

CREATE INDEX events_timestamp_index
    ON events (timestamp);

CREATE INDEX events_version_index
    ON events (version);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment