Skip to content

Instantly share code, notes, and snippets.

@chicagobuss
Last active May 15, 2018 18:19
Show Gist options
  • Save chicagobuss/88a0fd45216a97a5f768351b28fe0e91 to your computer and use it in GitHub Desktop.
Save chicagobuss/88a0fd45216a97a5f768351b28fe0e91 to your computer and use it in GitHub Desktop.
metric-consumer
package main
import (
"encoding/json"
"fmt"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/prometheus/common/model"
)
func main() {
topic := "metrics.json.kafka"
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "pkc-e88qe.us-west-2.aws.devel.cpdev.cloud:9092",
"broker.version.fallback": "0.10.0.0",
"api.version.fallback.ms": 0,
"sasl.mechanisms": "PLAIN",
"security.protocol": "SASL_SSL",
"sasl.username": "W4VH52BWAMDWNLFX",
"sasl.password": "+PcYdQYaz/sse0k7cAU/upBQ0rU7vUta5j/NFjlFCL4A52C2do8jnmuRxLfAO9Yk",
"session.timeout.ms": 6000,
"group.id": "josh-group-1",
"default.topic.config": kafka.ConfigMap{"auto.offset.reset": "earliest"}})
if err != nil {
panic(fmt.Sprintf("Failed to create consumer: %s", err))
}
topics := []string{topic}
c.SubscribeTopics(topics, nil)
var clusterCounts map[string]int32
clusterCounts = make(map[string]int32)
type Metric struct {
MetricType string `json:"type"` // kafka
MetricName string `json:"_metricname"` // kafka_server_socket_server_metrics_outgoing_byte_total
ClusterId string `json:"clusterId"` // "pkc-lg0v4",
ControllerRevisionHash string `json:"controller-revision-hash"` // kafka-6b794c8b7
Instance string `json:"instance "` // 100.96.35.5:7778
Job string `json:"job"` // scraper
Listener string `json:"listener"` // REPLICATION
NetworkProcessor string `json:"networkProcessor"` // 5
PscName string `json:"physicalstatefulcluster.core.confluent.cloud/name"` // kafka
PscVersion string `json:"physicalstatefulcluster.core.confluent.cloud/version"` // v1
PodName string `json:"pod-name"` // kafka-0
Source string `json:"source"` // pod/pkc-lg0v4/kafka-0
SsPodName string `json:"statefulset.kubernetes.io/pod-name"` // kafka-0
}
type Sample struct {
//UUID to help deduplicate the possible resends of the same samples
Id string `json:"id"`
// Where the sample is geneated
Source string `json:"source"`
// Name of the metric consists of a label set - overrode the default with our Metric
Metric Metric `json:"metric"`
// Sample value
Value model.SampleValue `json:"value"`
// When the sample is created
Timestamp model.Time `json:"timestamp"`
}
for {
msg, err := c.ReadMessage(100 * time.Millisecond)
if err == nil {
// Original
//message := string(msg.Value)
message := []byte(msg.Value)
/* example metric message: {
"id":"7b7a90b8-20e7-4280-abed-863c0487a237",
"source":"pod/pkc-lg0v4/kafka-0",
"metric": {
"_metricname":"kafka_server_socket_server_metrics_outgoing_byte_total",
"clusterId":"pkc-lg0v4",
"controller-revision-hash":"kafka-6b794c8b7",
"instance":"100.96.35.5:7778",
"job":"scraper",
"listener":"REPLICATION",
"networkProcessor":"5",
"physicalstatefulcluster.core.confluent.cloud/name":"kafka",
"physicalstatefulcluster.core.confluent.cloud/version":"v1",
"pod-name":"kafka-0",
"source":"pod/pkc-lg0v4/kafka-0",
"statefulset.kubernetes.io/pod-name":"kafka-0",
"type":"kafka"
},
"value":"10",
"timestamp":1526335130000 } */
var sample Sample
if err := json.Unmarshal(message, &sample); err != nil {
fmt.Printf("Failed to decode json from message: %s. Error: %s\n", message, err)
}
//fmt.Printf("Consumed message from partition %s.", msg.TopicPartition)
//fmt.Printf("consumed msg object: %s\", msg)
//fmt.Printf("Consumed raw message: %s\n", message)
fmt.Printf("Got sample: %+v\n", sample)
//fmt.Printf("Got metric %s with value: %s", sample.Metric.MetricName, sample.Value)
clusterCounts[sample.Metric.ClusterId] += 1
//fmt.Printf("consumed: %s: %s\n", msg.TopicPartition, message)
fmt.Printf(" Current clusterCounts: ")
for key, value := range clusterCounts {
fmt.Printf("%s:%s, ", key, value)
}
fmt.Printf("\n")
}
}
c.Close()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment