Last active
January 8, 2024 14:16
-
-
Save maurorappa/90aa99e9ea5edcb621628f62869341df to your computer and use it in GitHub Desktop.
[POC] Aeron messaging exporter
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
// it will 'sniff' all UDP traffic and collect stats for Aeron messaging flows | |
// https://github.com/real-logic/aeron/wiki/Protocol-Specification | |
// the output will be a collection of gauges with a label containing src-dst-port like: | |
// NetStat{traffic="127.0.0.1-40123-127.0.0.1-55755"} 1960 | |
import ( | |
"flag" | |
"fmt" | |
"github.com/google/gopacket" | |
"github.com/google/gopacket/layers" | |
"github.com/google/gopacket/pcap" | |
"github.com/prometheus/client_golang/prometheus" | |
"github.com/prometheus/client_golang/prometheus/promhttp" | |
"log" | |
"net/http" | |
_ "net/http/pprof" | |
"os" | |
"os/signal" | |
"strconv" | |
"strings" | |
"sync" | |
"syscall" | |
"time" | |
) | |
var ( | |
snapshotLen int32 = 96 | |
promiscuous bool = false | |
err error | |
timeout time.Duration = 5 * time.Second | |
handle *pcap.Handle | |
registry = map[string]int{} | |
captured int32 | |
) | |
var NetStat = prometheus.NewGaugeVec(prometheus.GaugeOpts{ | |
Name: "NetStat", | |
Help: "Network traffic stats", | |
}, | |
[]string{"traffic"}, | |
) | |
var lock = &sync.Mutex{} | |
func init() { | |
prometheus.MustRegister(NetStat) | |
} | |
func output_stats(verbose bool) { | |
lock.Lock() | |
for key, val := range registry { | |
//if verbose {fmt.Printf("%s %d bytes\n", key, val)} | |
NetStat.WithLabelValues(key).Add(float64(val)) | |
} | |
lock.Unlock() | |
} | |
func main() { | |
device := flag.String("i", "lo0", "interface to sniff") | |
filter := flag.String("f", "udp", "pcap filter") | |
interval := flag.Int("d", 5, "update interval") | |
verbose := flag.Bool("v", false, "print out all statistics every interval") | |
addr := flag.String("l", ":9097", "The address and port to listen on for HTTP requests. (ie localhost:8080)") | |
flag.Parse() | |
// Catch CTRL-C | |
c := make(chan os.Signal, 2) | |
signal.Notify(c, os.Interrupt, syscall.SIGTERM) | |
go func() { | |
<-c | |
fmt.Printf("\n\nCaptured %d packets\nBye!\n",captured) | |
output_stats(*verbose) | |
os.Exit(1) | |
}() | |
if *interval > 0 { | |
//set a x seconds ticker | |
ticker := time.NewTicker(time.Duration(*interval) * time.Second) | |
go func() { | |
for t := range ticker.C { | |
if *verbose { | |
fmt.Println("\nStats at", t) | |
} | |
output_stats(*verbose) | |
} | |
}() | |
} | |
go func() { | |
// Open device | |
handle, err = pcap.OpenLive(*device, snapshotLen, promiscuous, timeout) | |
if err != nil { | |
log.Fatal(err) | |
} | |
defer handle.Close() | |
// Set filter | |
err = handle.SetBPFFilter(*filter) | |
if err != nil { | |
panic(err) | |
} | |
log.Printf("Capturing from %s, using filter %s\n", *device, *filter) | |
packetSource := gopacket.NewPacketSource(handle, handle.LinkType()) | |
for packet := range packetSource.Packets() { | |
printPacketInfo(packet, *verbose) | |
} | |
}() | |
log.Printf("Metrics will be exposed on %s\n", *addr) | |
http.Handle("/metrics", promhttp.Handler()) | |
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { | |
w.Write([]byte(` | |
<html> | |
<head><title>Network Exporter</title></head> | |
<body> | |
<h1>Network Exporter</h1> | |
<h2>parameters '` + strings.Join(os.Args, " ") + `'</h2> | |
<h2>sniffing the network on '` + *device + `' using PCAP filter '` + *filter + `' </h2> | |
<p><a href='/metrics'><b>Metrics</b></a></p> | |
</body> | |
</html> | |
`)) | |
}) | |
log.Fatal(http.ListenAndServe(*addr, nil)) | |
} | |
func printPacketInfo(packet gopacket.Packet, verbose bool) { | |
// Check for errors | |
if err := packet.ErrorLayer(); err != nil { | |
fmt.Println("Error decoding some part of the packet:", err) | |
} | |
ipLayer := packet.Layer(layers.LayerTypeIPv4) | |
ip, _ := ipLayer.(*layers.IPv4) | |
udpLayer := packet.Layer(layers.LayerTypeUDP) | |
udp, _ := udpLayer.(*layers.UDP) | |
traffic := strings.Join([]string{ip.SrcIP.String(), strconv.Itoa(int(udp.SrcPort)), ip.DstIP.String(), strconv.Itoa(int(udp.DstPort))}, "-") | |
data_len := int(ip.Length) - 8 | |
lock.Lock() | |
registry[traffic] = data_len | |
lock.Unlock() | |
if verbose { | |
fmt.Printf("Aeron Frame type %X, Session id %X\n", udp.Payload[6], udp.Payload[12]) | |
} | |
captured++ | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment