Skip to content

Instantly share code, notes, and snippets.

@markuskont
Created September 29, 2017 13:44
Show Gist options
  • Save markuskont/60fc64ee5f478e3c26b1dafe58207b0d to your computer and use it in GitHub Desktop.
Save markuskont/60fc64ee5f478e3c26b1dafe58207b0d to your computer and use it in GitHub Desktop.
package main
import (
"bufio"
"encoding/json"
"errors"
"flag"
"fmt"
"io/ioutil"
"log"
"os"
"strconv"
"strings"
"time"
"github.com/Shopify/sarama"
"github.com/linkedin/goavro"
ps "github.com/markuskont/probstruct"
)
//type netflow5 struct {
// IPv4DstPort uint16 `json:"ipv4_dst_port"`
// FirstSwitched uint64 `json:"first_switched"`
// SrcTos uint `json:"src_tos"`
// IPv4SrcAddr string `json:"ipv4_src_addr"`
// Offset uint64 `json:"offset"`
// InAS uint16 `json:"in_as"`
// InBytes uint64 `json:"in_bytes"`
// InPkts uint64 `json:"in_pkts"`
// Duration int `json:"duration"`
// DstMask int `json:"dst_mask"`
// OutAS uint16 `json:"out_as"`
// SrcMask int `json:"src_mask"`
// IPv4DstAddr string `json:"ipv4_dst_addr"`
// Protocol uint8 `json:"protocol"`
// TCPflags uint `json:"tcp_flags"`
// IPv4NextHop string `json:"ipv4_next_hop"`
// LastSwitched uint64 `json:"last_switched"`
// InputSNMP int `json:"input_snmp"`
// IPv4SrcPort uint16 `json:"ipv4_src_port"`
// Timestamp uint64 `json:"timestamp"`
// OutputSNMP int `json:"output_snmp"`
//}
// Kirka bundles the probabilistic data structures used on the consumer side
// of this program. It is created by Init.
type Kirka struct {
// ipv4cardinality is the HyperLogLog sketch that receives IPv4 addresses
// (encoded as uint32) through ipv4CardinalityEstimator.
ipv4cardinality *ps.HLL
}
// Init allocates a Kirka with a fresh HyperLogLog sketch for IPv4 counting.
//
// On failure it returns a nil *Kirka and a non-nil error. The message of the
// underlying ps.InitHLL error is appended to the returned error so the root
// cause is not lost.
func Init() (k *Kirka, err error) {
	hll, err := ps.InitHLL(16, false, 1)
	if err != nil {
		return nil, errors.New("Unable to create HyperLogLog for ipv4 counting: " + err.Error())
	}
	return &Kirka{ipv4cardinality: hll}, nil
}
// loadSchema reads the whole file at source and returns its contents as a
// string. A read failure is fatal: it is reported through log.Fatal, which
// terminates the process.
func loadSchema(source string) (str string) {
	raw, readErr := ioutil.ReadFile(source)
	if readErr != nil {
		log.Fatal(readErr)
	}
	return string(raw)
}
// produce reads logFile line by line, decodes each line as a JSON object,
// encodes it to Avro binary with codec c and publishes it synchronously to
// the given Kafka topic.
//
// Parameters:
//   - logFile: path of the newline-delimited JSON input file.
//   - topic:   Kafka topic the messages are sent to.
//   - brokers: broker addresses handed to sarama.NewSyncProducer.
//   - config:  sarama configuration for the producer.
//   - c:       Avro codec used to encode each decoded record.
//
// Failing to open the file, create the producer or Avro-encode a record is
// fatal (log.Fatal). Lines that are not valid JSON and messages that fail to
// send are logged and skipped. Every 8192nd line that was sent successfully
// is logged together with its partition and offset as a progress indicator.
func produce(logFile, topic string, brokers []string, config *sarama.Config, c *goavro.Codec) {
	file, err := os.Open(logFile)
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		log.Fatal(err)
	}
	// Release the broker connections once the whole file has been processed.
	defer func() {
		if err := producer.Close(); err != nil {
			log.Printf("FAILED to close producer: %s\n", err)
		}
	}()

	// Progress is reported once per this many input lines.
	const debug = 1024 * 8

	count := 0
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		count++

		// Decode into a fresh map for every line: json.Unmarshal keeps the
		// existing entries of a non-nil map, so reusing one map would leak
		// fields of earlier records into later ones.
		var x map[string]interface{}
		if err := json.Unmarshal(scanner.Bytes(), &x); err != nil {
			log.Printf("FAILED to parse line %d as JSON: %s\n", count, err)
			continue
		}

		binary, err := c.BinaryFromNative(nil, x)
		if err != nil {
			log.Fatal(err)
		}

		msg := &sarama.ProducerMessage{
			Topic:     topic,
			Timestamp: time.Now(),
			Value:     sarama.ByteEncoder(binary),
		}
		partition, offset, err := producer.SendMessage(msg)
		if err != nil {
			log.Printf("FAILED to send message: %s\n", err)
			continue
		}
		if count%debug == 0 {
			log.Println(scanner.Text())
			log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
		}
	}

	// Scan returns false on both EOF and read errors (including over-long
	// lines); only Err tells them apart.
	if err := scanner.Err(); err != nil {
		log.Printf("FAILED to read %s: %s\n", logFile, err)
	}
}
// consume starts one goroutine per partition of topic. Each goroutine decodes
// incoming Avro messages with codec c and pushes the source and destination
// IPv4 addresses of every record, converted with ipv4toUInt, onto
// IPv4CardinalityChan.
//
// Parameters:
//   - topic:               Kafka topic to read from.
//   - brokers:             broker addresses handed to sarama.NewConsumer.
//   - config:              sarama configuration for the consumer.
//   - c:                   Avro codec used to decode message values.
//   - IPv4CardinalityChan: channel receiving the encoded addresses.
//
// consume returns as soon as all partition readers are started; reading
// begins at the oldest available offset. Failing to create the consumer, list
// the partitions or open a partition is fatal (log.Fatal). Messages that
// cannot be decoded, or that lack a string-valued address field, are logged
// and skipped instead of crashing the reader goroutine.
func consume(topic string, brokers []string, config *sarama.Config, c *goavro.Codec, IPv4CardinalityChan chan uint32) {
	consumer, err := sarama.NewConsumer(brokers, config)
	if err != nil {
		log.Fatal(err)
	}
	partitionList, err := consumer.Partitions(topic)
	if err != nil {
		log.Fatal(err)
	}

	offset := sarama.OffsetOldest
	for _, partition := range partitionList {
		fmt.Println(partition)

		stream, err := consumer.ConsumePartition(topic, partition, offset)
		if err != nil {
			// Without this check a failed call would leave stream nil and
			// the goroutine below would panic on stream.Messages().
			log.Fatal(err)
		}

		go func(stream sarama.PartitionConsumer) {
			addrFields := [...]string{"ipv4_src_addr", "ipv4_dst_addr"}
			for msg := range stream.Messages() {
				flow, _, err := c.NativeFromBinary(msg.Value)
				if err != nil {
					log.Printf("FAILED to decode message at partition %d offset %d: %s\n", msg.Partition, msg.Offset, err)
					continue
				}
				data, ok := flow.(map[string]interface{})
				if !ok {
					log.Printf("FAILED to interpret message at partition %d offset %d: unexpected type %T\n", msg.Partition, msg.Offset, flow)
					continue
				}
				for _, field := range addrFields {
					addr, ok := data[field].(string)
					if !ok {
						log.Printf("FAILED to read %s at partition %d offset %d: not a string\n", field, msg.Partition, msg.Offset)
						continue
					}
					IPv4CardinalityChan <- ipv4toUInt(addr)
				}
			}
		}(stream)
	}
}
// ipv4CardinalityEstimator receives encoded IPv4 addresses from
// IPv4CardinalityChan in an endless loop and adds each one to the receiver's
// HyperLogLog sketch. It never returns, so callers run it in its own
// goroutine.
func (k *Kirka) ipv4CardinalityEstimator(IPv4CardinalityChan chan uint32) {
	for {
		k.ipv4cardinality.Add32(<-IPv4CardinalityChan)
	}
}
// ipv4toUInt converts a dotted-quad IPv4 address such as "192.168.1.1" into
// its 32-bit big-endian integer form (first octet in the most significant
// byte).
//
// The conversion is lenient so that malformed input can never panic:
//   - missing octets count as 0 ("10.0.0" is treated like "10.0.0.0");
//   - components beyond the fourth are ignored;
//   - an octet that is not a decimal number in the range 0-255 counts as 0.
func ipv4toUInt(ipv4 string) uint32 {
	// Splitting into at most five pieces is enough to isolate the first four
	// octets while bounding the work done on garbage input.
	parts := strings.SplitN(ipv4, ".", 5)

	var addr uint32
	for idx := 0; idx < 4; idx++ {
		addr <<= 8
		if idx >= len(parts) {
			continue
		}
		// bitSize 8 makes ParseUint reject anything above 255, so a bad
		// octet cannot spill into its neighbours.
		octet, err := strconv.ParseUint(parts[idx], 10, 8)
		if err != nil {
			continue
		}
		addr |= uint32(octet)
	}
	return addr
}
// main parses the command-line flags, loads the Avro schema and then runs in
// one of two modes:
//
//   - with -producer, the input log file is encoded and published to Kafka;
//   - otherwise, the topic is consumed and an estimate of the number of
//     distinct IPv4 addresses seen is printed every 30 seconds, forever.
func main() {
	var (
		logFile  = flag.String("input", "/tmp/event.log", "Input log file")
		producer = flag.Bool("producer", false, "Produce instead of consume")
		topic    = flag.String("topic", "test", "Kafka topic to use")
		schema   = flag.String("schema", "/tmp/schema.json", "Avro schema for encoding/decoding")
		kafka    = flag.String("brokers", "localhost:9092", "Comma-separated list of brokers")
	)
	flag.Parse()

	// avro codec stuff
	flow5codec := loadSchema(*schema)
	c, err := goavro.NewCodec(flow5codec)
	if err != nil {
		log.Fatal(err)
	}

	// kafka generic
	config := sarama.NewConfig()
	config.Producer.Retry.Max = 5
	config.Producer.RequiredAcks = sarama.WaitForAll
	//config.Producer.MaxMessageBytes = 16 << 20 // 16MB
	config.Producer.Return.Successes = true
	brokers := strings.Split(*kafka, ",")

	if *producer {
		produce(*logFile, *topic, brokers, config, c)
		return
	}

	// Init returns a nil *Kirka on failure; bail out instead of
	// dereferencing it below.
	k, err := Init()
	if err != nil {
		log.Fatal(err)
	}

	IPv4CardinalityChan := make(chan uint32, 256)
	consume(*topic, brokers, config, c, IPv4CardinalityChan)
	go k.ipv4CardinalityEstimator(IPv4CardinalityChan)

	for {
		time.Sleep(30 * time.Second)
		// NOTE(review): Count presumably refreshes the estimate that
		// GetCardinality reports, and it runs concurrently with Add32 in the
		// estimator goroutine — confirm both against the probstruct package.
		k.ipv4cardinality.Count()
		fmt.Println(k.ipv4cardinality.GetCardinality())
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment