@relistan
Created March 14, 2016 22:50
Kafka Offset Monitor
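A stripped-down variant of Kafka's ConsumerGroupCommand tool: it connects to ZooKeeper, describes a single consumer group, and prints the current offset, log-end offset, lag, and owner for every partition the group consumes. The ZooKeeper connect string ("localhost:2181") and group name ("dpe") are hardcoded in main.

The gist does not include a build definition. A minimal build.sbt sketch, assuming Kafka 0.9.0.x (the release whose ZkUtils/SimpleConsumer/ClientUtils APIs this code compiles against) and Scala 2.11, might look like the following; the versions are assumptions, not part of the gist:

// Hypothetical build.sbt -- not part of the gist; versions are assumptions.
scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  "org.apache.kafka" %% "kafka"         % "0.9.0.1", // ZkUtils, SimpleConsumer, ClientUtils
  "org.apache.kafka" %  "kafka-clients" % "0.9.0.1"  // Errors, TopicPartition, JaasUtils, Utils
)

With a build like that in place, something along the lines of `sbt "runMain NitroConsumerGroupCommand"` should run the describe against the hardcoded cluster and group.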
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
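// Layout of this file:
//   - NitroConsumerGroupCommand: entry point; describes one consumer group and prints per-partition lag
//   - ConsumerGroupService: shared describe/print logic
//   - ZkConsumerGroupService: ZooKeeper-backed implementation (topics, owners, committed offsets)
//   - LogEndOffsetResult: result type for log-end-offset lookups
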
import java.util.Properties

import kafka.api.{ OffsetFetchRequest, OffsetFetchResponse, OffsetRequest, PartitionOffsetRequestInfo }
import kafka.client.ClientUtils
import kafka.common._
import kafka.consumer.SimpleConsumer
import kafka.utils._
import org.I0Itec.zkclient.exception.ZkNoNodeException
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.BrokerNotAvailableException
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.Utils

import scala.collection.mutable

object NitroConsumerGroupCommand {

  def main(args: Array[String]) {
    // The ZooKeeper connect string and consumer group are hardcoded here;
    // adjust them (or wire them up to `args`) for your environment.
    val consumerGroupService = new ZkConsumerGroupService("localhost:2181", "dpe")
    try {
      consumerGroupService.describe()
    } catch {
      case e: Throwable =>
        println("Error while executing consumer group command " + e.getMessage)
        println(Utils.stackTrace(e))
    } finally {
      consumerGroupService.close()
    }
  }

  sealed trait ConsumerGroupService {

    def list(): Unit

    def close(): Unit

    protected def getLogEndOffset(topic: String, partition: Int): LogEndOffsetResult

    protected def describeGroup(group: String): Unit

    protected def describeTopicPartition(group: String,
                                         topicPartitions: Seq[TopicAndPartition],
                                         getPartitionOffset: TopicAndPartition => Option[Long],
                                         getOwner: TopicAndPartition => Option[String]): Unit = {
      topicPartitions
        .sortBy { case topicPartition => topicPartition.partition }
        .foreach { topicPartition =>
          describePartition(group, topicPartition.topic, topicPartition.partition, getPartitionOffset(topicPartition),
            getOwner(topicPartition))
        }
    }

    protected def printDescribeHeader() {
      println("%-30s %-30s %-10s %-15s %-15s %-15s %s".format("GROUP", "TOPIC", "PARTITION", "CURRENT-OFFSET", "LOG-END-OFFSET", "LAG", "OWNER"))
    }

    private def describePartition(group: String,
                                  topic: String,
                                  partition: Int,
                                  offsetOpt: Option[Long],
                                  ownerOpt: Option[String]) {
      def print(logEndOffset: Option[Long]): Unit = {
        val lag = offsetOpt.filter(_ != -1).flatMap(offset => logEndOffset.map(_ - offset))
        println("%-30s %-30s %-10s %-15s %-15s %-15s %s".format(group, topic, partition, offsetOpt.getOrElse("unknown"), logEndOffset.getOrElse("unknown"), lag.getOrElse("unknown"), ownerOpt.getOrElse("none")))
      }
      getLogEndOffset(topic, partition) match {
        case LogEndOffsetResult.LogEndOffset(logEndOffset) => print(Some(logEndOffset))
        case LogEndOffsetResult.Unknown => print(None)
        case LogEndOffsetResult.Ignore =>
      }
    }
  }

  class ZkConsumerGroupService(zkConnectOpt: String, groupOpt: String) extends ConsumerGroupService {

    private val zkUtils = {
      val zkUrl = zkConnectOpt
      ZkUtils(zkUrl, 30000, 30000, JaasUtils.isZkSecurityEnabled)
    }

    def close() {
      zkUtils.close()
    }

    def list() {
      zkUtils.getConsumerGroups().foreach(println)
    }

    def describe() {
      describeGroup(groupOpt)
    }

    protected def describeGroup(group: String) {
      val props = new Properties()
      val channelSocketTimeoutMs = props.getProperty("channelSocketTimeoutMs", "600").toInt
      val channelRetryBackoffMs = props.getProperty("channelRetryBackoffMs", "300").toInt
      val topics = zkUtils.getTopicsByConsumerGroup(group)
      if (topics.isEmpty)
        println("No topic available for consumer group provided")
      printDescribeHeader()
      topics.foreach(topic => describeTopic(group, topic, channelSocketTimeoutMs, channelRetryBackoffMs))
    }

    private def describeTopic(group: String,
                              topic: String,
                              channelSocketTimeoutMs: Int,
                              channelRetryBackoffMs: Int) {
      val topicPartitions = getTopicPartitions(topic)
      val groupDirs = new ZKGroupTopicDirs(group, topic)
      val ownerByTopicPartition = topicPartitions.flatMap { topicPartition =>
        zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/" + topicPartition.partition)._1.map { owner =>
          topicPartition -> owner
        }
      }.toMap
      val partitionOffsets = getPartitionOffsets(group, topicPartitions, channelSocketTimeoutMs, channelRetryBackoffMs)
      describeTopicPartition(group, topicPartitions, partitionOffsets.get, ownerByTopicPartition.get)
    }

    private def getTopicPartitions(topic: String): Seq[TopicAndPartition] = {
      val topicPartitionMap = zkUtils.getPartitionsForTopics(Seq(topic))
      val partitions = topicPartitionMap.getOrElse(topic, Seq.empty)
      partitions.map(TopicAndPartition(topic, _))
    }

    protected def getLogEndOffset(topic: String, partition: Int): LogEndOffsetResult = {
      zkUtils.getLeaderForPartition(topic, partition) match {
        case Some(-1) => LogEndOffsetResult.Unknown
        case Some(brokerId) =>
          getZkConsumer(brokerId).map { consumer =>
            val topicAndPartition = new TopicAndPartition(topic, partition)
            val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
            val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
            consumer.close()
            LogEndOffsetResult.LogEndOffset(logEndOffset)
          }.getOrElse(LogEndOffsetResult.Ignore)
        case None =>
          println(s"No broker for partition ${new TopicPartition(topic, partition)}")
          LogEndOffsetResult.Ignore
      }
    }

    private def getPartitionOffsets(group: String,
                                    topicPartitions: Seq[TopicAndPartition],
                                    channelSocketTimeoutMs: Int,
                                    channelRetryBackoffMs: Int): Map[TopicAndPartition, Long] = {
      val offsetMap = mutable.Map[TopicAndPartition, Long]()
      val channel = ClientUtils.channelToOffsetManager(group, zkUtils, channelSocketTimeoutMs, channelRetryBackoffMs)
      channel.send(OffsetFetchRequest(group, topicPartitions))
      val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload())
      offsetFetchResponse.requestInfo.foreach {
        case (topicAndPartition, offsetAndMetadata) =>
          if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
            val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
            // This group may not have migrated off ZooKeeper for offsets storage (we don't expose the
            // dual-commit option in this tool), so the lag may be off until all consumers in the group
            // use the same offsets storage setting.
            try {
              val offset = zkUtils.readData(topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1.toLong
              offsetMap.put(topicAndPartition, offset)
            } catch {
              case z: ZkNoNodeException =>
                println("Could not fetch offset from zookeeper for group %s partition %s due to missing offset data in zookeeper."
                  .format(group, topicAndPartition))
            }
          } else if (offsetAndMetadata.error == Errors.NONE.code)
            offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
          else
            println("Could not fetch offset from kafka for group %s partition %s due to %s."
              .format(group, topicAndPartition, Errors.forCode(offsetAndMetadata.error).exception))
      }
      channel.disconnect()
      offsetMap.toMap
    }

    private def getZkConsumer(brokerId: Int): Option[SimpleConsumer] = {
      try {
        zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match {
          case Some(brokerInfoString) =>
            Json.parseFull(brokerInfoString) match {
              case Some(m) =>
                val brokerInfo = m.asInstanceOf[Map[String, Any]]
                val host = brokerInfo.get("host").get.asInstanceOf[String]
                val port = brokerInfo.get("port").get.asInstanceOf[Int]
                Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerGroupCommand"))
              case None =>
                throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
            }
          case None =>
            throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
        }
      } catch {
        case t: Throwable =>
          println("Could not parse broker info due to " + t.getMessage)
          None
      }
    }
  }

  sealed trait LogEndOffsetResult

  object LogEndOffsetResult {
    case class LogEndOffset(value: Long) extends LogEndOffsetResult
    case object Unknown extends LogEndOffsetResult
    case object Ignore extends LogEndOffsetResult
  }
}
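
When run, the tool prints one row per partition with the GROUP, TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET, LAG, and OWNER columns; a LAG of "unknown" means the group has no committed offset for that partition or the partition's log-end offset could not be determined (for example, when the partition currently has no leader).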