Skip to content

Instantly share code, notes, and snippets.

@blinkinglight
Last active March 16, 2025 13:27
Show Gist options
  • Save blinkinglight/3ee4dd2138fa3babf5f6da6542bb3a41 to your computer and use it in GitHub Desktop.
Save blinkinglight/3ee4dd2138fa3babf5f6da6542bb3a41 to your computer and use it in GitHub Desktop.
something with events
package main
import (
"context"
"fmt"
"log"
"strings"
"time"
"github.com/ituoga/the-system/pkg/tools"
"github.com/nats-io/nats.go"
)
func main() {
ctx := context.Background()
_ = ctx
nc, err := nats.Connect("nats://localhost:4222")
if err != nil {
panic(err)
}
defer nc.Drain()
defer nc.Close()
id := "z23ntlMT7yPIvFy4FGQ7or"
js, _ := nc.JetStream()
_ = js
js.PurgeStream("users")
js.Publish(fmt.Sprintf("users.%s.created", id), []byte(`{"name":"John", "lastname":"Doe", "created_at":"2021-09-01"}`))
js.Publish(fmt.Sprintf("users.%s.address", id), []byte(`{"address":"123 Main St", "created_at":"2021-10-01"}`))
js.Publish(fmt.Sprintf("users.%s.addressv2", id), []byte(`{"address":"v2 address", "created_at":"2021-11-01"}`))
js.Publish(fmt.Sprintf("users.%s.somethingnotimplementedyet", id), []byte(`{"other":"not implemented yet", "created_at":"2021-12-01"}`))
js.Publish(fmt.Sprintf("users.%s.addressv3", id), []byte(`{"address":"v3 address", "city":"v3 city", "country":"v3 country", "created_at":"2021-12-01"}`))
state := replay(ctx, nc, "users", id, func(ctx context.Context, id string, msgs <-chan *nats.Msg) (state State) {
for {
select {
case <-ctx.Done():
return
case msg, ok := <-msgs:
if !ok {
return
}
switch getEvent(msg.Subject) {
case "created":
user, _ := tools.Unmarshal[UserCreated](msg.Data)
state.Name = user.Name
state.Lastname = user.Lastname
state.Changes = append(state.Changes, "created at "+user.CreatedAt)
case "address":
address, _ := tools.Unmarshal[AddressUpdated](msg.Data)
state.Address = address.Address
state.Changes = append(state.Changes, "address updated at"+address.CreatedAt)
case "addressv2":
address, _ := tools.Unmarshal[AddressUpdated](msg.Data)
state.Address = address.Address
state.Changes = append(state.Changes, "address updated at"+address.CreatedAt)
case "addressv3":
address, _ := tools.Unmarshal[AddressUpdatedV3](msg.Data)
state.Address = address.Address + ", " + address.City + ", " + address.Country
state.Changes = append(state.Changes, "address updated at"+address.CreatedAt)
default:
log.Printf("Unknown event: %s with payload %s", getEvent(msg.Subject), msg.Data)
}
}
}
})
log.Printf("Final state %+v", state)
}
// onReqFn is the handler signature accepted by replay. The handler is called
// with a context, the id that was passed to replay, and a receive-only channel
// of NATS messages; whatever value of type T it returns is returned by replay.
type onReqFn[T any] func(ctx context.Context, id string, msgs <-chan *nats.Msg) T
// replay subscribes to every JetStream message on "<domain>.<id>.>" from the
// start of the stream, forwards the messages to fn, and returns fn's result.
//
// The context passed to fn is cancelled when ctx is cancelled or when no
// message has arrived for 100ms, which is how the end of the stored history is
// detected. Each message is acknowledged after it has been handed to fn's
// channel.
//
// If the JetStream context or the subscription cannot be created, the error is
// logged and fn is run with an already-cancelled context and an empty channel.
func replay[T any](ctx context.Context, nc *nats.Conn, domain, id string, fn onReqFn[T]) T {
	lctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// events is the channel fn reads from. It is never closed; fn learns that
	// the replay is over through lctx.
	events := make(chan *nats.Msg, 128)

	js, err := nc.JetStream()
	if err != nil {
		log.Printf("replay %s.%s: creating JetStream context: %v", domain, id, err)
		cancel()
		return fn(lctx, id, events)
	}

	raw := make(chan *nats.Msg, 128)
	subject := fmt.Sprintf("%s.%s.>", domain, id)
	sub, err := js.ChanSubscribe(subject, raw, nats.AckExplicit(), nats.DeliverAll())
	if err != nil {
		log.Printf("replay %s: subscribing: %v", subject, err)
		cancel()
		return fn(lctx, id, events)
	}

	const idle = 100 * time.Millisecond
	done := make(chan struct{})

	// Forwarder: moves messages from the subscription to fn and cancels lctx
	// once the stream has been quiet for the idle period.
	go func() {
		defer close(done)
		defer cancel()

		timer := time.NewTimer(idle)
		defer timer.Stop()

		for {
			select {
			case <-lctx.Done():
				// Covers cancellation of the parent ctx and replay returning.
				return
			case <-timer.C:
				return
			case msg, ok := <-raw:
				if !ok {
					return
				}
				// Stop and drain before Reset so a timer that already fired
				// cannot deliver a stale tick after the reset.
				if !timer.Stop() {
					select {
					case <-timer.C:
					default:
					}
				}
				timer.Reset(idle)

				// Do not block forever if fn has stopped reading.
				select {
				case events <- msg:
				case <-lctx.Done():
					return
				}
				if err := msg.Ack(); err != nil {
					log.Printf("replay %s: acking %s: %v", subject, msg.Subject, err)
				}
			}
		}
	}()

	// Tear down in order: stop the forwarder, wait for it, unsubscribe, and
	// only then close the subscription channel.
	defer func() {
		cancel()
		<-done
		if err := sub.Unsubscribe(); err != nil {
			log.Printf("replay %s: unsubscribing: %v", subject, err)
		}
		close(raw)
	}()

	return fn(lctx, id, events)
}
// getEvent returns the event portion of a subject: everything after the
// second "." separator. Subjects with fewer than two separators yield whatever
// follows the last separator present, or the whole subject if there is none.
func getEvent(subject string) string {
	rest := subject
	// Strip at most two leading dot-separated tokens.
	for i := 0; i < 2; i++ {
		_, tail, found := strings.Cut(rest, ".")
		if !found {
			break
		}
		rest = tail
	}
	return rest
}
// State is the view of a user that is rebuilt by folding the user's events
// together in order.
type State struct {
	// ID is the user's identifier. It is only set if the code building the
	// state assigns it; no event payload carries it.
	ID string
	// Name and Lastname are taken from the "created" event.
	Name     string
	Lastname string
	// Address is overwritten by each address event that is applied.
	Address string
	// Changes is a human-readable log with one entry per applied event.
	Changes []string
}
// UserCreated is the JSON payload decoded for a "created" event.
type UserCreated struct {
	Name     string `json:"name"`
	Lastname string `json:"lastname"`
	// CreatedAt is kept as the raw string from the payload; it is not parsed
	// into a time value.
	CreatedAt string `json:"created_at"`
}
// AddressUpdated is the JSON payload decoded for the "address" and
// "addressv2" events.
type AddressUpdated struct {
	Address string `json:"address"`
	// CreatedAt is kept as the raw string from the payload.
	CreatedAt string `json:"created_at"`
}

// AddressUpdatedV3 is the JSON payload decoded for the "addressv3" event. It
// extends AddressUpdated with a city and a country.
//
// AddressUpdated is embedded by value, not by pointer. With a pointer embed
// the zero value, and any decode that never touches the embedded fields,
// leaves the pointer nil, so reading the promoted Address field panics. With a
// value embed Address simply reads as "".
type AddressUpdatedV3 struct {
	AddressUpdated
	City    string `json:"city"`
	Country string `json:"country"`
	// CreatedAt is declared at the outer level, so it takes precedence over
	// the embedded AddressUpdated.CreatedAt, both for field selection and for
	// the "created_at" JSON key.
	CreatedAt string `json:"created_at"`
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment