Created
February 1, 2018 23:09
-
-
Save jameshartig/527b048e019700da482d79cce39ad111 to your computer and use it in GitHub Desktop.
Pubsub 15 minute delayed acks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"time" | |
"github.com/levenlabs/go-llog" | |
"cloud.google.com/go/pubsub" | |
"google.golang.org/grpc/codes" | |
"google.golang.org/grpc/status" | |
) | |
func main() { | |
ctx := context.Background() | |
project := "admiral-1007" | |
topic := "test_no_backlog" | |
subSuffix := "test" | |
ps, err := pubsub.NewClient(ctx, project) | |
if err != nil { | |
llog.Fatal("error creating client", llog.ErrKV(err)) | |
} | |
subName := topic + "_" + subSuffix | |
sub, err := ps.CreateSubscription(ctx, subName, pubsub.SubscriptionConfig{ | |
Topic: ps.Topic(topic), | |
}) | |
if err != nil { | |
// if subscription already exists, use it | |
if s, ok := status.FromError(err); ok && s.Code() == codes.AlreadyExists { | |
sub = ps.Subscription(subName) | |
} else { | |
llog.Fatal("error creating subscription", llog.ErrKV(err)) | |
} | |
} | |
sub.ReceiveSettings.MaxOutstandingMessages = 1 | |
sub.ReceiveSettings.MaxExtension = 20 * time.Minute | |
sub.ReceiveSettings.NumGoroutines = 1 | |
subKV := llog.KV{"messageSubscription": sub.ID()} | |
for { | |
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) { | |
kv := subKV.Set("messageID", msg.ID) | |
start := time.Now() | |
llog.Info("Starting message", kv) | |
time.Sleep(15 * time.Minute) | |
msg.Ack() | |
llog.Info("Finished message", kv.Set("duration", time.Since(start).String())) | |
}) | |
if ctx.Err() == context.Canceled { | |
return | |
} | |
if err != nil { | |
llog.Warn("error consuming from pubsub", subKV, llog.ErrKV(err)) | |
// sleep some amount of time to prevent stampeding google | |
time.Sleep(100 * time.Millisecond) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment