Created August 8, 2014 18:31
Save crazed/c5d79d2099a6327adb1f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
require 'open3' | |
require 'oj' | |
require 'multi_json' | |
require 'elasticsearch' | |
require 'faraday' | |
# Periodically drains pmacct's in-memory flow table and bulk-indexes each
# record, annotated with bit and packet rates, into hourly Elasticsearch
# indices named "flow-YYYY-MM-DD-HH".
#
# Including Elasticsearch::API supplies the API helpers used here
# (#bulk, #indices); they send their HTTP calls through #perform_request.
class FlowStoreClient
  include Elasticsearch::API

  # @param args [Hash] optional settings (the hash is not modified):
  #   :bulk_size    - documents per bulk request (default 1000)
  #   :thread_count - bulk threads allowed to accumulate before blocking (default 5)
  #   :pmacct       - path to the pmacct client (default '/usr/local/bin/pmacct')
  #   :pmacct_pipe  - pmacct pipe to query (default '/tmp/nfacctd-full.pipe')
  #   :state_file   - where the last-run time is persisted (default '/var/tmp/flow_store.state')
  #   :run_interval - seconds to sleep between polls (default 60)
  def initialize(args = {})
    @bulk_size    = args[:bulk_size] || 1000
    @thread_count = args[:thread_count] || 5
    @pmacct       = args[:pmacct] || '/usr/local/bin/pmacct'
    @pmacct_pipe  = args[:pmacct_pipe] || '/tmp/nfacctd-full.pipe'
    @state_file   = args[:state_file] || '/var/tmp/flow_store.state'
    @run_interval = args[:run_interval] || 60
    @index_bulk_buffer = []
    @threads = []
    # Read in state from previous runs (uses @run_interval for its fallback,
    # so it must come after the settings above).
    read_state!
  end

  # Requirement to create a proper elasticsearch client per the
  # Elasticsearch::API documentation: every API call is forwarded here.
  def perform_request(method, path, params, body)
    es_client.perform_request(method, path, params, body)
  end

  # Polls forever. Each pass ensures this hour's index exists, indexes
  # pmacct's table, clears the table, waits for the bulk requests, and
  # persists the run time so a restarted process resumes the same window.
  def run!
    loop do
      # Pin the index name for the whole pass so an hour rollover mid-pass
      # cannot send documents to an index created without our mapping.
      index = current_index
      create_index!(index)
      collect_stats(Time.now - @lastrun, index)
      # Clear the table right after reading it: flows arriving between the
      # read and the clear are never counted, so keep that gap small.
      unless system(@pmacct, '-l', '-p', @pmacct_pipe, '-e', out: File::NULL)
        warn "Could not clear pmacct table via '#{@pmacct_pipe}'"
      end
      @lastrun = Time.now
      # Ship the final partial batch now rather than one interval later.
      flush_bulk!
      write_state!
      sleep @run_interval
    end
  end

  # Queues one document for +index+ (defaults to the current hourly index).
  # Every +@bulk_size+ documents are sent by a background bulk request; once
  # +@thread_count+ requests are outstanding, blocks until all of them finish.
  #
  # @param data [Hash] the document body
  # @param index [String] target index name
  def index_bulk(data, index = current_index)
    @index_bulk_buffer << { index: { _index: index, _type: 'flowdata', data: data } }
    return if @index_bulk_buffer.size < @bulk_size

    dispatch_bulk
    return if @threads.size < @thread_count

    warn "Waiting for #{@threads.size} threads"
    join_bulk_threads
  end

  private

  # Streams pmacct's table (one JSON object per line) into the bulk buffer,
  # adding 'bps' and 'pps' averaged over +duration+ seconds.
  def collect_stats(duration, index)
    # Argument list, so no shell is involved. stderr is inherited rather
    # than piped, so an unread stderr pipe cannot stall pmacct.
    Open3.popen2(@pmacct, '-s', '-p', @pmacct_pipe, '-O', 'json') do |_stdin, stdout, wait_thr|
      stdout.each_line do |line|
        next if line.strip.empty?

        stats = MultiJson.load(line)
        index_bulk({ 'stats' => stats,
                     'bps' => rate(stats['bytes'] * 8, duration),
                     'pps' => rate(stats['packets'], duration) }, index)
      end
      status = wait_thr.value
      warn "pmacct exited with #{status}" unless status.success?
    end
  end

  # Per-second rate rounded to an Integer. Returns 0 for a non-positive
  # duration (e.g. after the clock steps backwards): dividing by zero gives
  # Infinity/NaN, on which Float#round raises FloatDomainError.
  def rate(count, duration)
    return 0 unless duration > 0

    (count.to_f / duration).round
  end

  # Hands the buffered documents to a new thread as one bulk request.
  def dispatch_bulk
    return if @index_bulk_buffer.empty?

    body = @index_bulk_buffer
    @index_bulk_buffer = []
    @threads << Thread.new { bulk(body: body) }
  end

  # Waits for every outstanding bulk thread. Each thread is removed before it
  # is joined, so the list is never modified while being iterated and an
  # exception re-raised by Thread#join does not leave the thread behind.
  def join_bulk_threads
    @threads.shift.join until @threads.empty?
  end

  # Sends any partially filled batch and waits for all bulk requests.
  def flush_bulk!
    dispatch_bulk
    join_bulk_threads
  end

  # Persists @lastrun as epoch seconds (a Float) so read_state! can rebuild
  # the exact Time with Time.at.
  def write_state!
    File.open(@state_file, 'w') do |f|
      f.write(MultiJson.dump({ lastrun: @lastrun.to_f }))
    end
  end

  # Restores @lastrun from the state file. If the file is missing, is not
  # valid JSON, or has no numeric 'lastrun', assumes the pmacct table covers
  # one polling interval (a best guess) rather than "now", which would make
  # the first pass divide by a near-zero duration.
  def read_state!
    data = MultiJson.load(File.read(@state_file))
    lastrun = data['lastrun'] if data.is_a?(Hash)
    if lastrun.is_a?(Numeric)
      @lastrun = Time.at(lastrun)
    else
      warn "Invalid lastrun found in state file '#{@state_file}'"
    end
  rescue MultiJson::ParseError
    warn "Invalid JSON found in state file '#{@state_file}'"
  rescue Errno::ENOENT
    warn "Could not open state file '#{@state_file}'"
  ensure
    @lastrun ||= Time.now - @run_interval
  end

  # Lazily built Elasticsearch client with default connection settings.
  def es_client
    @es_client ||= Elasticsearch::Client.new
  end

  # Name of the hourly index for the current local time, e.g. "flow-2014-08-08-18".
  def current_index
    "flow-#{Time.now.strftime('%Y-%m-%d-%H')}"
  end

  # Creates +index+ with the flow mapping unless it already exists.
  def create_index!(index = current_index)
    return if indices.exists(index: index)

    puts "Creating index: #{index}"
    create_index(index)
  end

  # Creates +index+ with the 'flowdata' mapping: a stored _timestamp, long
  # 'pps'/'bps' fields, and a free-form 'stats' object.
  def create_index(index = current_index)
    indices.create(index: index,
                   body: {
                     mappings: {
                       flowdata: {
                         _timestamp: {
                           enabled: true,
                           store: true,
                           format: 'date_hour_minute_second_fraction',
                           index: 'analyzed',
                         },
                         properties: {
                           pps: {
                             type: 'long',
                             index: 'analyzed',
                             enabled: true,
                             store: true,
                           },
                           bps: {
                             type: 'long',
                             index: 'analyzed',
                             enabled: true,
                             store: true,
                           },
                           stats: {
                             type: 'object',
                           }
                         },
                       }
                     }
                   })
  end
end
# Start the collector only when this file is executed directly, so the class
# can be required (e.g. from tests) without entering the endless poll loop.
if __FILE__ == $PROGRAM_NAME
  flow = FlowStoreClient.new
  flow.run!
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.