Skip to content

Instantly share code, notes, and snippets.

@jba
Last active November 14, 2017 10:17
Show Gist options
  • Save jba/9a0d3e6aa2561274cf5d428ae0af2c97 to your computer and use it in GitHub Desktop.
Save jba/9a0d3e6aa2561274cf5d428ae0af2c97 to your computer and use it in GitHub Desktop.
Using the low-level pubsub client to handle one message at a time
// sub1 receives one message at a time from a subscription.
package main
import (
"context"
"flag"
"log"
"math/rand"
"time"
pubsub "cloud.google.com/go/pubsub/apiv1"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
)
var (
projectID = flag.String("p", "", "project")
subscription = flag.String("s", "", "subscription")
)
const (
keepAliveInterval = 5 * time.Second
ackDeadlineExtension = 10 * time.Second
)
func main() {
flag.Parse()
if *projectID == "" {
log.Fatal("need -p")
}
if *subscription == "" {
log.Fatal("need -s")
}
ctx := context.Background()
subscriber, err := pubsub.NewSubscriberClient(ctx)
if err != nil {
log.Fatal(err)
}
defer subscriber.Close()
subName := pubsub.SubscriberSubscriptionPath(*projectID, *subscription)
for {
log.Println("waiting for message...")
resp, err := subscriber.Pull(ctx, &pb.PullRequest{Subscription: subName, MaxMessages: 1})
if err != nil {
log.Fatal(err)
}
if len(resp.ReceivedMessages) != 1 {
log.Fatalf("got %d messages, expected 1", len(resp.ReceivedMessages))
}
msg := resp.ReceivedMessages[0]
donec := make(chan bool, 1)
go func() {
if err := keepAlive(ctx, subscriber, subName, msg, donec); err != nil {
log.Fatal(err)
}
}()
if err := process(msg); err != nil {
log.Println(err)
donec <- false // nack
} else {
donec <- true // ack
}
}
}
func keepAlive(ctx context.Context, subc *pubsub.SubscriberClient, subName string, msg *pb.ReceivedMessage, donec <-chan bool) error {
modAckDeadline := func(s int32) error {
return subc.ModifyAckDeadline(ctx, &pb.ModifyAckDeadlineRequest{
Subscription: subName,
AckIds: []string{msg.GetAckId()},
AckDeadlineSeconds: s,
})
}
for {
select {
case ack := <-donec:
var err error
if ack {
log.Printf("acknowledging %s", msg.Message.MessageId)
err = subc.Acknowledge(ctx, &pb.AcknowledgeRequest{
Subscription: subName,
AckIds: []string{msg.GetAckId()},
})
} else { // nack
log.Printf("nacking %s", msg.Message.MessageId)
err = modAckDeadline(0)
}
return err
case <-time.After(keepAliveInterval):
log.Printf("keeping %s alive", msg.Message.MessageId)
if err := modAckDeadline(int32(ackDeadlineExtension.Seconds())); err != nil {
return err
}
case <-ctx.Done():
return ctx.Err()
}
}
}
// process message. Return nil to acknowledge, or a non-nil error to nack.
func process(msg *pb.ReceivedMessage) error {
log.Printf("processing %s", msg.Message.MessageId)
time.Sleep(time.Duration(rand.Intn(20)) * time.Second)
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment