Skip to content

Instantly share code, notes, and snippets.

@markuskont
Last active April 20, 2018 07:21
Show Gist options
  • Save markuskont/4967d10592b28835c60678aade3f2266 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# pip3 install kafka-python
# pip3 install python-snappy
import json
import sys

from kafka import KafkaConsumer
# Consume JSON records from `topic` and print each record's flow 5-tuple
# (src_ip, src_port, dest_ip, dest_port, proto). Records that cannot be decoded
# or lack one of those fields are reported on stderr and counted in `errs`.
topic = "TOPICNAME"
gid = "MYUNIQUEID"
# session_timeout_ms=100000: the group coordinator may keep a vanished member
# for up to 100 s, which is why the consumer is closed explicitly below.
consumer = KafkaConsumer(
    topic,
    group_id=gid,
    bootstrap_servers="HOST:9092",
    session_timeout_ms=100000,
)
errs = 0  # number of messages that could not be turned into a 5-tuple
try:
    # No consumer_timeout_ms is configured, so (per kafka-python defaults) this
    # loop keeps waiting for messages until it is interrupted.
    for msg in consumer:
        try:
            d = json.loads(msg.value)
            t = (d['src_ip'], d['src_port'], d['dest_ip'], d['dest_port'], d['proto'])
        except (ValueError, KeyError, TypeError) as e:
            # ValueError: invalid JSON or invalid UTF-8 (JSONDecodeError and
            #   UnicodeDecodeError both subclass it).
            # KeyError: one of the five fields is missing.
            # TypeError: a None value, or a JSON document that is not an object.
            # Errors go to stderr so stdout carries only the tuples.
            print(e, ":", msg.value, file=sys.stderr)
            errs += 1
        else:
            print(t)
finally:
    # Leave the consumer group promptly (committing pending offsets when
    # auto-commit is enabled) instead of waiting for the session timeout.
    consumer.close()
    print("unparseable messages:", errs, file=sys.stderr)
#!/usr/bin/env python
# tested on python 3.6, elastic 6.2
from elasticsearch import Elasticsearch
def pullFromElastic(queryPath="./query.json", indexPattern="sessions2-*",
                    limit=5, hosts=None):
    """Run a saved search body against the newest indices matching a pattern.

    The request body is read from ``queryPath``. Presumably this is the
    aggregation query kept next to this script (TODO confirm). The searched
    indices are the last ``limit`` names, in sorted order, among those
    matching ``indexPattern``.

    Args:
        queryPath: path of the JSON file holding the search request body.
        indexPattern: index name pattern passed to ``indices.get``.
        limit: how many of the last sorted index names to search.
        hosts: Elasticsearch node addresses. ``None`` selects the default
            nodes ``ela1:9200``, ``ela2:9200`` and ``ela3:9200``.

    Returns:
        The response returned by ``Elasticsearch.search``.

    Raises:
        ValueError: if no index is selected. An empty index string passed to
            ``search`` would be dropped from the request path, and the query
            would silently run against every index in the cluster.
    """
    if hosts is None:
        hosts = ["ela1:9200", "ela2:9200", "ela3:9200"]
    # The context manager closes the query file instead of leaking the handle.
    # JSON is UTF-8, so don't depend on the locale's default encoding.
    with open(queryPath, encoding="utf-8") as queryFile:
        blueTraffic = json.load(queryFile)
    es = Elasticsearch(hosts=hosts)
    indices = indexListTail(es, indexPattern, limit)
    if not indices:
        raise ValueError(
            "no indices selected for pattern %r (limit=%r)" % (indexPattern, limit)
        )
    return es.search(index=indices, body=blueTraffic)
def indexListTail(client, pattern, limit):
    """Return the last ``limit`` sorted index names matching ``pattern``.

    Args:
        client: Elasticsearch client, forwarded to ``indexList``.
        pattern: index name pattern, e.g. ``"sessions2-*"``.
        limit: number of names to keep from the end of the sorted list.

    Returns:
        The selected names joined with commas, which is the multi-index form
        passed as ``index=`` to ``search``. Returns an empty string when
        nothing matches or when ``limit`` is not positive.
    """
    if limit <= 0:
        # names[-0:] is the whole list, so without this guard limit=0 would
        # select every matching index instead of none. A negative limit would
        # silently drop leading names instead.
        return ""
    return ",".join(indexList(client, pattern)[-limit:])
def indexList(client, pattern):
    """Return the names of the indices matching ``pattern``, sorted ascending.

    The names are the keys of the mapping returned by
    ``client.indices.get(pattern)`` (index name -> index details in the
    Elasticsearch get-index API). Sorting is plain lexicographic string order.
    """
    matches = client.indices.get(pattern)
    return sorted(matches)
def esQuery(hosts, index, query):
    """Run a single search using a freshly constructed client.

    Args:
        hosts: Elasticsearch node addresses handed to the client constructor.
        index: index name(s) to search. A comma-separated string selects
            several indices.
        query: the search request body.

    Returns:
        Whatever ``Elasticsearch.search`` returns for the request.

    Note: a new client is built on every call and is not reused.
    """
    client = Elasticsearch(hosts=hosts)
    response = client.search(index=index, body=query)
    return response
{
"size": 0,
"query": {
"bool": {
"must": [
{
"query_string": {
"default_field": "srcGEO",
"query": "srcGEO: [01 TO 99] AND dstGEO: [01 TO 99]"
}
},
{
"range": {
"timestamp": {
"gte": "now-1h/h",
"lte": "now/h"
}
}
}
]
}
},
"aggs": {
"sourceTeam": {
"terms": {
"field": "srcGEO",
"size": 25,
"min_doc_count": 0,
"order": {
"_key": "asc"
}
},
"aggs": {
"destTeam": {
"terms": {
"field": "dstGEO",
"size": 25,
"min_doc_count": 0,
"order": {
"_key": "asc"
}
},
"aggs": {
"dataBytes": {
"sum": {
"field": "totDataBytes"
}
},
"packets": {
"sum": {
"field": "totPackets"
}
},
"bytes": {
"sum": {
"field": "totBytes"
}
}
}
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment