Created
April 7, 2019 15:34
-
-
Save jesty/548f20abf4670abb807e6c0cd4a04899 to your computer and use it in GitHub Desktop.
This class will be useful when you need to stop Kafka consuming when you are using Spring Cloud Stream with Kafka. I use this class to stop consuming during some management operation on my microservice.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Map; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.springframework.beans.BeansException; | |
import org.springframework.beans.factory.annotation.Value; | |
import org.springframework.cloud.stream.binder.Binding; | |
import org.springframework.cloud.stream.binder.BindingCreatedEvent; | |
import org.springframework.context.event.EventListener; | |
import org.springframework.messaging.MessageChannel; | |
import org.springframework.stereotype.Component; | |
@Component | |
public class SpringKafkaStartStop { | |
private final Logger log = LoggerFactory.getLogger(SpringKafkaStartStop.class); | |
private final Map<String, Binding<MessageChannel>> bindingChannels = new ConcurrentHashMap<>(); | |
@EventListener(BindingCreatedEvent.class) | |
public void listenBinding(BindingCreatedEvent event) { | |
//collect all binders in order to stop later | |
Binding<MessageChannel> binding = (Binding<MessageChannel>) event.getSource(); | |
log.info("Collecting binding event: {}", binding); | |
bindingChannels.put(binding.getName(), binding); | |
} | |
public void start() { | |
try { | |
bindingChannels.entrySet().forEach(entry -> { | |
log.info("Binding {} {}", entry.getKey(), entry.getValue()); | |
entry.getValue().start(); | |
}); | |
} catch (BeansException e) { | |
throw new IllegalStateException("Cannot perform binding, no proper implementation found", e); | |
} | |
} | |
public void stop() { | |
try { | |
bindingChannels.entrySet().forEach(entry -> { | |
log.info("Unbinding {}", entry.getKey()); | |
entry.getValue().stop(); | |
}); | |
} catch (BeansException e) { | |
throw new IllegalStateException("Cannot perform unbinding, no proper implementation found", e); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment