@sanderploegsma
Created September 14, 2017 13:04
// waitUntilQueueEmpty subscribes to the queue and returns once no message has arrived for 5 seconds.
// Received messages are nacked rather than acknowledged, so they return to the queue for workers to process.
func (m *master) waitUntilQueueEmpty(ctx context.Context, sub *pubsub.Subscription) error {
	// Use a cancellable context so the watchdog goroutine can stop the subscription.
	ctx, cancelSubscribe := context.WithCancel(ctx)
	defer cancelSubscribe()

	messageReceived := make(chan bool)
	go func() {
		// Cancel the subscription once 5 seconds pass without a message.
		for {
			select {
			case <-messageReceived:
				continue
			case <-time.After(5 * time.Second):
				cancelSubscribe()
				return // return, not break: break would only exit the select
			case <-ctx.Done():
				return // Receive has stopped; do not leak this goroutine
			}
		}
	}()

	log.Println("Waiting for queue to become empty")
	return sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		select {
		case messageReceived <- true:
		case <-ctx.Done(): // watchdog has exited; do not block the callback
		}
		msg.Nack()
	})
}