Last active
November 14, 2017 10:17
-
-
Save jba/9a0d3e6aa2561274cf5d428ae0af2c97 to your computer and use it in GitHub Desktop.
Using the low-level pubsub client to handle one message at a time
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Command sub1 pulls one message at a time from a Pub/Sub subscription and
// keeps extending the message's ack deadline while it is being processed.
package main | |
import ( | |
"context" | |
"flag" | |
"log" | |
"math/rand" | |
"time" | |
pubsub "cloud.google.com/go/pubsub/apiv1" | |
pb "google.golang.org/genproto/googleapis/pubsub/v1" | |
) | |
// Command-line flags. Both are required; main exits if either is left empty.
var (
	// projectID is the value of the -p flag: the project that owns the subscription.
	projectID = flag.String("p", "", "project")

	// subscription is the value of the -s flag: the subscription to pull from.
	subscription = flag.String("s", "", "subscription")
)
// Timing parameters used by keepAlive.
const (
	// keepAliveInterval is how long keepAlive waits between ack-deadline
	// extensions for the message currently being processed.
	keepAliveInterval time.Duration = 5 * time.Second

	// ackDeadlineExtension is the ack deadline requested on every extension.
	// It is longer than keepAliveInterval so the next extension is sent
	// before the previous one runs out.
	ackDeadlineExtension time.Duration = 10 * time.Second
)
func main() { | |
flag.Parse() | |
if *projectID == "" { | |
log.Fatal("need -p") | |
} | |
if *subscription == "" { | |
log.Fatal("need -s") | |
} | |
ctx := context.Background() | |
subscriber, err := pubsub.NewSubscriberClient(ctx) | |
if err != nil { | |
log.Fatal(err) | |
} | |
defer subscriber.Close() | |
subName := pubsub.SubscriberSubscriptionPath(*projectID, *subscription) | |
for { | |
log.Println("waiting for message...") | |
resp, err := subscriber.Pull(ctx, &pb.PullRequest{Subscription: subName, MaxMessages: 1}) | |
if err != nil { | |
log.Fatal(err) | |
} | |
if len(resp.ReceivedMessages) != 1 { | |
log.Fatalf("got %d messages, expected 1", len(resp.ReceivedMessages)) | |
} | |
msg := resp.ReceivedMessages[0] | |
donec := make(chan bool, 1) | |
go func() { | |
if err := keepAlive(ctx, subscriber, subName, msg, donec); err != nil { | |
log.Fatal(err) | |
} | |
}() | |
if err := process(msg); err != nil { | |
log.Println(err) | |
donec <- false // nack | |
} else { | |
donec <- true // ack | |
} | |
} | |
} | |
func keepAlive(ctx context.Context, subc *pubsub.SubscriberClient, subName string, msg *pb.ReceivedMessage, donec <-chan bool) error { | |
modAckDeadline := func(s int32) error { | |
return subc.ModifyAckDeadline(ctx, &pb.ModifyAckDeadlineRequest{ | |
Subscription: subName, | |
AckIds: []string{msg.GetAckId()}, | |
AckDeadlineSeconds: s, | |
}) | |
} | |
for { | |
select { | |
case ack := <-donec: | |
var err error | |
if ack { | |
log.Printf("acknowledging %s", msg.Message.MessageId) | |
err = subc.Acknowledge(ctx, &pb.AcknowledgeRequest{ | |
Subscription: subName, | |
AckIds: []string{msg.GetAckId()}, | |
}) | |
} else { // nack | |
log.Printf("nacking %s", msg.Message.MessageId) | |
err = modAckDeadline(0) | |
} | |
return err | |
case <-time.After(keepAliveInterval): | |
log.Printf("keeping %s alive", msg.Message.MessageId) | |
if err := modAckDeadline(int32(ackDeadlineExtension.Seconds())); err != nil { | |
return err | |
} | |
case <-ctx.Done(): | |
return ctx.Err() | |
} | |
} | |
} | |
// process message. Return nil to acknowledge, or a non-nil error to nack. | |
func process(msg *pb.ReceivedMessage) error { | |
log.Printf("processing %s", msg.Message.MessageId) | |
time.Sleep(time.Duration(rand.Intn(20)) * time.Second) | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment