Skip to content

Instantly share code, notes, and snippets.

Last active May 15, 2018 18:19
Show Gist options
  • Save chicagobuss/88a0fd45216a97a5f768351b28fe0e91 to your computer and use it in GitHub Desktop.
Save chicagobuss/88a0fd45216a97a5f768351b28fe0e91 to your computer and use it in GitHub Desktop.
package main
import (
func main() {
topic := "metrics.json.kafka"
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "",
"broker.version.fallback": "",
"": 0,
"sasl.mechanisms": "PLAIN",
"security.protocol": "SASL_SSL",
"sasl.username": "W4VH52BWAMDWNLFX",
"sasl.password": "+PcYdQYaz/sse0k7cAU/upBQ0rU7vUta5j/NFjlFCL4A52C2do8jnmuRxLfAO9Yk",
"": 6000,
"": "josh-group-1",
"default.topic.config": kafka.ConfigMap{"auto.offset.reset": "earliest"}})
if err != nil {
panic(fmt.Sprintf("Failed to create consumer: %s", err))
topics := []string{topic}
c.SubscribeTopics(topics, nil)
var clusterCounts map[string]int32
clusterCounts = make(map[string]int32)
type Metric struct {
MetricType string `json:"type"` // kafka
MetricName string `json:"_metricname"` // kafka_server_socket_server_metrics_outgoing_byte_total
ClusterId string `json:"clusterId"` // "pkc-lg0v4",
ControllerRevisionHash string `json:"controller-revision-hash"` // kafka-6b794c8b7
Instance string `json:"instance "` //
Job string `json:"job"` // scraper
Listener string `json:"listener"` // REPLICATION
NetworkProcessor string `json:"networkProcessor"` // 5
PscName string `json:""` // kafka
PscVersion string `json:""` // v1
PodName string `json:"pod-name"` // kafka-0
Source string `json:"source"` // pod/pkc-lg0v4/kafka-0
SsPodName string `json:""` // kafka-0
type Sample struct {
//UUID to help deduplicate the possible resends of the same samples
Id string `json:"id"`
// Where the sample is geneated
Source string `json:"source"`
// Name of the metric consists of a label set - overrode the default with our Metric
Metric Metric `json:"metric"`
// Sample value
Value model.SampleValue `json:"value"`
// When the sample is created
Timestamp model.Time `json:"timestamp"`
for {
msg, err := c.ReadMessage(100 * time.Millisecond)
if err == nil {
// Original
//message := string(msg.Value)
message := []byte(msg.Value)
/* example metric message: {
"metric": {
"timestamp":1526335130000 } */
var sample Sample
if err := json.Unmarshal(message, &sample); err != nil {
fmt.Printf("Failed to decode json from message: %s. Error: %s\n", message, err)
//fmt.Printf("Consumed message from partition %s.", msg.TopicPartition)
//fmt.Printf("consumed msg object: %s\", msg)
//fmt.Printf("Consumed raw message: %s\n", message)
fmt.Printf("Got sample: %+v\n", sample)
//fmt.Printf("Got metric %s with value: %s", sample.Metric.MetricName, sample.Value)
clusterCounts[sample.Metric.ClusterId] += 1
//fmt.Printf("consumed: %s: %s\n", msg.TopicPartition, message)
fmt.Printf(" Current clusterCounts: ")
for key, value := range clusterCounts {
fmt.Printf("%s:%s, ", key, value)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment