Last active
November 19, 2018 11:34
-
-
Save gliviu/c1bd919aacc951ca73c1fe7f058d785a to your computer and use it in GitHub Desktop.
Consume and print messages from all topics on a Kafka server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Build script for the MultiConsumer command-line tool.
// Produces a runnable "fat" jar that bundles all compile dependencies.
plugins {
    id 'java'
    id 'application'
    id 'eclipse'
}

repositories {
    // JCenter has been sunset; both dependencies below are published on Maven Central.
    mavenCentral()
}

dependencies {
    compile 'ch.qos.logback:logback-classic:1.2.1'
    compile 'org.apache.kafka:kafka-clients:2.0.0'
}

// Must match the name of the class that declares main(String[]).
mainClassName = 'MultiConsumer'

// Convenience task: runs the main class straight from the compiled classes.
task execute(type: JavaExec) {
    main = mainClassName
    classpath = sourceSets.main.runtimeClasspath
}

jar {
    manifest {
        attributes "Main-Class": mainClassName
    }
    // Unpack every dependency into the jar so it can be started with `java -jar`.
    from {
        configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.time.Duration; | |
| import java.time.Instant; | |
| import java.time.LocalTime; | |
| import java.util.*; | |
| import java.util.concurrent.ExecutionException; | |
| import java.util.concurrent.atomic.AtomicBoolean; | |
| import java.util.regex.Matcher; | |
| import java.util.regex.Pattern; | |
| import java.util.stream.Collectors; | |
| import org.apache.kafka.clients.admin.AdminClient; | |
| import org.apache.kafka.clients.admin.ListTopicsResult; | |
| import org.apache.kafka.clients.consumer.ConsumerRecord; | |
| import org.apache.kafka.clients.consumer.ConsumerRecords; | |
| import org.apache.kafka.clients.consumer.KafkaConsumer; | |
| public class MultiConsumer { | |
| private static AtomicBoolean keepRunning = new AtomicBoolean(true); | |
| private static class Options { | |
| public Set<String> topics = Collections.emptySet(); | |
| public Set<String> excludedTopics = Collections.emptySet(); | |
| public String bootstrapServer; | |
| } | |
| public static void main(String[] args) throws ExecutionException, InterruptedException { | |
| if(args.length==0){ | |
| System.out.println("Usage:"); | |
| System.out.println("java -jar multi-consumer.jar [bs:kafka-host:9092] [topics:topic1,topic2,...] [exclude:topic1,topic2,..."); | |
| System.out.println("Without arguments it will consume from all topics on localhost:9092."); | |
| return; | |
| } | |
| Options options; | |
| try { | |
| options = parseOptions(args); | |
| } catch(IllegalArgumentException e) { | |
| System.out.println(e.getMessage()); | |
| return; | |
| } | |
| final Thread mainThread = Thread.currentThread(); | |
| Runtime.getRuntime().addShutdownHook(new Thread(() -> { | |
| keepRunning.set(false); | |
| try { | |
| mainThread.join(); | |
| } catch (InterruptedException e) { | |
| // ignore | |
| } | |
| })); | |
| String bootstrapServers = options.bootstrapServer==null?"localhost:9092":options.bootstrapServer; | |
| Set<String> topics = options.topics; | |
| if(topics.isEmpty()){ | |
| Instant i1 = Instant.now(); | |
| System.out.println(String.format("No topics specified. Retrieving all topics from %s...", bootstrapServers)); | |
| topics = allTopics(bootstrapServers); | |
| Instant i2 = Instant.now(); | |
| System.out.println(String.format("%d topics in %s ms", topics.size(), (i2.toEpochMilli()-i1.toEpochMilli()))); | |
| System.out.println(); | |
| } | |
| Set<String> exclude = options.excludedTopics; | |
| topics = topics.stream().filter(topic -> !exclude.contains(topic)).collect(Collectors.toSet()); | |
| System.out.println(String.format("Server: %s", bootstrapServers)); | |
| System.out.println(String.format("Consuming topics: %s", String.join(",", topics))); | |
| Properties props = new Properties(); | |
| props.put("bootstrap.servers", bootstrapServers); | |
| props.put("group.id", "multi-consumer"); | |
| props.put("enable.auto.commit", "true"); | |
| props.put("auto.commit.interval.ms", "1000"); | |
| props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); | |
| props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); | |
| try(KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)){ | |
| consumer.subscribe(topics); | |
| while (keepRunning.get()) { | |
| ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); | |
| for (ConsumerRecord<String, String> record : records){ | |
| System.out.printf("### %s ### - %s - offset: %d, key: %s%n", record.topic(), LocalTime.now(), record.offset(), record.key()); | |
| System.out.printf("%s%n", record.value()); | |
| } | |
| } | |
| } | |
| System.out.println("done"); | |
| } | |
| private static Options parseOptions(String[] args) { | |
| Pattern optionParser = Pattern.compile("(.+?):(.*)"); | |
| Options options = new Options(); | |
| for(String option : args){ | |
| Matcher matcher = optionParser.matcher(option); | |
| if(matcher.matches()){ | |
| String optionKey = matcher.group(1); | |
| String optionVal = matcher.group(2); | |
| switch(optionKey){ | |
| case "topics": | |
| options.topics = new HashSet<>(Arrays.asList(optionVal.split(","))); | |
| break; | |
| case "bs": | |
| options.bootstrapServer = optionVal; | |
| break; | |
| case "exclude": | |
| options.excludedTopics = new HashSet<>(Arrays.asList(optionVal.split(","))); | |
| break; | |
| default: | |
| throw new IllegalArgumentException(String.format("Unknown option '%s'.", option)); | |
| } | |
| } else { | |
| throw new IllegalArgumentException(String.format("Wrong option format '%s'. Use 'option:value'.", option)); | |
| } | |
| } | |
| return options; | |
| } | |
| private static Set<String> allTopics(String bootstrapServers) throws ExecutionException, InterruptedException { | |
| Properties props = new Properties(); | |
| props.setProperty("bootstrap.servers", bootstrapServers); | |
| AdminClient ac = AdminClient.create(props); | |
| ListTopicsResult topics = ac.listTopics(); | |
| return topics.names().get(); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment