Last active
May 15, 2018 18:19
-
-
Save chicagobuss/88a0fd45216a97a5f768351b28fe0e91 to your computer and use it in GitHub Desktop.
metric-consumer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"encoding/json" | |
"fmt" | |
"time" | |
"github.com/confluentinc/confluent-kafka-go/kafka" | |
"github.com/prometheus/common/model" | |
) | |
func main() { | |
topic := "metrics.json.kafka" | |
c, err := kafka.NewConsumer(&kafka.ConfigMap{ | |
"bootstrap.servers": "pkc-e88qe.us-west-2.aws.devel.cpdev.cloud:9092", | |
"broker.version.fallback": "0.10.0.0", | |
"api.version.fallback.ms": 0, | |
"sasl.mechanisms": "PLAIN", | |
"security.protocol": "SASL_SSL", | |
"sasl.username": "W4VH52BWAMDWNLFX", | |
"sasl.password": "+PcYdQYaz/sse0k7cAU/upBQ0rU7vUta5j/NFjlFCL4A52C2do8jnmuRxLfAO9Yk", | |
"session.timeout.ms": 6000, | |
"group.id": "josh-group-1", | |
"default.topic.config": kafka.ConfigMap{"auto.offset.reset": "earliest"}}) | |
if err != nil { | |
panic(fmt.Sprintf("Failed to create consumer: %s", err)) | |
} | |
topics := []string{topic} | |
c.SubscribeTopics(topics, nil) | |
var clusterCounts map[string]int32 | |
clusterCounts = make(map[string]int32) | |
type Metric struct { | |
MetricType string `json:"type"` // kafka | |
MetricName string `json:"_metricname"` // kafka_server_socket_server_metrics_outgoing_byte_total | |
ClusterId string `json:"clusterId"` // "pkc-lg0v4", | |
ControllerRevisionHash string `json:"controller-revision-hash"` // kafka-6b794c8b7 | |
Instance string `json:"instance "` // 100.96.35.5:7778 | |
Job string `json:"job"` // scraper | |
Listener string `json:"listener"` // REPLICATION | |
NetworkProcessor string `json:"networkProcessor"` // 5 | |
PscName string `json:"physicalstatefulcluster.core.confluent.cloud/name"` // kafka | |
PscVersion string `json:"physicalstatefulcluster.core.confluent.cloud/version"` // v1 | |
PodName string `json:"pod-name"` // kafka-0 | |
Source string `json:"source"` // pod/pkc-lg0v4/kafka-0 | |
SsPodName string `json:"statefulset.kubernetes.io/pod-name"` // kafka-0 | |
} | |
type Sample struct { | |
//UUID to help deduplicate the possible resends of the same samples | |
Id string `json:"id"` | |
// Where the sample is geneated | |
Source string `json:"source"` | |
// Name of the metric consists of a label set - overrode the default with our Metric | |
Metric Metric `json:"metric"` | |
// Sample value | |
Value model.SampleValue `json:"value"` | |
// When the sample is created | |
Timestamp model.Time `json:"timestamp"` | |
} | |
for { | |
msg, err := c.ReadMessage(100 * time.Millisecond) | |
if err == nil { | |
// Original | |
//message := string(msg.Value) | |
message := []byte(msg.Value) | |
/* example metric message: { | |
"id":"7b7a90b8-20e7-4280-abed-863c0487a237", | |
"source":"pod/pkc-lg0v4/kafka-0", | |
"metric": { | |
"_metricname":"kafka_server_socket_server_metrics_outgoing_byte_total", | |
"clusterId":"pkc-lg0v4", | |
"controller-revision-hash":"kafka-6b794c8b7", | |
"instance":"100.96.35.5:7778", | |
"job":"scraper", | |
"listener":"REPLICATION", | |
"networkProcessor":"5", | |
"physicalstatefulcluster.core.confluent.cloud/name":"kafka", | |
"physicalstatefulcluster.core.confluent.cloud/version":"v1", | |
"pod-name":"kafka-0", | |
"source":"pod/pkc-lg0v4/kafka-0", | |
"statefulset.kubernetes.io/pod-name":"kafka-0", | |
"type":"kafka" | |
}, | |
"value":"10", | |
"timestamp":1526335130000 } */ | |
var sample Sample | |
if err := json.Unmarshal(message, &sample); err != nil { | |
fmt.Printf("Failed to decode json from message: %s. Error: %s\n", message, err) | |
} | |
//fmt.Printf("Consumed message from partition %s.", msg.TopicPartition) | |
//fmt.Printf("consumed msg object: %s\", msg) | |
//fmt.Printf("Consumed raw message: %s\n", message) | |
fmt.Printf("Got sample: %+v\n", sample) | |
//fmt.Printf("Got metric %s with value: %s", sample.Metric.MetricName, sample.Value) | |
clusterCounts[sample.Metric.ClusterId] += 1 | |
//fmt.Printf("consumed: %s: %s\n", msg.TopicPartition, message) | |
fmt.Printf(" Current clusterCounts: ") | |
for key, value := range clusterCounts { | |
fmt.Printf("%s:%s, ", key, value) | |
} | |
fmt.Printf("\n") | |
} | |
} | |
c.Close() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment