Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save thom-vend/1563b53bf9f9af2ebe01a55d7551cbde to your computer and use it in GitHub Desktop.
Save thom-vend/1563b53bf9f9af2ebe01a55d7551cbde to your computer and use it in GitHub Desktop.
Kafka python: describe log dir example
import kafka
from kafka.protocol.api import Request, Response
from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String
# Neither kafka-python nor confluent-kafka-python has implemented DescribeLogDirsRequest.
# WIP PR on kafka-python: see https://github.com/dpkp/kafka-python/pull/2278
# The protocol classes below are backported from that PR to support DescribeLogDirs with kafka-python.
class DescribeLogDirsResponse_v0(Response):
    """DescribeLogDirs (API key 35) response, version 0.

    Backported from kafka-python PR #2278. SCHEMA nests as
    log_dirs -> topics -> partitions. Field order defines the wire encoding,
    so entries must not be reordered.
    """

    API_KEY = 35
    API_VERSION = 0
    # NOTE(review): in the upstream Kafka protocol, DescribeLogDirs appears to
    # switch to flexible (tagged-field) encoding only from v2 onwards, so True
    # looks wrong for v0 -- confirm whether kafka-python reads this flag on a
    # Response class before changing it.
    FLEXIBLE_VERSION = True
    SCHEMA = Schema(
        ("throttle_time_ms", Int32),
        (
            "log_dirs",
            # Each entry decodes as (error_code, log_dir, topics).
            Array(
                ("error_code", Int16),
                ("log_dir", String("utf-8")),
                (
                    "topics",
                    # Each entry decodes as (name, partitions).
                    Array(
                        ("name", String("utf-8")),
                        (
                            "partitions",
                            # Each entry decodes as
                            # (partition_index, partition_size, offset_lag, is_future_key).
                            # partition_size is presumably in bytes per the Kafka
                            # protocol docs -- TODO confirm.
                            Array(
                                ("partition_index", Int32),
                                ("partition_size", Int64),
                                ("offset_lag", Int64),
                                # Presumably True for a "future" replica (one being
                                # moved to another log dir), per the Kafka protocol docs.
                                ("is_future_key", Boolean),
                            ),
                        ),
                    ),
                ),
            ),
        ),
    )
class DescribeLogDirsRequest_v0(Request):
    """DescribeLogDirs (API key 35) request, version 0.

    ``topics`` is an array of (topic, [partition_index, ...]) entries. Per the
    Kafka protocol, a null ``topics`` array asks the broker to describe every
    partition it hosts. NOTE(review): describe_log_dirs in this file builds the
    request with no arguments, presumably producing that null array -- confirm.
    """

    API_KEY = 35
    API_VERSION = 0
    RESPONSE_TYPE = DescribeLogDirsResponse_v0
    SCHEMA = Schema(
        (
            "topics",
            Array(
                ("topic", String("utf-8")),
                # The Kafka protocol defines this as an ARRAY of INT32 partition
                # indexes (``topic [partitions]``), not a single INT32; the old
                # scalar declaration mis-encoded any request that named topics.
                ("partitions", Array(Int32)),
            ),
        )
    )
# Protocol classes indexed by API version (list index == version).
# Only version 0 is implemented so far.
DescribeLogDirsResponse = [DescribeLogDirsResponse_v0]
DescribeLogDirsRequest = [DescribeLogDirsRequest_v0]
# Note: we add an explicit broker_id parameter so the request is sent to that broker instead of the current one.
def describe_log_dirs(self, broker_id):
    """Send a DescribeLogDirsRequest to one broker and wait for its reply.

    Written as a free function so it can be monkey-patched onto
    kafka.KafkaAdminClient, hence the ``self`` parameter.

    :param broker_id: Node id of the broker whose log dirs are described.
    :return: The decoded DescribeLogDirsResponse, i.e. the resolved value of
        the request future (not the future itself).
    :raises NotImplementedError: If the negotiated API version has no request
        class in DescribeLogDirsRequest.
    """
    version = self._matching_api_version(DescribeLogDirsRequest)
    # Only versions that have a request class can be built. Derive the bound
    # from the list instead of hard-coding it, so adding a new version class
    # cannot leave an IndexError behind.
    if version >= len(DescribeLogDirsRequest):
        raise NotImplementedError(
            "Support for DescribeLogDirsRequest_v{} has not yet been added to KafkaAdminClient.".format(version)
        )
    request = DescribeLogDirsRequest[version]()
    future = self._send_request_to_node(broker_id, request)
    # Block until the future resolves. NOTE(review): presumably raises if the
    # request failed; confirm against kafka-python's _wait_for_futures.
    self._wait_for_futures([future])
    return future.value
# Monkey-patch the admin client: expose describe_log_dirs as a method on every
# KafkaAdminClient instance. setattr replaces any existing attribute of the
# same name on the class.
setattr(kafka.KafkaAdminClient, "describe_log_dirs", describe_log_dirs)
##############################################################################
# Our little helpers to gather the size of all partitions across all brokers
def get_log_dir_size(clientAdmin):
    """
    Return a dict of broker id -> topic -> partition -> size.

    Goal: gather all data we might need one day or another.

    Every log dir reported by each broker is scanned, not just the first one.
    Brokers configured with several log dirs are therefore fully accounted
    for, and a broker that reports no log dirs maps to an empty dict.

    :param clientAdmin: Admin client exposing ``describe_cluster()`` and the
        patched-in ``describe_log_dirs(broker_id)``.
    :return: ``{broker_id: {topic: {partition_index: partition_size}}}``.
    """
    brokersIds = [data["node_id"] for data in clientAdmin.describe_cluster()["brokers"]]
    logDirSizes = {}  # broker id -> topic -> partition -> size
    for brokerId in brokersIds:
        brokerSizes = logDirSizes[brokerId] = {}
        logDirResponse = clientAdmin.describe_log_dirs(brokerId)
        # Each log dir entry is (error_code, log_dir, topics). A topic's
        # partitions may be spread over several dirs, so merge into the
        # existing per-topic dict instead of resetting it for every dir.
        for _errorCode, _logDir, topics in logDirResponse.log_dirs:
            for topic, partitions in topics:
                topicSizes = brokerSizes.setdefault(topic, {})
                for partitionIndex, partitionSize, _offsetLag, isFuture in partitions:
                    # Skip future replicas (flagged by is_future_key) so a
                    # partially-copied replica in another dir cannot overwrite
                    # the current replica's size for the same partition.
                    if isFuture:
                        continue
                    topicSizes[partitionIndex] = partitionSize
    return logDirSizes
# Example usage: sum the partition sizes of each topic per broker id
def gen_size_metrics(logDirSizes):
    """
    Compute the sum of partition sizes per topic per broker.

    Return a list of metrics to be sent to Datadog.
    Each metric is a dict: {"value": <value as float>, "tags": <tags as list of strings>}.
    Each tag is a string of the form "key:value".

    :param logDirSizes: ``{broker_id: {topic: {partition: size}}}``, as built
        by get_log_dir_size.
    :return: One metric per (broker, topic) pair, in the input's iteration order.
    """
    metrics = []
    for brokerId, topicSizes in logDirSizes.items():
        for topic, partitionSizes in topicSizes.items():
            # float() honours the documented "value as float" contract;
            # sum() over int sizes would otherwise produce an int.
            size = float(sum(partitionSizes.values()))
            tags = [f"broker_id:{brokerId}", f"topic:{topic}"]
            metrics.append({"value": size, "tags": tags})
    return metrics
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment