Last active
May 5, 2020 12:09
-
-
Save csaltos/b3d79d43c9986024f43bac3481480d94 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.tutorial1; | |
import org.apache.kafka.clients.consumer.*; | |
import org.apache.kafka.common.errors.WakeupException; | |
import org.apache.kafka.common.serialization.StringDeserializer; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.time.Duration; | |
import java.util.Collections; | |
import java.util.Date; | |
import java.util.Properties; | |
import java.util.concurrent.CountDownLatch; | |
public class ConsumerDemo { | |
public static void main(String[] args) { | |
ConsumerDemoWorker consumerDemoWorker = new ConsumerDemoWorker(); | |
new Thread(consumerDemoWorker).start(); | |
Runtime.getRuntime().addShutdownHook(new Thread(new ConsumerDemoCloser(consumerDemoWorker))); | |
} | |
private static class ConsumerDemoWorker implements Runnable { | |
private static final Logger log = LoggerFactory.getLogger(ConsumerDemoWorker.class); | |
private CountDownLatch countDownLatch; | |
private Consumer<String, String> consumer; | |
@Override | |
public void run() { | |
countDownLatch = new CountDownLatch(1); | |
final Properties properties = new Properties(); | |
final Duration pollTimeout = Duration.ofMillis(100); | |
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); | |
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); | |
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); | |
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-fourth-application"); | |
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); | |
consumer = new KafkaConsumer<>(properties); | |
consumer.subscribe(Collections.singleton("first_topic")); | |
try { | |
while (true) { | |
final ConsumerRecords<String, String> consumerRecords = consumer.poll(pollTimeout); | |
for (final ConsumerRecord<String, String> consumerRecord : consumerRecords) { | |
log.info("Getting consumer record key: '" + consumerRecord.key() + "', value: '" + consumerRecord.value() + "', partition: " + consumerRecord.partition() + " and offset: " + consumerRecord.offset() + " at " + new Date(consumerRecord.timestamp())); | |
} | |
} | |
} catch (WakeupException e) { | |
log.info("Consumer poll woke up"); | |
consumer.close(); | |
} finally { | |
countDownLatch.countDown(); | |
} | |
} | |
void shutdown() throws InterruptedException { | |
consumer.wakeup(); | |
countDownLatch.await(); | |
log.info("Consumer closed"); | |
} | |
} | |
private static class ConsumerDemoCloser implements Runnable { | |
private static final Logger log = LoggerFactory.getLogger(ConsumerDemoCloser.class); | |
private final ConsumerDemoWorker consumerDemoWorker; | |
ConsumerDemoCloser(final ConsumerDemoWorker consumerDemoWorker) { | |
this.consumerDemoWorker = consumerDemoWorker; | |
} | |
@Override | |
public void run() { | |
try { | |
consumerDemoWorker.shutdown(); | |
} catch (InterruptedException e) { | |
log.error("Error shutting down consumer", e); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment