Skip to content

Instantly share code, notes, and snippets.

@pumbaEO
Created January 24, 2015 18:01
Show Gist options
  • Save pumbaEO/092511e0fbf55a6905fe to your computer and use it in GitHub Desktop.
Save pumbaEO/092511e0fbf55a6905fe to your computer and use it in GitHub Desktop.
sqliteonec.rb
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "socket"
# Read rows from an sqlite database.
#
# This is most useful in cases where you are logging directly to a table.
# Any tables being watched must have an 'id' column that is monotonically
# increasing.
#
# All tables are read by default except:
# * ones matching 'sqlite_%' - these are internal/adminstrative tables for sqlite
# * 'since_table' - this is used by this plugin to track state.
#
# ## Example
#
# % sqlite /tmp/example.db
# sqlite> CREATE TABLE weblogs (
# id INTEGER PRIMARY KEY AUTOINCREMENT,
# ip STRING,
# request STRING,
# response INTEGER);
# sqlite> INSERT INTO weblogs (ip, request, response)
# VALUES ("1.2.3.4", "/index.html", 200);
#
# Then with this logstash config:
#
# input {
# sqlite {
# path => "/tmp/example.db"
# type => weblogs
# }
# }
# output {
# stdout {
# debug => true
# }
# }
#
# Sample output:
#
# {
# "@source" => "sqlite://sadness/tmp/x.db",
# "@tags" => [],
# "@fields" => {
# "ip" => "1.2.3.4",
# "request" => "/index.html",
# "response" => 200
# },
# "@timestamp" => "2013-05-29T06:16:30.850Z",
# "@source_host" => "sadness",
# "@source_path" => "/tmp/x.db",
# "@message" => "",
# "@type" => "foo"
# }
#
class LogStash::Inputs::Sqlite < LogStash::Inputs::Base
config_name "sqliteonec"
milestone 1
# The path to the sqlite database file.
config :path, :validate => :string, :required => true
# Any tables to exclude by name.
# By default all tables are followed.
config :exclude_tables, :validate => :array, :default => []
# How many rows to fetch at a time from each SELECT call.
config :batch, :validate => :number, :default => 5
SINCE_TABLE = :since_table
public
def init_placeholder_table(db)
begin
db.create_table SINCE_TABLE do
String :table
Int :place
end
rescue
@logger.debug("since tables already exists")
end
end
public
def get_placeholder(db, table)
since = db[SINCE_TABLE]
x = since.where(:table => "#{table}")
if x[:place].nil?
init_placeholder(db, table)
return 0
else
@logger.debug("placeholder already exists, it is #{x[:place]}")
return x[:place][:place]
end
end
public
def init_placeholder(db, table)
@logger.debug("init placeholder for #{table}")
since = db[SINCE_TABLE]
since.insert(:table => table, :place => 0)
end
public
def update_placeholder(db, table, place)
@logger.debug("set placeholder to #{place}")
since = db[SINCE_TABLE]
since.where(:table => table).update(:place => place)
end
public
def get_all_tables(db)
return db["SELECT * FROM sqlite_master WHERE type = 'table' AND tbl_name != '#{SINCE_TABLE}' AND tbl_name NOT LIKE 'sqlite_%'"].map { |t| t[:name] }.select { |n| !@exclude_tables.include?(n) }
end
public
def get_n_rows_from_table(db, table, offset, limit)
dataset = db["SELECT * FROM #{table}"]
return db["SELECT T1.rowID as id, T1.severity, T1.date, T1.transactionStatus, T1.transactionDate, T1.transactionID, T1.userCode, T2.code, T2.name, T3.code, T3.name, T4.code, T4.name, T5.code, T5.name, T1.comment, T1.data, T1.dataPresentation, T1.workServerCode, T6.code, T6.name, T1.primaryPortCode, T1.secondaryPortCode FROM EventLog T1 LEFT OUTER JOIN UserCodes T2 ON T1.userCode = T2.code LEFT OUTER JOIN ComputerCodes T3 ON T1.computerCode = T3.code LEFT OUTER JOIN AppCodes T4 ON T1.appCode = T4.code LEFT OUTER JOIN EventCodes T5 ON T1.eventCode = T5.code LEFT OUTER JOIN WorkServerCodes T6 ON T1.workServerCode = T6.code WHERE (id > #{offset}) ORDER BY 'id' LIMIT #{limit}"].map { |row| row }
end
public
def register
require "sequel"
require "jdbc/sqlite3"
@host = Socket.gethostname
@logger.info("Registering sqlite input", :database => @path)
@db = Sequel.connect("jdbc:sqlite:#{@path}")
@tables = get_all_tables(@db)
@table_data = {}
@tables.each do |table|
init_placeholder_table(@db)
last_place = get_placeholder(@db, table)
@table_data[table] = { :name => table, :place => last_place }
end
end # def register
public
def run(queue)
sleep_min = 0.01
sleep_max = 5
sleeptime = sleep_min
begin
@logger.debug("Tailing sqlite db", :path => @path)
loop do
count = 0
@table_data.each do |k, table|
table_name = table[:name]
offset = table[:place]
@logger.debug("offset is #{offset}", :k => k, :table => table_name)
next if table_name != "EventLog"
rows = get_n_rows_from_table(@db, table_name, offset, @batch)
count += rows.count
rows.each do |row|
event = LogStash::Event.new("host" => @host, "db" => @db)
decorate(event)
# store each column as a field in the event.
row.each do |column, element|
next if column == :id
event[column.to_s] = element
end
queue << event
@table_data[k][:place] = row[:id]
end
# Store the last-seen row in the database
update_placeholder(@db, table_name, @table_data[k][:place])
end
if count == 0
# nothing found in that iteration
# sleep a bit
@logger.debug("No new rows. Sleeping.", :time => sleeptime)
sleeptime = [sleeptime * 2, sleep_max].min
sleep(sleeptime)
else
sleeptime = sleep_min
end
end # loop
end # begin/rescue
end #run
end # class Logtstash::Inputs::EventLog
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment