Skip to content

Instantly share code, notes, and snippets.

@dvcrn
Created January 22, 2023 05:48
Show Gist options
  • Save dvcrn/fbfceeb1cd253cb4e4c81ec3b4d5d70f to your computer and use it in GitHub Desktop.
Save dvcrn/fbfceeb1cd253cb4e4c81ec3b4d5d70f to your computer and use it in GitHub Desktop.
amqp deduper
package main
import (
"context"
"encoding/json"
"time"
"github.com/ReneKroon/ttlcache/v2"
"github.com/dvcrn/amqpdedup/internal/amqp"
"github.com/dvcrn/amqpdedup/internal/domain"
"github.com/dvcrn/amqpdedup/internal/logger"
)
func main() {
ctx := context.Background()
cache := ttlcache.NewCache()
cache.SetTTL(time.Duration(30 * time.Second))
defer cache.Close()
amqpClient, err := amqp.NewClient("amqp-server-addr")
if err != nil {
log.Fatalf("error while creating amqp client: %v\n", err)
}
defer amqpClient.Close()
err = amqpClient.Subscribe(ctx, "dedup-queue", "dedup-consumer", func(ctx context.Context, body []byte) error {
var msg domain.MyMessage
err := json.Unmarshal(body, &msg)
if err != nil {
fmt.Printf("unmarshal failed: %v", err)
return err
}
fmt.Printf("processing ", msg.UUID)
if _, err := cache.Get(msg.UUID); err != ttlcache.ErrNotFound {
fmt.Printf("already processed, skipping.")
return nil
}
if err = cache.Set(msg.UUID, true); err != nil {
fmt.Printf("err writing into cache ", err)
}
err = amqpClient.Publish("deduped-exchange", "routing-key", msg)
if err != nil {
fmt.Printf(err)
}
return nil
})
if err != nil {
fmt.Printf(err)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment