Created
September 14, 2017 13:04
-
-
Save sanderploegsma/6f8262d673176b3387f66e5d71a35c37 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// waitUntilQueueEmpty subscribes on the queue and returns when no messages arrive for more than 5 seconds | |
// all received messages are not acknowledged in order to return them to the queue so that workers can process them | |
func (m *master) waitUntilQueueEmpty(ctx context.Context, sub *pubsub.Subscription) error { | |
// Use cancellable context for the subscription | |
ctx, cancelSubscribe := context.WithCancel(ctx) | |
messageReceived := make(chan bool) | |
go func() { | |
// keep checking if we received a message in the last 5 seconds | |
// if not, cancel the subscription | |
for { | |
select { | |
case <-messageReceived: | |
continue | |
case <-time.After(5 * time.Second): | |
cancelSubscribe() | |
break | |
} | |
} | |
}() | |
log.Println("Waiting for queue to become empty") | |
return sub.Receive(ctx, func(ctx context2.Context, msg *pubsub.Message) { | |
messageReceived <- true | |
msg.Nack() | |
}) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment