Last active
July 25, 2017 12:25
-
-
Save php-cpm/8fe4a0c45c41db74ec19526007242d70 to your computer and use it in GitHub Desktop.
PHP读取kafka的消费offset和生产的offset并计算队列积压情况
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php
// Prints, for every partition of one Kafka topic, the consumer group's
// offset, the producers' last offset, and the difference between the two.
//
// Dependency: composer require "nmred/kafka-php"
require_once __DIR__ . '/vendor/autoload.php';

$server = 'zookeeperHost:2181';
$topic  = 'topic_name';
$group  = 'consumer_group';

// Ask ZooKeeper for the registered brokers and build a "host:port" list.
$zookeeper  = new \Kafka\ZooKeeper($server);
$listBroker = $zookeeper->listBrokers();
$hostList   = [];
if (!empty($listBroker)) {
    foreach ($listBroker as $broker) {
        // Skip entries that do not carry both fields instead of emitting
        // undefined-index notices and a malformed "host:port" string.
        if (!isset($broker['host'], $broker['port'])) {
            continue;
        }
        $hostList[] = $broker['host'] . ':' . $broker['port'];
    }
}

if ($hostList === []) {
    file_put_contents('php://stderr', "No Kafka brokers found via ZooKeeper at {$server}\n");
    exit(1);
}

// Fetch the topic metadata from the brokers.
$metaData    = new \Kafka\MetaDataFromKafka($hostList);
$client      = new \Kafka\Client($metaData);
$topicDetail = $client->getTopicDetail($topic);

if (empty($topicDetail['partitions'])) {
    file_put_contents('php://stderr', "No partitions found for topic {$topic}\n");
    exit(1);
}

// Fetch the offset details; the keys of 'partitions' are passed to
// \Kafka\Offset as the partition identifier.
foreach ($topicDetail['partitions'] as $partitionId => $partition) {
    $offset = new \Kafka\Offset($client, $group, $topic, $partitionId);

    $consumerOffset = $offset->getOffset();
    echo $consumerOffset . "\r\n";

    $produceLastOffset = $offset->getProduceOffset(\Kafka\Offset::LAST_OFFSET);
    echo $produceLastOffset . "\r\n";

    // NOTE(review): "lan" in the message is presumably a typo for "lag";
    // left unchanged so the emitted output stays identical — confirm before renaming.
    echo "now lan is " . ($produceLastOffset - $consumerOffset) . "\r\n";
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment