Skip to content

Instantly share code, notes, and snippets.

@sanderploegsma
Created September 14, 2017 13:04
Show Gist options
  • Save sanderploegsma/eec1b9759923fd460e395b9fbf21db10 to your computer and use it in GitHub Desktop.
Save sanderploegsma/eec1b9759923fd460e395b9fbf21db10 to your computer and use it in GitHub Desktop.
// waitUntilQueueEmpty subscribes on the queue and returns when no messages arrive for more than 5 seconds
// all received messages are not acknowledged in order to return them to the queue so that workers can process them
func (m *master) waitUntilQueueEmpty(ctx context.Context, sub *pubsub.Subscription) error {
var mu sync.Mutex
lastMessageTimestamp := time.Now()
// Use cancellable context for the subscription
ctx, cancelSubscribe := context.WithCancel(ctx)
// Check if the last message was received more than 5 seconds ago. If so, cancel the subscription context
go func() {
for {
if lastMessageTimestamp.Add(5 * time.Second).Before(time.Now()) {
cancelSubscribe()
break
}
time.Sleep(1 * time.Second)
}
}()
log.Println("Waiting for queue to become empty")
return sub.Receive(ctx, func(ctx context2.Context, msg *pubsub.Message) {
mu.Lock()
defer mu.Unlock()
lastMessageTimestamp = time.Now()
msg.Nack()
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment