@up1
Last active April 30, 2025 08:48
Queue in Kafka 4.0
unstable.api.versions.enable=true
group.coordinator.rebalance.protocols=classic,consumer,share
$ bin/kafka-server-start.sh ./config/server.properties
INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
WARN Share groups and the new 'share' rebalance protocol are enabled.
This is part of the early access of KIP-932 and MUST NOT be used in production. (kafka.server.KafkaConfig)
15:12:45.035 [main] WARN org.apache.kafka.clients.consumer.internals.ShareConsumerDelegateCreator -- Share groups and KafkaShareConsumer are part of the early access of KIP-932 and MUST NOT be used in production.
15:12:45.061 [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector -- initializing Kafka metrics collector
15:12:45.145 [main] INFO org.apache.kafka.common.utils.AppInfoParser -- Kafka version: 4.0.0
15:12:45.146 [main] INFO org.apache.kafka.common.utils.AppInfoParser -- Kafka commitId: 985bc99521dd22bb
15:12:45.146 [main] INFO org.apache.kafka.common.utils.AppInfoParser -- Kafka startTimeMs: 1746000765144
15:12:45.184 [main] INFO org.apache.kafka.clients.consumer.internals.ShareConsumerImpl -- [ShareConsumer clientId=consumer-my-share-group-1, groupId=my-share-group] Subscribed to topics: demo-topic
15:12:45.289 [consumer_background_thread] INFO org.apache.kafka.clients.consumer.internals.ShareMembershipManager -- [ShareConsumer clientId=consumer-my-share-group-1, groupId=my-share-group] Member kIHeZsBrR4GhC51841hWJw with epoch 0 transitioned from UNSUBSCRIBED to JOINING.
15:12:45.375 [consumer_background_thread] INFO org.apache.kafka.clients.Metadata -- [ShareConsumer clientId=consumer-my-share-group-1, groupId=my-share-group] Cluster ID: MXC71-GcTt2bA08DvQPO4g
15:12:45.380 [consumer_background_thread] INFO org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager -- [ShareConsumer clientId=consumer-my-share-group-1, groupId=my-share-group] Discovered group coordinator Coordinator(key='my-share-group', nodeId=1, host='localhost', port=9092, errorCode=0, errorMessage='')
15:12:45.417 [consumer_background_thread] INFO org.apache.kafka.clients.consumer.internals.ShareMembershipManager -- [ShareConsumer clientId=consumer-my-share-group-1, groupId=my-share-group] Member kIHeZsBrR4GhC51841hWJw with epoch 2 transitioned from JOINING to RECONCILING.
15:12:45.419 [consumer_background_thread] INFO org.apache.kafka.clients.consumer.internals.ShareMembershipManager -- [ShareConsumer clientId=consumer-my-share-group-1, groupId=my-share-group] Reconciling assignment with local epoch 0
Member: kIHeZsBrR4GhC51841hWJw
Assigned partitions: [demo-topic-0]
Current owned partitions: []
Added partitions (assigned - owned): [demo-topic-0]
Revoked partitions (owned - assigned): []
$ bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group my-share-group
GROUP           TOPIC       PARTITION  START-OFFSET
my-share-group  demo-topic  0          2
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Consumer {

    public static void main(String[] args) {
        KafkaShareConsumer<String, String> consumer = createShareConsumer();
        consumer.subscribe(List.of("demo-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // Process the record
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
                // Mark the record as successfully processed
                consumer.acknowledge(record, AcknowledgeType.ACCEPT);
            }
            // Send the acknowledgements for this batch to the broker
            consumer.commitSync();
        }
    }

    private static KafkaShareConsumer<String, String> createShareConsumer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        // For a share consumer, group.id names the share group
        props.setProperty("group.id", "my-share-group");
        // Deserializers are passed as instances, so the key.deserializer and
        // value.deserializer properties are not needed
        return new KafkaShareConsumer<>(
                props, new StringDeserializer(), new StringDeserializer());
    }
}
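The consumer above only ever acknowledges with AcknowledgeType.ACCEPT. As a rough illustration of how the other two acknowledgement types differ, here is a toy in-memory model. It is not the Kafka API: ShareQueueSketch, Ack, produce, poll and acknowledge are hypothetical names, and only the three constant names mirror AcknowledgeType. It assumes a released record is offered again before newer records and ignores delivery-count limits and acquisition lock timeouts.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

// Toy model of share-group delivery semantics. Hypothetical sketch, not Kafka code.
final class ShareQueueSketch {

    // Same constant names as Kafka's AcknowledgeType; the enum itself is an assumption.
    enum Ack { ACCEPT, RELEASE, REJECT }

    private final Deque<String> available = new ArrayDeque<>();
    final List<String> accepted = new ArrayList<>();
    final List<String> rejected = new ArrayList<>();

    void produce(String value) {
        available.addLast(value);
    }

    // Hands the next available record to whichever consumer asks first.
    Optional<String> poll() {
        return Optional.ofNullable(available.pollFirst());
    }

    void acknowledge(String value, Ack ack) {
        switch (ack) {
            case ACCEPT:
                accepted.add(value);       // processed successfully, never redelivered
                break;
            case RELEASE:
                available.addFirst(value); // made available again for another delivery attempt
                break;
            case REJECT:
                rejected.add(value);       // unprocessable, never redelivered
                break;
        }
    }

    public static void main(String[] args) {
        ShareQueueSketch queue = new ShareQueueSketch();
        queue.produce("a");
        queue.produce("b");
        queue.produce("c");

        String first = queue.poll().get();   // "a"
        String second = queue.poll().get();  // "b"
        queue.acknowledge(first, Ack.ACCEPT);
        queue.acknowledge(second, Ack.RELEASE);

        String redelivered = queue.poll().get(); // "b" again
        queue.acknowledge(redelivered, Ack.REJECT);
        queue.acknowledge(queue.poll().get(), Ack.ACCEPT); // "c"

        // prints: accepted=[a, c] rejected=[b]
        System.out.println("accepted=" + queue.accepted + " rejected=" + queue.rejected);
    }
}
```

In the real consumer, the equivalent choice would be made per record in the poll loop, for example releasing a record when processing fails with a transient error and rejecting it when the payload can never be processed.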
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>4.0.0</version>
</dependency>