Last active
June 27, 2021 14:01
-
-
Save aveuiller/360a8083aec489116122e1a09054bd93 to your computer and use it in GitHub Desktop.
medium_kafka_apprentice_cookbook
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class Main { | |
public static void main(String[] args) throws Exception { | |
// Configure your producer | |
Properties producerProperties = new Properties(); | |
producerProperties.put("bootstrap.servers", "localhost:29092"); | |
producerProperties.put("acks", "all"); | |
producerProperties.put("retries", 0); | |
producerProperties.put("linger.ms", 1); | |
producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer"); | |
producerProperties.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); | |
producerProperties.put("schema.registry.url", "http://localhost:8081"); | |
// Initialize a producer | |
Producer<Long, AvroHelloMessage> producer = new KafkaProducer<>(producerProperties); | |
// Use it whenever you need | |
producer.send(new AvroHelloMessage(1L, "this is a message", 2.4f, 1)); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class Main { | |
public static Properties configureConsumer() { | |
Properties consumerProperties = new Properties(); | |
consumerProperties.put("bootstrap.servers", "localhost:29092"); | |
consumerProperties.put("group.id", "HelloConsumer"); | |
consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer"); | |
consumerProperties.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"); | |
consumerProperties.put("schema.registry.url", "http://localhost:8081"); | |
// Configure Avro deserializer to convert the received data to a SpecificRecord (i.e. AvroHelloMessage) | |
// instead of a GenericRecord (i.e. schema + array of deserialized data). | |
consumerProperties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); | |
return consumerProperties; | |
} | |
public static void main(String[] args) throws Exception { | |
// Initialize a consumer | |
final Consumer<Long, AvroHelloMessage> consumer = new KafkaConsumer<>(configureConsumer()); | |
// Chose the topics you will be polling from. | |
// You can subscribe to all topics matching a Regex. | |
consumer.subscribe(Pattern.compile("hello_topic_avro")); | |
// Poll will return all messages from the current consumer offset | |
final AtomicBoolean shouldStop = new AtomicBoolean(false); | |
Thread consumerThread = new Thread(() -> { | |
final Duration timeout = Duration.ofSeconds(5); | |
while (!shouldStop) { | |
for (ConsumerRecord<Long, AvroHelloMessage> record : consumer.poll(timeout)) { | |
// Use your record | |
AvroHelloMessage value = record.value(); | |
} | |
// Be kind to the broker while polling | |
Thread.sleep(5); | |
} | |
consumer.close(timeout); | |
}); | |
// Start consuming && do other things | |
consumerThread.start(); | |
// [...] | |
// End consumption from customer | |
shouldStop.set(true); | |
consumerThread.join(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class Main { | |
public static void main(String[] args) throws Exception { | |
Properties props = new Properties(); | |
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); | |
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092"); | |
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); | |
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); | |
final StreamsBuilder builder = new StreamsBuilder(); | |
// All the process is in the builder configuration | |
builder.<String, String>stream("streams-plaintext-input") | |
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))) | |
.groupBy((key, value) -> value) | |
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")) | |
.toStream() | |
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())); | |
final Topology topology = builder.build(); | |
final KafkaStreams streams = new KafkaStreams(topology, props); | |
final CountDownLatch latch = new CountDownLatch(1); | |
// attach shutdown handler to catch control-c | |
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { | |
@Override | |
public void run() { | |
streams.close(); | |
latch.countDown(); | |
} | |
}); | |
// The consumer loop is handled by the library | |
streams.start(); | |
latch.await(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
  "name": "mongo-sink",
  "config": {
    "topics": "mongo-source",
    "tasks.max": "1",
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "connection.uri": "mongodb://${file:/auth.properties:username}:${file:/auth.properties:password}@mongo:27017",
    "database": "kafka_connect",
    "collection": "sink",
    "max.num.retries": "1",
    "retries.defer.timeout": "5000",
    "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy",
    "post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder",
    "delete.on.null.values": "false",
    "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy"
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
    http://localhost:8083/connectors -d @sink-conf.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
ksql> list topics; | |
Kafka Topic | Partitions | Partition Replicas | |
---------------------------------------------------- | |
hello_topic_json | 1 | 1 | |
---------------------------------------------------- | |
ksql> print 'hello_topic_json' from beginning; | |
Key format: KAFKA_BIGINT or KAFKA_DOUBLE or KAFKA_STRING | |
Value format: JSON or KAFKA_STRING | |
rowtime: 2021/05/25 08:44:20.922 Z, key: 1, value: {"user_id":1,"message":"this is a message","value":2.4,"version":1} | |
rowtime: 2021/05/25 08:44:20.967 Z, key: 1, value: {"user_id":1,"message":"this is another message","value":2.4,"version":2} | |
rowtime: 2021/05/25 08:44:20.970 Z, key: 2, value: {"user_id":2,"message":"this is another message","value":2.6,"version":1} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Let's create a table from the previous topic | |
ksql> CREATE TABLE messages (user_id BIGINT PRIMARY KEY, message VARCHAR) | |
> WITH (KAFKA_TOPIC = 'hello_topic_json', VALUE_FORMAT='JSON'); | |
-- We can see the list and details of each table | |
ksql> list tables; | |
Table Name | Kafka Topic | Key Format | Value Format | Windowed | |
---------------------------------------------------------------------- | |
MESSAGES | hello_topic_json | KAFKA | JSON | false | |
---------------------------------------------------------------------- | |
ksql> describe messages; | |
Name : MESSAGES | |
Field | Type | |
------------------------------------------ | |
USER_ID | BIGINT (primary key) | |
MESSAGE | VARCHAR(STRING) | |
------------------------------------------ | |
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>; | |
-- Apart from some additions to the language, queries are written in almost standard SQL. | |
ksql> select * from messages EMIT CHANGES; | |
+--------+------------------------+ | |
|USER_ID |MESSAGE | | |
+--------+------------------------+ | |
|1 |this is another message | | |
|2 |this is another message | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment