Skip to content

Instantly share code, notes, and snippets.

@StarpTech
Last active January 5, 2020 11:32
Show Gist options
  • Save StarpTech/04dbf1018be8e2ec60c01a8e6c4e1598 to your computer and use it in GitHub Desktop.
Save StarpTech/04dbf1018be8e2ec60c01a8e6c4e1598 to your computer and use it in GitHub Desktop.
Tracker to store last processed event id
// Subscribe to the "eventstore.events" channel with manual acknowledgement:
// a message is acked only after it has been recognised as a duplicate or
// after the handler has run and the event ID has been handed to the tracker.
//
// NOTE: the stan.AckWait option below needs the standard "time" package
// imported by the enclosing file.
sub, err := sc.Subscribe("eventstore.events", func(m *stan.Msg) {
	// NOTE(review): event, eventLogFields and subLogFields are not defined in
	// this snippet; presumably event is decoded from m.Data before this
	// point — confirm against the full handler.

	// Skip events the tracker already knows about, but still ack them so
	// they are not delivered again.
	if r.tracker.IsDuplicateOperation(r.name, event.ID) {
		log.WithFields(eventLogFields).WithFields(subLogFields).Trace("duplicate")
		if err := m.Ack(); err != nil {
			log.WithFields(eventLogFields).WithFields(subLogFields).WithError(err).Error("ack duplicate event")
		}
		return
	}

	// your code

	// Record the event as processed. A tracking failure is only logged; the
	// message is still acked below.
	if err := r.tracker.ProcessingEvent(r.name, event.ID); err != nil {
		log.WithFields(eventLogFields).WithFields(r.baseLogFields).WithError(err).Error("track event as processed")
	}

	// Acknowledge the message now that it has been handled.
	if err := m.Ack(); err != nil {
		log.WithFields(eventLogFields).WithFields(r.baseLogFields).WithError(err).Error("ack event")
	}
},
	// Allow a single unacknowledged message at a time.
	stan.MaxInflight(1),
	// AckWait takes a time.Duration, not a string.
	stan.AckWait(30*time.Second),
	stan.SetManualAckMode(),
	stan.DurableName(durableName),
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment