A related project needs to work with Kafka, so the current platform has to connect to Kafka to fetch the relevant data.
- CentOS 6.7
- Kafka 0.10
- PHP 5.6.21
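Following the client list linked from the official site, we chose the matching PHP client, php-rdkafka, which builds on librdkafka.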
$ sudo yum install zlib zlib-devel openssl openssl-devel cyrus-sasl cyrus-sasl-devel
$ git clone https://github.com/edenhill/librdkafka/
$ cd librdkafka
$ ./configure
$ make
$ sudo make install
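The build steps above, and the extension build that follows, rely on a handful of command-line tools being present. A minimal sketch for checking that up front; `missing_tools` is a hypothetical helper name, and the tool list (in particular `gcc`) is an assumption based on the commands used in this walkthrough rather than an official requirement list:

```shell
# Print each command from the argument list that is not found on PATH.
missing_tools() {
    for tool in "$@"; do
        command -v "$tool" >/dev/null 2>&1 || printf '%s\n' "$tool"
    done
}

# Assumed tool list: adjust it to whatever your build actually needs.
missing=$(missing_tools git make gcc phpize)
if [ -n "$missing" ]; then
    echo "Install these before building:" $missing
fi
```

If nothing is printed, every listed tool was found on PATH.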
See the official installation instructions.
$ git clone https://github.com/arnaud-lb/php-rdkafka.git
$ cd php-rdkafka
$ # For PHP 7, checkout the php7 branch:
$ # git checkout php7
$ phpize
$ ./configure
$ make all -j 5
$ sudo make install
$ sh -c 'echo "extension=kafka.so" >> /usr/local/php/etc/php.ini'
$ php -m | grep kafka
Warning: PHP Startup: Unable to load dynamic library '/usr/local/php/lib/php/extensions/no-debug-non-zts-20131226/kafka.so' - librdkafka.so.1: cannot open shared object file: No such file or directory in Unknown on line 0
$ sudo touch /etc/ld.so.conf.d/librd.conf
$ sudo sh -c 'echo "/usr/local/lib" >> /etc/ld.so.conf.d/librd.conf'
$ sudo ldconfig
$ ldconfig -p
$ php -m | grep kafka
rdkafka
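The fix above works because `ldconfig` rebuilds the dynamic linker cache once `/usr/local/lib` has been added to its search path. A minimal sketch for confirming that the library is now visible, assuming the usual `ldconfig -p` listing format; `lib_in_cache` is a hypothetical helper name:

```shell
# Succeeds if the library name given as $1 appears in an `ldconfig -p`
# style listing read from stdin.
lib_in_cache() {
    grep -qF -- "$1"
}

# Only query the real cache when ldconfig is on PATH.
if command -v ldconfig >/dev/null 2>&1; then
    if ldconfig -p | lib_in_cache librdkafka.so.1; then
        echo "librdkafka.so.1 is in the linker cache"
    else
        echo "librdkafka.so.1 is missing; check /etc/ld.so.conf.d/librd.conf"
    fi
fi
```

Reading the listing from stdin keeps the helper usable on saved output as well as on a live `ldconfig -p` call.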
We currently use the high-level consumer. The demo below is adapted from the official example:
$conf = new RdKafka\Conf();

// Set a rebalance callback to log partition assignments (optional)
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign($partitions);
            break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            echo "Revoke: ";
            var_dump($partitions);
            $kafka->assign(NULL);
            break;

        default:
            throw new \Exception($err);
    }
});

// Configure the group.id. All consumers with the same group.id will consume
// different partitions.
$conf->set('group.id', 'myConsumerGroup_1');

// Initial list of Kafka brokers
$conf->set('metadata.broker.list', '10.110.92.95,10.110.92.101');

$topicConf = new RdKafka\TopicConf();

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');

// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);

// Subscribe to topic 'NetBaseInfo'
$consumer->subscribe(['NetBaseInfo']);

echo "Waiting for partition assignment... (may take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";

while (true) {
    // Block for up to 120 seconds waiting for the next message
    $message = $consumer->consume(120*1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}
# server
10.110.92.101 # kafka zookeeper JDK
10.110.92.95 # kafka zookeeper JDK
10.110.95.23 # kafka zookeeper JDK -- currently having problems
# producer port
9092
# consumer port
2181
# topic
NetBaseInfo
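The host and port listing above can be turned into the comma-separated string that `metadata.broker.list` takes. The sketch below is a minimal shell helper; `broker_list` is a hypothetical name introduced here. It assumes the brokers on the two healthy hosts listen on 9092, the port the listing gives for producers, and it leaves out the ZooKeeper port 2181 and the host marked as having problems.

```shell
# Join hosts with a port into a comma-separated broker list.
# Usage: broker_list PORT HOST...
broker_list() {
    port=$1
    shift
    list=
    for host in "$@"; do
        # Prefix a comma only when the list already has an entry.
        list="${list:+$list,}$host:$port"
    done
    printf '%s\n' "$list"
}

# Hosts taken from the server listing; the port is an assumption (see above).
broker_list 9092 10.110.92.95 10.110.92.101
# prints 10.110.92.95:9092,10.110.92.101:9092
```

Spelling out the port makes the target explicit instead of relying on the client's default.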