Skip to content

Instantly share code, notes, and snippets.

@gliviu
Last active November 19, 2018 11:34
Show Gist options
  • Select an option

  • Save gliviu/c1bd919aacc951ca73c1fe7f058d785a to your computer and use it in GitHub Desktop.

Select an option

Save gliviu/c1bd919aacc951ca73c1fe7f058d785a to your computer and use it in GitHub Desktop.
Consume and print messages from all topics on a Kafka server
// Build script for the MultiConsumer command-line tool.
plugins {
    id 'java'
    id 'application'
    id 'eclipse'
}

repositories {
    // JCenter has been retired; both dependencies below are published on Maven Central.
    mavenCentral()
}

dependencies {
    compile 'ch.qos.logback:logback-classic:1.2.1'
    compile 'org.apache.kafka:kafka-clients:2.0.0'
}

// Must match the name of the class that declares main(); the source file defines
// 'MultiConsumer', so any other value makes 'run', 'execute' and the jar manifest
// point at a class that does not exist.
mainClassName = 'MultiConsumer'

// Convenience task that runs the main class against the compiled runtime classpath.
task execute(type:JavaExec) {
    main = mainClassName
    classpath = sourceSets.main.runtimeClasspath
}

// Builds a self-contained ("fat") jar: every dependency is unpacked into the archive
// so the tool can be started with a plain 'java -jar'.
jar {
    manifest {
        attributes "Main-Class": mainClassName
    }
    from {
        configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
    }
}
import java.time.Duration;
import java.time.Instant;
import java.time.LocalTime;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
/**
 * Command-line tool that subscribes to a set of Kafka topics and prints every record it receives.
 *
 * <p>Options are passed as {@code key:value} arguments:
 * <ul>
 *   <li>{@code bs:host:port} - bootstrap server (defaults to {@code localhost:9092})</li>
 *   <li>{@code topics:a,b,c} - topics to consume (defaults to every topic the server lists)</li>
 *   <li>{@code exclude:a,b,c} - topics to leave out</li>
 * </ul>
 * The tool runs until the JVM is asked to shut down (for example with Ctrl-C).
 */
public class MultiConsumer {
    /** Bootstrap server used when no {@code bs:} option is given. */
    private static final String DEFAULT_BOOTSTRAP_SERVER = "localhost:9092";

    /** Splits {@code key:value}; the key match is lazy so the value may itself contain ':'. */
    private static final Pattern OPTION_PATTERN = Pattern.compile("(.+?):(.*)");

    /** Cleared by the shutdown hook to make the poll loop exit. */
    private static final AtomicBoolean keepRunning = new AtomicBoolean(true);

    /** Parsed command-line options. */
    private static class Options {
        public Set<String> topics = Collections.emptySet();
        public Set<String> excludedTopics = Collections.emptySet();
        public String bootstrapServer;
    }

    /**
     * Entry point: parses the options, resolves the topic list and prints records until shutdown.
     *
     * @param args {@code key:value} options; may be empty, in which case all topics on
     *             {@code localhost:9092} are consumed
     * @throws ExecutionException   if listing the topics on the server fails
     * @throws InterruptedException if the thread is interrupted while listing the topics
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        if (args.length == 0) {
            // The usage text promises that running without arguments consumes everything
            // from the default server, so show the hint and carry on with the defaults.
            printUsage();
            System.out.println();
        } else if (isHelpRequest(args)) {
            printUsage();
            return;
        }

        Options options;
        try {
            options = parseOptions(args);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
            return;
        }

        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            keepRunning.set(false);
            try {
                // Wait for the poll loop to finish so the consumer is closed cleanly.
                mainThread.join();
            } catch (InterruptedException e) {
                // Nothing more to wait for; keep the interrupt status for whoever runs next.
                Thread.currentThread().interrupt();
            }
        }));

        String bootstrapServers =
                options.bootstrapServer == null ? DEFAULT_BOOTSTRAP_SERVER : options.bootstrapServer;

        Set<String> requested = options.topics;
        if (requested.isEmpty()) {
            Instant start = Instant.now();
            System.out.println(String.format(
                    "No topics specified. Retrieving all topics from %s...", bootstrapServers));
            requested = allTopics(bootstrapServers);
            Instant end = Instant.now();
            System.out.println(String.format(
                    "%d topics in %s ms", requested.size(), (end.toEpochMilli() - start.toEpochMilli())));
            System.out.println();
        }

        Set<String> exclude = options.excludedTopics;
        Set<String> topics = requested.stream()
                .filter(topic -> !exclude.contains(topic))
                .collect(Collectors.toSet());

        System.out.println(String.format("Server: %s", bootstrapServers));
        if (topics.isEmpty()) {
            // Subscribing to an empty collection leaves the consumer without a subscription,
            // and polling in that state fails (see the KafkaConsumer documentation), so stop
            // with a clear message instead.
            System.out.println("No topics to consume.");
            return;
        }
        System.out.println(String.format("Consuming topics: %s", String.join(",", topics)));

        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "multi-consumer");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(topics);
            while (keepRunning.get()) {
                // A bounded poll keeps the loop responsive to the shutdown flag.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("### %s ### - %s - offset: %d, key: %s%n",
                            record.topic(), LocalTime.now(), record.offset(), record.key());
                    System.out.printf("%s%n", record.value());
                }
            }
        }
        System.out.println("done");
    }

    /** Prints the command-line synopsis. */
    private static void printUsage() {
        System.out.println("Usage:");
        System.out.println("java -jar multi-consumer.jar [bs:kafka-host:9092] [topics:topic1,topic2,...] [exclude:topic1,topic2,...]");
        System.out.println("Without arguments it will consume from all topics on localhost:9092.");
    }

    /** Returns true when the only argument is a conventional help flag. */
    private static boolean isHelpRequest(String[] args) {
        if (args.length != 1) {
            return false;
        }
        String arg = args[0];
        return "-h".equals(arg) || "--help".equals(arg) || "help".equals(arg);
    }

    /**
     * Parses {@code key:value} command-line options.
     *
     * @param args raw command-line arguments
     * @return the parsed options; unspecified options keep their defaults
     * @throws IllegalArgumentException if an argument is not in {@code key:value} form, uses an
     *                                  unknown key, or gives {@code bs} an empty value
     */
    private static Options parseOptions(String[] args) {
        Options options = new Options();
        for (String option : args) {
            Matcher matcher = OPTION_PATTERN.matcher(option);
            if (!matcher.matches()) {
                throw new IllegalArgumentException(
                        String.format("Wrong option format '%s'. Use 'option:value'.", option));
            }
            String optionKey = matcher.group(1);
            String optionVal = matcher.group(2);
            switch (optionKey) {
                case "topics":
                    options.topics = parseTopicList(optionVal);
                    break;
                case "bs":
                    String server = optionVal.trim();
                    if (server.isEmpty()) {
                        throw new IllegalArgumentException(
                                String.format("Option '%s' requires a value, e.g. bs:localhost:9092.", option));
                    }
                    options.bootstrapServer = server;
                    break;
                case "exclude":
                    options.excludedTopics = parseTopicList(optionVal);
                    break;
                default:
                    throw new IllegalArgumentException(String.format("Unknown option '%s'.", option));
            }
        }
        return options;
    }

    /**
     * Splits a comma-separated topic list, trimming each name and dropping empty entries so
     * that inputs such as {@code "topics:"} or {@code "a,,b"} never yield an empty topic name.
     */
    private static Set<String> parseTopicList(String commaSeparated) {
        return Arrays.stream(commaSeparated.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toSet());
    }

    /**
     * Asks the server for the names of its topics.
     *
     * @param bootstrapServers value for the admin client's {@code bootstrap.servers} setting
     * @return the topic names reported by the server
     * @throws ExecutionException   if the list-topics request fails
     * @throws InterruptedException if the thread is interrupted while waiting for the result
     */
    private static Set<String> allTopics(String bootstrapServers) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Close the admin client once the names are fetched so it does not linger for the
        // lifetime of the process.
        try (AdminClient ac = AdminClient.create(props)) {
            ListTopicsResult topics = ac.listTopics();
            return topics.names().get();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment