Created
February 20, 2015 13:41
-
-
Save aojea/cf77bb1d84216f59395f to your computer and use it in GitHub Desktop.
Import Netflow CSV Elasticsearch
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python2.7
from __future__ import print_function

import csv
import json
import sys
import time

import elasticsearch
from elasticsearch import Elasticsearch
from elasticsearch import helpers
# Field mapping applied to documents of type "fnf1x" in the "netflowlab"
# index. The field names match the keys of the documents built from each CSV
# record further down in this script.
mapping = {
    "fnf1x": {
        "properties": {
            # Timestamp, sent as "YYYY-MM-dd HH:mm:ss" text.
            "ts": {"type": "date", "format": "YYYY-MM-dd HH:mm:ss"},
            # Addresses and ports (filled from the firstSeenSrc*/firstSeenDest*
            # CSV columns).
            "sa": {"type": "string", "index": "not_analyzed"},
            "sp": {"type": "integer", "index": "not_analyzed"},
            "da": {"type": "string", "index": "not_analyzed"},
            "dp": {"type": "integer", "index": "not_analyzed"},
            "duration": {"type": "long"},
            "proto": {"type": "string"},
            # Packet and byte counters.
            "pkt_in": {"type": "long"},
            "pkt_out": {"type": "long"},
            "byte_in": {"type": "long"},
            "byte_out": {"type": "long"},
        }
    }
}
# Client created with no arguments, i.e. whatever connection settings the
# elasticsearch library uses by default.
es = Elasticsearch()

# ignore=[400, 404] tells the client not to raise on those HTTP statuses, so
# the script can be re-run (e.g. when the index already exists) without
# aborting here.
es.indices.create(index="netflowlab", ignore=[400, 404])

# Install the field mapping for the "fnf1x" document type before indexing.
es.indices.put_mapping(index="netflowlab", doc_type="fnf1x", body=mapping)
# Number of documents buffered in memory before each bulk request.
BATCH_SIZE = 100000


def row_to_document(record):
    """Convert one CSV record into the document indexed as type "fnf1x".

    Args:
        record: mapping of CSV column name to string value, as yielded by
            csv.DictReader.

    Returns:
        dict with the keys ts, sa, sp, da, dp, duration, pkt_in, pkt_out,
        byte_in, byte_out and proto.

    Raises:
        KeyError: a required column is missing from the record.
        ValueError: the timestamp or a numeric column cannot be parsed.
    """
    # Only the first 19 characters ("YYYY-MM-DD HH:MM:SS") are parsed; any
    # trailing text is dropped. The strptime/strftime round trip rejects
    # malformed timestamps before they reach Elasticsearch.
    parsed = time.strptime(record['parsedDate'][0:19], "%Y-%m-%d %H:%M:%S")
    return {
        'ts': time.strftime('%Y-%m-%d %H:%M:%S', parsed),
        'sa': record['firstSeenSrcIp'],
        # int(float(...)) also accepts ports written with a decimal point.
        'sp': int(float(record['firstSeenSrcPort'])),
        'da': record['firstSeenDestIp'],
        'dp': int(float(record['firstSeenDestPort'])),
        'duration': int(record['durationSeconds']),
        'pkt_in': int(record['firstSeenSrcPacketCount']),
        'pkt_out': int(record['firstSeenDestPacketCount']),
        'byte_in': int(record['firstSeenSrcTotalBytes']),
        'byte_out': int(record['firstSeenDestTotalBytes']),
        'proto': record['ipLayerProtocolCode'],
    }


def build_action(doc_id, document):
    """Wrap a document as a bulk-helper action for the "netflowlab" index.

    Args:
        doc_id: value used as the document's _id.
        document: JSON-serialisable dict to store.

    Returns:
        dict with _index, _id, _type and _source; _source is the document
        already serialised as compact JSON text.
    """
    return {
        '_index': 'netflowlab',
        '_id': doc_id,
        '_type': 'fnf1x',
        '_source': json.dumps(document, separators=(',', ':')),
    }


def main():
    """Bulk-index the records of the CSV file named by sys.argv[1].

    Documents are numbered from 0 in file order and sent to the module-level
    ``es`` client in batches of BATCH_SIZE.
    """
    if len(sys.argv) < 2:
        sys.exit("usage: %s <netflow.csv>" % sys.argv[0])
    filename = sys.argv[1]

    actions = []
    count = 0
    with open(filename, 'r') as fin:
        # csv.DictReader uses the first line of the file for column headings
        # by default; comma is the default delimiter.
        reader = csv.DictReader(fin)
        # NOTE(review): the header line has already been consumed by
        # DictReader, so this discards the first *data* record. Kept to
        # preserve existing behaviour -- confirm it is intended (e.g. the
        # export carries a second header line) or remove it. The default
        # argument avoids StopIteration on a file with no data rows.
        next(reader, None)
        for record in reader:
            # One es.index() request per document proved too slow, so
            # documents are queued and shipped with the bulk helper.
            actions.append(build_action(count, row_to_document(record)))
            count += 1
            if count % BATCH_SIZE == 0:
                elasticsearch.helpers.bulk(es, actions)
                print("Indexed %d, working on next %d" % (count, BATCH_SIZE))
                actions = []
        # Flush the final partial batch; nothing to send if it is empty.
        if actions:
            elasticsearch.helpers.bulk(es, actions)
        print("Indexed %d, finishing." % count)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment