Skip to content

Instantly share code, notes, and snippets.

@ripienaar
Created May 12, 2020 14:29
Show Gist options
  • Save ripienaar/dcbf65a54ca9f33922a1990173177206 to your computer and use it in GitHub Desktop.
Save ripienaar/dcbf65a54ca9f33922a1990173177206 to your computer and use it in GitHub Desktop.
package main
import (
	"context"
	"crypto/sha1"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/reactivex/rxgo/v2"
	"github.com/ripienaar/nconn/helper"
	nsink "github.com/ripienaar/nconn/helper/sink/nats"
)
// Config is the application configuration handed to helper.MustParseArgs.
// It embeds the sink and source configurations from the nsink package and
// adds the settings specific to this program.
type Config struct {
// Embedded sink-side settings; passed to nsink.NewSinkFactory in main.
nsink.SinkConfig
// Embedded source-side settings; passed to nsink.NewRxSource in main.
nsink.SourceConfig
// Name is tagged to be read from the NAME environment variable and is
// marked required. main copies it into SinkConfig.Name and SourceConfig.Name.
Name string `arg:"env:NAME,required"`
// TargetStream is tagged to be read from the STREAM environment variable and
// is marked required. main uses it as the Target of every emitted nsink.Message.
TargetStream string `arg:"env:STREAM,required"`
}
// main parses the configuration, then builds and runs a pipeline that reads
// messages from the configured source, drops duplicate payloads seen within
// each one-minute window, and hands the surviving payloads to the output
// channel as nsink.Message values targeted at Config.TargetStream.
//
// It panics if the pipeline cannot be constructed or if running it in the
// foreground returns an error.
func main() {
	appcfg := Config{}
	helper.MustParseArgs(&appcfg)

	// Copy the name into the embedded configs only AFTER parsing: before
	// parsing appcfg.Name is still the zero value, so the copy would be a no-op.
	appcfg.SinkConfig.Name = appcfg.Name
	appcfg.SourceConfig.Name = appcfg.Name

	nc, err := helper.New(
		helper.WithWorkers(1),
		helper.WithRxSourceFunc(nsink.NewRxSource(appcfg.SourceConfig)),
		helper.WithRXObservable(func(ctx context.Context, wg *sync.WaitGroup, input chan rxgo.Item, output chan interface{}) {
			defer wg.Done()

			// WindowWithTime emits, as item.V, a new observable every duration.
			windows := rxgo.FromChannel(input, rxgo.WithContext(ctx)).
				WindowWithTime(rxgo.WithDuration(time.Minute)).Observe()

			// Loop over the emitted windows and de-duplicate each one independently.
			for wi := range windows {
				// An item carrying an error has no usable value; asserting on it
				// would panic, so report it and move on.
				if wi.E != nil {
					log.Printf("window error: %v", wi.E)
					continue
				}

				win, ok := wi.V.(rxgo.Observable)
				if !ok {
					log.Printf("unexpected window item type %T", wi.V)
					continue
				}

				// Distinct keys each message by the SHA-1 of its payload, so
				// duplicates are tracked only within the current window.
				distinct := win.Distinct(func(_ context.Context, i interface{}) (interface{}, error) {
					msg, ok := i.(*nats.Msg)
					if !ok {
						return nil, fmt.Errorf("unexpected item type %T", i)
					}

					return sha1.Sum(msg.Data), nil
				}).Observe()

				// Observe() yields a channel of items; forward every payload to
				// the target stream.
				for item := range distinct {
					if item.E != nil {
						log.Printf("distinct error: %v", item.E)
						continue
					}

					msg, ok := item.V.(*nats.Msg)
					if !ok {
						log.Printf("unexpected message item type %T", item.V)
						continue
					}

					// Do not block forever on a sink that stopped draining
					// after the context was cancelled.
					select {
					case output <- nsink.Message{
						Target:  appcfg.TargetStream,
						Payload: msg.Data,
					}:
					case <-ctx.Done():
						return
					}
				}
			}
		}),
		helper.WithSinkFactory(nsink.NewSinkFactory(appcfg.SinkConfig)),
	)
	if err != nil {
		panic(err)
	}

	if err := nc.StartForeground(); err != nil {
		panic(err)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment