Skip to content

Instantly share code, notes, and snippets.

@jameshartig
Created February 1, 2018 23:09
Show Gist options
  • Save jameshartig/527b048e019700da482d79cce39ad111 to your computer and use it in GitHub Desktop.
Save jameshartig/527b048e019700da482d79cce39ad111 to your computer and use it in GitHub Desktop.
Pubsub 15 minute delayed acks
package main
import (
"context"
"time"
"github.com/levenlabs/go-llog"
"cloud.google.com/go/pubsub"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// main is a small harness for observing Pub/Sub behaviour with slow
// acknowledgements. It makes sure the subscription "<topic>_<suffix>" exists,
// then receives messages one at a time, holds each one for 15 minutes, acks it,
// and logs how long the message was held. Receive errors are logged and the
// receive call is retried after a short pause.
func main() {
	// holdFor is how long each message is kept before it is acked.
	const holdFor = 15 * time.Minute
	// retryDelay is the pause between failed Receive calls.
	const retryDelay = 100 * time.Millisecond

	ctx := context.Background()
	project := "admiral-1007"
	topic := "test_no_backlog"
	subSuffix := "test"

	ps, err := pubsub.NewClient(ctx, project)
	if err != nil {
		llog.Fatal("error creating client", llog.ErrKV(err))
	}

	subName := topic + "_" + subSuffix
	sub, err := ps.CreateSubscription(ctx, subName, pubsub.SubscriptionConfig{
		Topic: ps.Topic(topic),
	})
	if err != nil {
		// if subscription already exists, use it
		if s, ok := status.FromError(err); ok && s.Code() == codes.AlreadyExists {
			sub = ps.Subscription(subName)
		} else {
			llog.Fatal("error creating subscription", llog.ErrKV(err))
		}
	}

	// Process a single message at a time on a single goroutine. MaxExtension
	// (20m) is deliberately larger than holdFor (15m) so the hold fits inside
	// the window in which the client is allowed to keep extending the ack
	// deadline.
	sub.ReceiveSettings.MaxOutstandingMessages = 1
	sub.ReceiveSettings.MaxExtension = 20 * time.Minute
	sub.ReceiveSettings.NumGoroutines = 1

	subKV := llog.KV{"messageSubscription": sub.ID()}
	for {
		err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
			kv := subKV.Set("messageID", msg.ID)
			start := time.Now()
			llog.Info("Starting message", kv)

			// Hold the message for holdFor, but give up early if the callback
			// context is cancelled. Per the pubsub docs that happens when
			// Receive is shutting down (its ctx is done or the stream hit a
			// fatal error); a blocking time.Sleep here would keep Receive from
			// returning, and so delay the error log/retry below, by up to
			// holdFor.
			timer := time.NewTimer(holdFor)
			defer timer.Stop()
			select {
			case <-timer.C:
			case <-ctx.Done():
				// Not processed: hand the message back for redelivery.
				msg.Nack()
				llog.Warn("Abandoned message",
					kv.Set("duration", time.Since(start).String()),
					llog.ErrKV(ctx.Err()))
				return
			}

			msg.Ack()
			llog.Info("Finished message", kv.Set("duration", time.Since(start).String()))
		})
		// Stop once the outer context has ended for any reason (cancellation
		// or deadline). ctx is context.Background() today, so this only
		// triggers if a cancellable context is substituted above; checking for
		// any error avoids a sleep-less spin on a nil Receive result.
		if ctx.Err() != nil {
			return
		}
		if err != nil {
			llog.Warn("error consuming from pubsub", subKV, llog.ErrKV(err))
			// sleep some amount of time to prevent stampeding google
			time.Sleep(retryDelay)
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment