Last active
June 11, 2017 10:54
-
-
Save saggie/77d9781e06f84e99915998fbdd9ec7ce to your computer and use it in GitHub Desktop.
Simple Kafka sender (producer) and receiver (consumer) sample code in Java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sample; | |
import java.util.Arrays; | |
import java.util.Date; | |
import java.util.Properties; | |
import org.apache.kafka.clients.consumer.ConsumerConfig; | |
import org.apache.kafka.clients.consumer.ConsumerRecord; | |
import org.apache.kafka.clients.consumer.ConsumerRecords; | |
import org.apache.kafka.clients.consumer.KafkaConsumer; | |
import org.apache.kafka.clients.producer.KafkaProducer; | |
import org.apache.kafka.clients.producer.ProducerConfig; | |
import org.apache.kafka.clients.producer.ProducerRecord; | |
import org.apache.kafka.common.serialization.StringDeserializer; | |
import org.apache.kafka.common.serialization.StringSerializer; | |
public class Main { | |
public static void main(String[] args) { | |
new Thread(new InfiniteKafkaSender()).start(); | |
new Thread(new InfiniteKafkaReceiver()).start(); | |
} | |
private static class InfiniteKafkaSender implements Runnable { | |
private KafkaProducer<String, String> producer; | |
public InfiniteKafkaSender() { | |
Properties properties = new Properties() {{ | |
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); | |
}}; | |
this.producer = new KafkaProducer<>(properties, new StringSerializer(), | |
new StringSerializer()); | |
} | |
@Override | |
public void run() { | |
while (true) { | |
String message = new Date().toString(); | |
producer.send(new ProducerRecord<>("my-topic", message)); | |
System.out.println("Sent: " + message); | |
try { | |
Thread.sleep(1000L); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
} | |
private static class InfiniteKafkaReceiver implements Runnable { | |
private KafkaConsumer<String, String> consumer; | |
public InfiniteKafkaReceiver() { | |
Properties properties = new Properties() {{ | |
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); | |
put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); | |
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); | |
}}; | |
consumer = new KafkaConsumer<>(properties, | |
new StringDeserializer(), new StringDeserializer()); | |
consumer.subscribe(Arrays.asList("my-topic")); | |
} | |
@Override | |
public void run() { | |
while (true) { | |
ConsumerRecords<String, String> records = consumer.poll(1000L); | |
for (ConsumerRecord<String, String> record : records) { | |
System.out.println("Received: " + record.value()); | |
} | |
} | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
... | |
<dependency> | |
<groupId>org.apache.kafka</groupId> | |
<artifactId>kafka-clients</artifactId> | |
<version>0.10.2.1</version> | |
</dependency> | |
... |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment