Last active April 30, 2025 08:48
Queue in Kafka 4.0
| unstable.api.versions.enable=true | |
| group.coordinator.rebalance.protocols=classic,consumer,share |
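The two broker settings above can be sanity-checked before starting the broker. What follows is a minimal sketch using only `java.util.Properties`; the class name `ShareGroupConfigCheck`, the method `shareGroupsEnabled`, and the fallback values used when a key is missing are assumptions made for illustration, not part of Kafka.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.Properties;

// Hypothetical helper, not a Kafka class: checks the two settings shown above.
final class ShareGroupConfigCheck {

    // True only if unstable API versions are enabled AND "share" is listed
    // among the rebalance protocols. The fallbacks ("false", "") are this
    // sketch's own choice and are not claimed to be the broker's defaults.
    static boolean shareGroupsEnabled(Properties props) {
        boolean unstable = Boolean.parseBoolean(
                props.getProperty("unstable.api.versions.enable", "false"));
        String protocols = props.getProperty("group.coordinator.rebalance.protocols", "");
        boolean share = Arrays.stream(protocols.split(","))
                .map(String::trim)
                .anyMatch("share"::equalsIgnoreCase);
        return unstable && share;
    }

    public static void main(String[] args) throws IOException {
        // Same two lines as the server.properties fragment above.
        Properties props = new Properties();
        props.load(new StringReader(
                "unstable.api.versions.enable=true\n"
              + "group.coordinator.rebalance.protocols=classic,consumer,share\n"));
        System.out.println("share groups enabled: " + shareGroupsEnabled(props));
    }
}
```

In practice the `StringReader` would be swapped for a `FileReader` pointing at the broker's `config/server.properties`.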
| $ bin/kafka-server-start.sh ./config/server.properties | |
| INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$) | |
| WARN Share groups and the new 'share' rebalance protocol are enabled. | |
| This is part of the early access of KIP-932 and MUST NOT be used in production. (kafka.server.KafkaConfig) |
| 15:12:45.035 [main] WARN org.apache.kafka.clients.consumer.internals.ShareConsumerDelegateCreator -- Share groups and KafkaShareConsumer are part of the early access of KIP-932 and MUST NOT be used in production. | |
| 15:12:45.061 [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector -- initializing Kafka metrics collector | |
| 15:12:45.145 [main] INFO org.apache.kafka.common.utils.AppInfoParser -- Kafka version: 4.0.0 | |
| 15:12:45.146 [main] INFO org.apache.kafka.common.utils.AppInfoParser -- Kafka commitId: 985bc99521dd22bb | |
| 15:12:45.146 [main] INFO org.apache.kafka.common.utils.AppInfoParser -- Kafka startTimeMs: 1746000765144 | |
| 15:12:45.184 [main] INFO org.apache.kafka.clients.consumer.internals.ShareConsumerImpl -- [ShareConsumer clientId=consumer-my-share-group-1, groupId=my-share-group] Subscribed to topics: demo-topic | |
| 15:12:45.289 [consumer_background_thread] INFO org.apache.kafka.clients.consumer.internals.ShareMembershipManager -- [ShareConsumer clientId=consumer-my-share-group-1, groupId=my-share-group] Member kIHeZsBrR4GhC51841hWJw with epoch 0 transitioned from UNSUBSCRIBED to JOINING. | |
| 15:12:45.375 [consumer_background_thread] INFO org.apache.kafka.clients.Metadata -- [ShareConsumer clientId=consumer-my-share-group-1, groupId=my-share-group] Cluster ID: MXC71-GcTt2bA08DvQPO4g | |
| 15:12:45.380 [consumer_background_thread] INFO org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager -- [ShareConsumer clientId=consumer-my-share-group-1, groupId=my-share-group] Discovered group coordinator Coordinator(key='my-share-group', nodeId=1, host='localhost', port=9092, errorCode=0, errorMessage='') | |
| 15:12:45.417 [consumer_background_thread] INFO org.apache.kafka.clients.consumer.internals.ShareMembershipManager -- [ShareConsumer clientId=consumer-my-share-group-1, groupId=my-share-group] Member kIHeZsBrR4GhC51841hWJw with epoch 2 transitioned from JOINING to RECONCILING. | |
| 15:12:45.419 [consumer_background_thread] INFO org.apache.kafka.clients.consumer.internals.ShareMembershipManager -- [ShareConsumer clientId=consumer-my-share-group-1, groupId=my-share-group] Reconciling assignment with local epoch 0 | |
| Member: kIHeZsBrR4GhC51841hWJw | |
| Assigned partitions: [demo-topic-0] | |
| Current owned partitions: [] | |
| Added partitions (assigned - owned): [demo-topic-0] | |
| Revoked partitions (owned - assigned): [] |
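The last two lines of the reconciliation log above are plain set differences: added is assigned minus owned, and revoked is owned minus assigned. A minimal sketch of that arithmetic with `java.util` sets follows. The class `AssignmentDiff` and its `minus` method are hypothetical names for illustration; this is not the client's internal `ShareMembershipManager` code.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration of the set arithmetic reported in the log.
final class AssignmentDiff {

    // Returns the elements of `left` that are not in `right`,
    // sorted so the printed output is stable.
    static Set<String> minus(Set<String> left, Set<String> right) {
        Set<String> result = new TreeSet<>(left);
        result.removeAll(right);
        return result;
    }

    public static void main(String[] args) {
        // Values taken from the log: one assigned partition, nothing owned yet.
        Set<String> assigned = Set.of("demo-topic-0");
        Set<String> owned = Set.of();

        System.out.println("Added partitions (assigned - owned): " + minus(assigned, owned));
        System.out.println("Revoked partitions (owned - assigned): " + minus(owned, assigned));
    }
}
```

With the values from the log, the first difference contains `demo-topic-0` and the second is empty, matching what the consumer reported on its first assignment.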
| $ bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group my-share-group | |
| GROUP TOPIC PARTITION START-OFFSET | |
| my-share-group demo-topic 0 2 |
| import java.time.Duration; | |
| import java.util.List; | |
| import java.util.Properties; | |
| import org.apache.kafka.clients.consumer.AcknowledgeType; | |
| import org.apache.kafka.clients.consumer.ConsumerRecord; | |
| import org.apache.kafka.clients.consumer.ConsumerRecords; | |
| import org.apache.kafka.clients.consumer.KafkaShareConsumer; | |
| import org.apache.kafka.common.serialization.StringDeserializer; | |
| public class Consumer { | |
|     public static void main(String[] args) { | |
|         // try-with-resources closes the consumer if the loop exits with an exception | |
|         try (KafkaShareConsumer<String, String> consumer = getStringStringKafkaShareConsumer()) { | |
|             consumer.subscribe(List.of("demo-topic")); | |
|             while (true) { | |
|                 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); | |
|                 for (ConsumerRecord<String, String> record : records) { | |
|                     // Process record | |
|                     System.out.printf("offset = %d, key = %s, value = %s%n", | |
|                             record.offset(), record.key(), record.value()); | |
|                     // Mark the record as successfully processed | |
|                     consumer.acknowledge(record, AcknowledgeType.ACCEPT); | |
|                 } | |
|                 // Send the pending acknowledgements to the broker and wait for the result | |
|                 consumer.commitSync(); | |
|             } | |
|         } | |
|     } | |
|     private static KafkaShareConsumer<String, String> getStringStringKafkaShareConsumer() { | |
|         Properties props = new Properties(); | |
|         props.setProperty("bootstrap.servers", "localhost:9092"); | |
|         props.setProperty("group.id", "my-share-group"); | |
|         // Deserializers are passed as instances, so the *.deserializer properties are not needed | |
|         return new KafkaShareConsumer<>(props, new StringDeserializer(), new StringDeserializer()); | |
|     } | |
| } |
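The consumer above only ever sends `AcknowledgeType.ACCEPT`. To show how the three acknowledgement outcomes (accept, release, reject) differ, here is an in-memory toy model that runs without a broker. Everything in it (`ShareQueueModel`, its `Ack` enum, `publish`, `fetch`) is hypothetical and is not the Kafka API. It also ignores partitions, delivery-count limits and lock timeouts, and a released record is simply re-queued at the back, which is a simplification.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy in-memory model of queue-style delivery; NOT the Kafka client API.
final class ShareQueueModel {

    // Mirrors the three outcomes a share consumer can report for a record.
    enum Ack { ACCEPT, RELEASE, REJECT }

    private final Deque<String> available = new ArrayDeque<>();
    private final List<String> accepted = new ArrayList<>();
    private final List<String> rejected = new ArrayList<>();

    void publish(String value) {
        available.addLast(value);
    }

    // Hands out the next available record, or null if none is available.
    String fetch() {
        return available.pollFirst();
    }

    void acknowledge(String value, Ack ack) {
        switch (ack) {
            case ACCEPT:
                accepted.add(value);      // processed, never delivered again
                break;
            case RELEASE:
                available.addLast(value); // made available for another attempt
                break;
            case REJECT:
                rejected.add(value);      // unprocessable, never delivered again
                break;
        }
    }

    List<String> accepted() { return accepted; }

    List<String> rejected() { return rejected; }

    public static void main(String[] args) {
        ShareQueueModel queue = new ShareQueueModel();
        queue.publish("a");
        queue.publish("b");
        queue.publish("c");

        queue.acknowledge(queue.fetch(), Ack.ACCEPT);  // "a" succeeds
        queue.acknowledge(queue.fetch(), Ack.RELEASE); // "b" fails transiently
        queue.acknowledge(queue.fetch(), Ack.REJECT);  // "c" is malformed
        queue.acknowledge(queue.fetch(), Ack.ACCEPT);  // "b" is redelivered and succeeds

        System.out.println("accepted = " + queue.accepted());
        System.out.println("rejected = " + queue.rejected());
    }
}
```

In the real consumer, the same decision would be made per record inside the `for` loop, by choosing the `AcknowledgeType` passed to `consumer.acknowledge(...)`.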
| <dependency> | |
| <groupId>org.apache.kafka</groupId> | |
| <artifactId>kafka-clients</artifactId> | |
| <version>4.0.0</version> | |
| </dependency> |