Skip to content

Instantly share code, notes, and snippets.

@thomaspoignant
Created February 19, 2025 12:41
Show Gist options
  • Save thomaspoignant/07d49ea30eda61ddf5b848d884586ef5 to your computer and use it in GitHub Desktop.
Save thomaspoignant/07d49ea30eda61ddf5b848d884586ef5 to your computer and use it in GitHub Desktop.
package datastruct
import (
"fmt"
"sync"
)
const minOffset = int64(-1)
type eventStoreImpl[T any] struct {
Events []event[T]
Mutex sync.Mutex
Consumers map[string]consumer
}
func NewEventStore[T any](consumerNames []string) EventStore[T] {
consumers := make(map[string]consumer)
for _, name := range consumerNames {
consumers[name] = consumer{Offset: 0}
}
return &eventStoreImpl[T]{
Events: make([]event[T], 0),
Consumers: consumers,
Mutex: sync.Mutex{},
}
}
type EventStore[T any] interface {
Push(data T)
GetEvents(consumerName string) []event[T]
GetConsumer(name string) (consumer, error)
}
type event[T any] struct {
Offset int64
Data T
}
type consumer struct {
Offset int64
}
func (e *eventStoreImpl[T]) GetConsumer(name string) (consumer, error) {
if e.Consumers == nil {
return consumer{}, fmt.Errorf("consumer not found")
}
if _, ok := e.Consumers[name]; !ok {
return consumer{}, fmt.Errorf("consumer not found")
}
return e.Consumers[name], nil
}
func (e *eventStoreImpl[T]) Push(data T) {
e.Mutex.Lock()
defer e.Mutex.Unlock()
currentOffset := minOffset
if e.Events != nil && len(e.Events) > 0 {
currentOffset = e.Events[len(e.Events)-1].Offset
}
e.Events = append(e.Events, event[T]{Offset: currentOffset + 1, Data: data})
}
func (e *eventStoreImpl[T]) GetEvents(consumerName string) []event[T] {
e.Mutex.Lock()
defer e.Mutex.Unlock()
events := make([]event[T], 0)
currentConsumer, err := e.GetConsumer(consumerName)
if err != nil {
// TODO: handle error
return events
}
for _, event := range e.Events {
if event.Offset > currentConsumer.Offset {
events = append(events, event)
}
}
if len(events) > 0 {
e.Consumers[consumerName] = consumer{Offset: events[len(events)-1].Offset}
}
if err := e.RemoveOldEvents(); err != nil {
// TODO: log something here
}
return events
}
func (e *eventStoreImpl[T]) RemoveOldEvents() error {
if e.Events == nil || len(e.Events) == 0 {
return nil
}
if e.Consumers == nil || len(e.Consumers) == 0 {
return fmt.Errorf("no consumers configured")
}
consumerMinOffset := minOffset
for _, consumer := range e.Consumers {
if consumerMinOffset == minOffset || consumer.Offset < consumerMinOffset {
consumerMinOffset = consumer.Offset
}
}
if consumerMinOffset == int64(-1) {
//TODO: no events to remove
return nil
}
index := 0
for i, event := range e.Events {
if event.Offset == consumerMinOffset {
index = i
break
}
}
e.Events = e.Events[index:]
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment