Created
September 29, 2017 13:44
-
-
Save markuskont/60fc64ee5f478e3c26b1dafe58207b0d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"encoding/json" | |
"errors" | |
"flag" | |
"fmt" | |
"io/ioutil" | |
"log" | |
"os" | |
"strconv" | |
"strings" | |
"time" | |
"github.com/Shopify/sarama" | |
"github.com/linkedin/goavro" | |
ps "github.com/markuskont/probstruct" | |
) | |
//type netflow5 struct { | |
// IPv4DstPort uint16 `json:"ipv4_dst_port"` | |
// FirstSwitched uint64 `json:"first_switched"` | |
// SrcTos uint `json:"src_tos"` | |
// IPv4SrcAddr string `json:"ipv4_src_addr"` | |
// Offset uint64 `json:"offset"` | |
// InAS uint16 `json:"in_as"` | |
// InBytes uint64 `json:"in_bytes"` | |
// InPkts uint64 `json:"in_pkts"` | |
// Duration int `json:"duration"` | |
// DstMask int `json:"dst_mask"` | |
// OutAS uint16 `json:"out_as"` | |
// SrcMask int `json:"src_mask"` | |
// IPv4DstAddr string `json:"ipv4_dst_addr"` | |
// Protocol uint8 `json:"protocol"` | |
// TCPflags uint `json:"tcp_flags"` | |
// IPv4NextHop string `json:"ipv4_next_hop"` | |
// LastSwitched uint64 `json:"last_switched"` | |
// InputSNMP int `json:"input_snmp"` | |
// IPv4SrcPort uint16 `json:"ipv4_src_port"` | |
// Timestamp uint64 `json:"timestamp"` | |
// OutputSNMP int `json:"output_snmp"` | |
//} | |
// Kirka bundles the probabilistic data structures used by the consumer side
// of this program.
type Kirka struct { | |
// ipv4cardinality is the HyperLogLog used for counting IPv4 addresses.
ipv4cardinality *ps.HLL | |
} | |
// | |
func Init() (k *Kirka, err error) { | |
k = &Kirka{} | |
k.ipv4cardinality, err = ps.InitHLL(16, false, 1) | |
if err != nil { | |
return nil, errors.New("Unable to create HyperLogLog for ipv4 counting") | |
} | |
return k, nil | |
} | |
// loadSchema reads the whole file named by source and returns its contents
// as a string. If the file cannot be read the process is terminated through
// log.Fatal.
func loadSchema(source string) (str string) {
	raw, readErr := ioutil.ReadFile(source)
	if readErr != nil {
		log.Fatal(readErr)
	}
	return string(raw)
}
func produce(logFile, topic string, brokers []string, config *sarama.Config, c *goavro.Codec) { | |
file, err := os.Open(logFile) | |
if err != nil { | |
log.Fatal(err) | |
} | |
defer file.Close() | |
var x map[string]interface{} | |
producer, err := sarama.NewSyncProducer(brokers, config) | |
if err != nil { | |
log.Fatal(err) | |
} | |
scanner := bufio.NewScanner(file) | |
count := 0 | |
debug := 1024 * 8 | |
for scanner.Scan() { | |
count++ | |
json.Unmarshal([]byte(scanner.Text()), &x) | |
binary, err := c.BinaryFromNative(nil, x) | |
if err != nil { | |
log.Fatal(err) | |
} | |
msg := &sarama.ProducerMessage{ | |
Topic: topic, | |
Timestamp: time.Now(), | |
Value: sarama.ByteEncoder(binary), | |
} | |
partition, offset, err := producer.SendMessage(msg) | |
if err != nil { | |
log.Printf("FAILED to send message: %s\n", err) | |
} else { | |
if count%debug == 0 { | |
log.Println(scanner.Text()) | |
log.Printf("> message sent to partition %d at offset %d\n", partition, offset) | |
} | |
} | |
} | |
} | |
func consume(topic string, brokers []string, config *sarama.Config, c *goavro.Codec, IPv4CardinalityChan chan uint32) { | |
consumer, err := sarama.NewConsumer(brokers, config) | |
if err != nil { | |
log.Fatal(err) | |
} | |
partitionList, err := consumer.Partitions(topic) | |
if err != nil { | |
log.Fatal(err) | |
} | |
offset := sarama.OffsetOldest | |
for _, partition := range partitionList { | |
fmt.Println(partition) | |
stream, _ := consumer.ConsumePartition(topic, partition, offset) | |
go func(stream sarama.PartitionConsumer) { | |
for msg := range stream.Messages() { | |
flow, _, _ := c.NativeFromBinary(msg.Value) | |
data := flow.(map[string]interface{}) | |
IPv4CardinalityChan <- ipv4toUInt(data["ipv4_src_addr"].(string)) | |
IPv4CardinalityChan <- ipv4toUInt(data["ipv4_dst_addr"].(string)) | |
} | |
}(stream) | |
} | |
} | |
func (k *Kirka) ipv4CardinalityEstimator(IPv4CardinalityChan chan uint32) { | |
for { | |
ipv4 := <-IPv4CardinalityChan | |
k.ipv4cardinality.Add32(ipv4) | |
} | |
} | |
// ipv4toUInt converts a dotted-quad IPv4 string such as "192.168.1.1" into
// its 32-bit big-endian numeric form (first octet in the most significant
// byte).
//
// The conversion is deliberately lenient because the function has no way to
// report an error:
//   - only the first four dot-separated components are used; any extra
//     components are ignored rather than causing an out-of-range panic;
//   - missing trailing components count as 0;
//   - a component that is not a decimal number in the range 0..255 counts
//     as 0, so it cannot spill into the neighbouring octets.
func ipv4toUInt(ipv4 string) uint32 {
	parts := strings.Split(ipv4, ".")

	var addr uint32
	for idx := 0; idx < 4; idx++ {
		addr <<= 8
		if idx >= len(parts) {
			continue
		}
		// bitSize 8 makes ParseUint reject anything above 255.
		octet, err := strconv.ParseUint(parts[idx], 10, 8)
		if err != nil {
			continue
		}
		addr |= uint32(octet)
	}
	return addr
}
func main() { | |
var ( | |
logFile = flag.String("input", "/tmp/event.log", "Input log file") | |
producer = flag.Bool("producer", false, "Produce instead of consume") | |
topic = flag.String("topic", "test", "Kafka topic to use") | |
schema = flag.String("schema", "/tmp/schema.json", "Avro schema for encoding/decoding") | |
kafka = flag.String("brokers", "localhost:9092", "Comma-separated list of brokers") | |
) | |
flag.Parse() | |
// avro codec stuff | |
flow5codec := loadSchema(*schema) | |
c, err := goavro.NewCodec(flow5codec) | |
if err != nil { | |
log.Fatal(err) | |
} | |
// kafka generic | |
config := sarama.NewConfig() | |
config.Producer.Retry.Max = 5 | |
config.Producer.RequiredAcks = sarama.WaitForAll | |
//config.Producer.MaxMessageBytes = 16 << 20 // 16MB | |
config.Producer.Return.Successes = true | |
//brokers := []string{"localhost:9092"} | |
brokers := strings.Split(*kafka, ",") | |
if *producer == true { | |
produce(*logFile, *topic, brokers, config, c) | |
} else { | |
k, _ := Init() | |
IPv4CardinalityChan := make(chan uint32, 256) | |
consume(*topic, brokers, config, c, IPv4CardinalityChan) | |
go k.ipv4CardinalityEstimator(IPv4CardinalityChan) | |
for { | |
time.Sleep(30 * time.Second) | |
k.ipv4cardinality.Count() | |
fmt.Println(k.ipv4cardinality.GetCardinality()) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment