Created
September 14, 2017 13:04
-
-
Save sanderploegsma/eec1b9759923fd460e395b9fbf21db10 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// waitUntilQueueEmpty subscribes on the queue and returns when no messages arrive for more than 5 seconds | |
// all received messages are not acknowledged in order to return them to the queue so that workers can process them | |
func (m *master) waitUntilQueueEmpty(ctx context.Context, sub *pubsub.Subscription) error { | |
var mu sync.Mutex | |
lastMessageTimestamp := time.Now() | |
// Use cancellable context for the subscription | |
ctx, cancelSubscribe := context.WithCancel(ctx) | |
// Check if the last message was received more than 5 seconds ago. If so, cancel the subscription context | |
go func() { | |
for { | |
if lastMessageTimestamp.Add(5 * time.Second).Before(time.Now()) { | |
cancelSubscribe() | |
break | |
} | |
time.Sleep(1 * time.Second) | |
} | |
}() | |
log.Println("Waiting for queue to become empty") | |
return sub.Receive(ctx, func(ctx context2.Context, msg *pubsub.Message) { | |
mu.Lock() | |
defer mu.Unlock() | |
lastMessageTimestamp = time.Now() | |
msg.Nack() | |
}) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment