Last active
June 8, 2022 06:55
-
-
Save erhwenkuo/019ada38e645b4b76862918fe5205c9c to your computer and use it in GitHub Desktop.
使用Kafka的AdminClient API來查詢Kafka中每一個ConsumerGroup己經消費過的offset最大值
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.kafka.clients.admin.*; | |
import org.apache.kafka.clients.consumer.OffsetAndMetadata; | |
import org.apache.kafka.common.TopicPartition; | |
import java.util.*; | |
import java.util.concurrent.ExecutionException; | |
/** | |
* Kafka的AdminClient函式庫,支持管理和檢查topics, brokers, configurations和ACLs。 | |
* 所需的最小的Kafka broker版本為0.10.0.0。有些API會需要更高版本的Kafka broker的話會註解在API中。 | |
* | |
* == 使用Kafka的AdminClient API來查詢Kafka中每一個ConsumerGroup的offset == | |
*/ | |
public class ds01_AdminClient_describe_cg_info { | |
public static void main(String[] args) throws ExecutionException, InterruptedException { | |
// 步驟1. 設定要連線到Kafka集群的相關設定 | |
Properties props = new Properties(); | |
props.put("bootstrap.servers", "localhost:9092"); // Kafka集群在那裡? | |
// 步驟2. 創建AdminClient的instance | |
AdminClient adminClient = KafkaAdminClient.create(props); // 透過create()來產生adminClient的instance | |
// 步驟3. 透過AdminClient的API來取得相關ConsumerGroup的訊息 | |
// *** 取得Kafka叢集裡ConsumerGroup基本資訊 *** // | |
ListConsumerGroupsResult listConsumerGroupsResult = adminClient.listConsumerGroups(); | |
// 取得所有的ConsumerGroup, 當有任何一個取得過程有問題時, 這個method就會fail | |
try { | |
//Collection<ConsumerGroupListing> consumerGroupListings = listConsumerGroupsResult.all().get(); | |
Collection<ConsumerGroupListing> consumerGroupListings = listConsumerGroupsResult.valid().get(); | |
// 打印出來 | |
for(ConsumerGroupListing cg : consumerGroupListings) { | |
String consumerGroupId = cg.groupId(); | |
System.out.println("ConsumerGroup: " + cg.groupId()); | |
ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = adminClient.listConsumerGroupOffsets(consumerGroupId); | |
// 取得這個ConsumerGroup曾經訂閱過的Topics的最後offsets | |
Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get(); | |
// 我們產生一個這個ConsumerGroup曾經訂閱過的TopicParition訊息 | |
List<TopicPartition> topicPartitions = new ArrayList<>(); | |
for(Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsetAndMetadataMap.entrySet()) { | |
TopicPartition topic_partition = entry.getKey(); // 某一個topic的某一個partition | |
OffsetAndMetadata offset = entry.getValue(); // offset | |
// 打印出來 (在API裡頭取到的offset都是那個partition最大的offset+1 (也就是下一個訊息會被assign的offset), | |
// 因此我們減1來表示現在己經消費過的最大offset | |
System.out.println(String.format(" Topic: %s Partiton: %d Offset: %d", topic_partition.topic(), topic_partition.partition(), offset.offset())); | |
topicPartitions.add(topic_partition); | |
} | |
System.out.print("\n"); | |
} | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
// 步驟4. 適當地釋放AdminClient的資源 | |
adminClient.close(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment