@schroedermatt
Created April 4, 2022 20:37
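import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import static java.util.stream.Collectors.toSet;

// assume consumer is a KafkaConsumer subscribed to demo.topic.name and another.topic.name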
String pausedTopic = "demo.topic.name";
//
// PAUSE
//
// select the assigned partitions that belong to the topic being paused
Set<TopicPartition> partitionsToPause = consumer.assignment()
    .stream()
    .filter(tp -> pausedTopic.equals(tp.topic()))
    .collect(toSet());
// pause() returns void, so read the paused set back with paused()
consumer.pause(partitionsToPause);
Set<TopicPartition> pausedPartitions = consumer.paused();
//
// RESUME
//
// select the currently paused partitions that belong to the topic being resumed
Set<TopicPartition> partitionsToResume = consumer.paused()
    .stream()
    .filter(tp -> pausedTopic.equals(tp.topic()))
    .collect(toSet());
consumer.resume(partitionsToResume);
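
The pause and resume steps above repeat the same "keep only the partitions of one topic" filter. A minimal sketch of factoring that into a helper follows. The class name `TopicPartitionFilter` and the method `forTopic` are hypothetical and not part of Kafka. To stay runnable without the Kafka client on the classpath, the demo uses `Map.Entry<String, Integer>` pairs as a stand-in for `TopicPartition`.

```java
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

public class TopicPartitionFilter {

    // Returns the subset of partitions whose topic name equals the given topic.
    // topicOf extracts the topic name from a partition object.
    public static <T> Set<T> forTopic(Set<T> partitions,
                                      Function<? super T, String> topicOf,
                                      String topic) {
        return partitions.stream()
                .filter(p -> topic.equals(topicOf.apply(p)))
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        // Stand-in for consumer.assignment(): (topic, partition) pairs.
        Set<Map.Entry<String, Integer>> assignment = Set.of(
                Map.entry("demo.topic.name", 0),
                Map.entry("demo.topic.name", 1),
                Map.entry("another.topic.name", 0));

        Set<Map.Entry<String, Integer>> toPause =
                forTopic(assignment, Map.Entry::getKey, "demo.topic.name");

        System.out.println(toPause.size()); // prints 2
    }
}
```

With a real consumer, the same helper would presumably be called as `forTopic(consumer.assignment(), TopicPartition::topic, pausedTopic)` before `consumer.pause(...)`, and as `forTopic(consumer.paused(), TopicPartition::topic, pausedTopic)` before `consumer.resume(...)`.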