Created
May 12, 2020 14:29
-
-
Save ripienaar/dcbf65a54ca9f33922a1990173177206 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"crypto/sha1" | |
"sync" | |
"time" | |
"github.com/nats-io/nats.go" | |
"github.com/reactivex/rxgo/v2" | |
"github.com/ripienaar/nconn/helper" | |
nsink "github.com/ripienaar/nconn/helper/sink/nats" | |
) | |
type Config struct { | |
nsink.SinkConfig | |
nsink.SourceConfig | |
Name string `arg:"env:NAME,required"` | |
TargetStream string `arg:"env:STREAM,required"` | |
} | |
func main() { | |
appcfg := Config{} | |
appcfg.SinkConfig.Name = appcfg.Name | |
appcfg.SourceConfig.Name = appcfg.Name | |
helper.MustParseArgs(&appcfg) | |
nc, err := helper.New( | |
helper.WithWorkers(1), | |
helper.WithRxSourceFunc(nsink.NewRxSource(appcfg.SourceConfig)), | |
helper.WithRXObservable(func(ctx context.Context, wg *sync.WaitGroup, input chan rxgo.Item, output chan interface{}) { | |
defer wg.Done() | |
// WindowWithTime emits as item.V a new observable every duration | |
window := rxgo.FromChannel(input, rxgo.WithContext(ctx)). | |
WindowWithTime(rxgo.WithDuration(time.Minute)).Observe() | |
// so we loop all these emitted windows and distinct each | |
for wi := range window { | |
// distinct will sha1 the payload and use that to track dupes within window time | |
distinct := wi.V.(rxgo.Observable).Distinct(func(_ context.Context, i interface{}) (interface{}, error) { | |
msg := i.(*nats.Msg) | |
return sha1.Sum(msg.Data), nil | |
}).Observe() | |
// Observer() emits a channel of items, so we just publish all those to the new stream | |
for item := range distinct { | |
output <- nsink.Message{ | |
Target: appcfg.TargetStream, | |
Payload: item.V.(*nats.Msg).Data, | |
} | |
} | |
} | |
}), | |
helper.WithSinkFactory(nsink.NewSinkFactory(appcfg.SinkConfig)), | |
) | |
if err != nil { | |
panic(err) | |
} | |
err = nc.StartForeground() | |
if err != nil { | |
panic(err) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment