Created
January 12, 2016 20:29
-
-
Save j-keck/c5a67c7ea0431c02499a to your computer and use it in GitHub Desktop.
perl script to send systemd logs in a postgresql db
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env perl | |
# | |
# save systemd journal logs to a postgres instance | |
# | |
# | |
# * first run with '-init-db' to initialize the database | |
# * second run to load all logs | |
# (maybe with '-journalctl-args "-n 1000000"') | |
# * subsequent runs insert only new log entries | |
# | |
# | |
# | |
# sample queries | |
# ========================== | |
# list in json keys: select json_object_keys(j::json) from logs_raw; | |
# ^^^^ why not with 'jsonb'? | |
# | |
# list journal ts, msg: select jts, j->>'MESSAGE' from logs; | |
# | |
# | |
# | |
# | |
use v5.10; | |
use strict; | |
use warnings; | |
#use diagnostics; # 3 seconds startup slowdown on pi | |
use Getopt::Long; | |
use autodie; | |
# ============================================================================= | |
# default args | |
# ============================================================================= | |
# Command line defaults. Each of these may be overridden by GetOptions below.
my $show_help;                            # set when '-help' is given
my $init_db;                              # set when '-init-db' is given
my $db_ip;                                # optional; when undef no '-h' is passed to psql
my $db_port         = "5432";
my $db_name         = current_os_user();  # defaults to the invoking OS user
my $db_user         = current_os_user();  # defaults to the invoking OS user
my $db_table_name   = "logs";
my $journalctl_args = "";                 # appended verbatim to the journalctl command line
# ============================================================================= | |
# parse / validate args | |
# ============================================================================= | |
# Map every supported command line switch onto its backing variable.
# When Getopt::Long rejects the arguments the script exits with status 1.
GetOptions(
    "help"              => \$show_help,
    "init-db"           => \$init_db,
    "db-ip=s"           => \$db_ip,
    "db-port=i"         => \$db_port,
    "db-name=s"         => \$db_name,
    "db-user=s"         => \$db_user,
    "db-table-name=s"   => \$db_table_name,
    "journalctl-args=s" => \$journalctl_args,
) or exit 1;
# ============================================================================= | |
# initialize vars | |
# ============================================================================= | |
# Base psql command line shared by all helpers below.
my $psql_cmd = build_psql_cmd();

# Machine id of this host. Only update_db() needs it, so it is read in the
# update branch below: reading it unconditionally made '-help' and '-init-db'
# die (autodie) on hosts that have no /etc/machine-id.
my $machine_id;
# =============================================================================
# action
# =============================================================================
if (defined($show_help)) {
    # print usage; the values shown are the effective ones after arg parsing
    say qq[$0 : save systemd journal logs to a postgres instance
ARGS:
-h : show this help
-init-db : initialize the database
-db-ip : database ip
-db-port : database port
-db-name : database name (default: $db_name)
-db-user : database user (default: $db_user)
-db-table-name : log table name (default: $db_table_name)
-journalctl-args: extra args for journalctl - '-journalctl-args "-n 1000000"'
];
}
elsif (defined($init_db)) {
    # one-time setup: create table, indexes and trigger
    verify_db_encoding();
    init_db();
}
else {
    # regular run: load journal entries newer than the last stored cursor
    verify_db_login();
    verify_db_is_initialized();
    $machine_id = read_file("/etc/machine-id");
    update_db();
}
# ============================================================================= | |
# action functions | |
# ============================================================================= | |
#
# Create the log table, its column comments, two indexes, and a BEFORE INSERT
# trigger that fills ts / jts / hostname / hostid / priority from the JSON
# document in column 'j' whenever they are not supplied.
#
# All object names are derived from $db_table_name. The script is sent to the
# database through psql().
#
sub init_db {
    # NOTE on '\\\$BODY\\\$': this interpolating heredoc turns it into '\$BODY\$',
    # and psql() then feeds the text through an unquoted shell heredoc, where
    # '\$' becomes the plain '$' that the dollar-quoted function body needs.
    my $schema_ddl = <<"SETUP_SQL";
CREATE TABLE IF NOT EXISTS ${db_table_name}(
id SERIAL PRIMARY KEY
, ts TIMESTAMP
, jts TIMESTAMP
, hostname TEXT
, hostid TEXT
, priority INTEGER
, j JSONB
);
COMMENT ON COLUMN ${db_table_name}.ts IS 'timestamp: table insert';
COMMENT ON COLUMN ${db_table_name}.jts IS 'timestamp: from systemd journal / log timestamp';
COMMENT ON COLUMN ${db_table_name}.priority IS 'A priority value between 0 ("emerg") and 7 ("debug")';
COMMENT ON COLUMN ${db_table_name}.j IS 'journald log entry as json. see: http://www.freedesktop.org/software/systemd/man/systemd.journal-fields.html ';
CREATE INDEX idx_${db_table_name}_ts ON ${db_table_name} (ts);
CREATE INDEX idx_${db_table_name}_jts_hostid ON ${db_table_name} (jts, hostid);
CREATE OR REPLACE FUNCTION pr_${db_table_name}_set_meta() RETURNS trigger AS
\\\$BODY\\\$
DECLARE
rts VARCHAR;
rts_as_double DOUBLE PRECISION;
BEGIN
IF NEW.ts IS NULL THEN
NEW.ts := NOW();
END IF;
IF NEW.jts IS NULL THEN
rts := (SELECT NEW.j->>'__REALTIME_TIMESTAMP');
rts_as_double := CAST(FORMAT('%s.%s', left(rts, 10), substr(rts, 11)) AS DOUBLE PRECISION);
-- RAISE NOTICE 'realtime timestamp: %, as double: %', rts, rts_as_double;
NEW.jts := TO_TIMESTAMP(rts_as_double);
END IF;
IF NEW.hostname IS NULL THEN
NEW.hostname := (SELECT NEW.j->>'_HOSTNAME');
END IF;
IF NEW.hostid IS NULL THEN
NEW.hostid := (SELECT NEW.j->>'_MACHINE_ID');
END IF;
IF NEW.priority IS NULL THEN
NEW.priority := (SELECT NEW.j->>'PRIORITY');
END IF;
RETURN NEW;
END;
\\\$BODY\\\$ LANGUAGE plpgsql;
CREATE TRIGGER tr_bi_${db_table_name}_set_meta
BEFORE INSERT ON ${db_table_name}
FOR EACH ROW EXECUTE PROCEDURE pr_${db_table_name}_set_meta();
SETUP_SQL

    return psql($schema_ddl);
}
#
# Copy all journal entries that are newer than the last one stored for this
# machine ($machine_id) into $db_table_name.
#
# The journal is read with 'journalctl -o json' (plus $journalctl_args) and
# every JSON line becomes one INSERT; all INSERTs are streamed to a single
# psql session inside one BEGIN / COMMIT transaction.
# Prints the executed journalctl command and the number of inserted entries.
# Dies (autodie) when either pipe cannot be opened or closed cleanly.
#
sub update_db {
    # Cursor of the newest entry already stored for this machine.
    # ORDER BY ... LIMIT 1 yields at most one row; a 'jts = MAX(jts)' lookup
    # returns several cursors when entries share a timestamp, which would
    # corrupt the --after-cursor argument built below.
    my $last_cursor_query = <<"QUERY";
SELECT j->>'__CURSOR'
FROM $db_table_name
WHERE hostid = '$machine_id'
AND jts IS NOT NULL
ORDER BY jts DESC, id DESC
LIMIT 1;
QUERY
    my $last_cursor     = psql($last_cursor_query);
    my $entries_counter = 0;

    # start psql session; everything is loaded in a single transaction
    open my $pgfh, '|-', $psql_cmd;
    say {$pgfh} "BEGIN;";

    # build / execute journalctl cmd (resume after the stored cursor, if any)
    my $jstart         = $last_cursor ? " --after-cursor='$last_cursor' " : "";
    my $journalctl_cmd = "journalctl -o json $jstart $journalctl_args";
    say "EXEC: $journalctl_cmd";
    open my $jfh, '-|', $journalctl_cmd;

    while (my $line = <$jfh>) {
        chomp $line;

        # Skip everything that is not a JSON object (e.g. the "-- Reboot --"
        # marker). A single unparsable line would fail its INSERT and thereby
        # abort the whole transaction.
        next unless $line =~ /\A\{/;

        my $sql = "INSERT INTO $db_table_name (j) VALUES (to_json("
                . _dollar_quote($line)
                . "::jsonb)::jsonb);";
        say {$pgfh} $sql;
        $entries_counter += 1;
    }
    close($jfh);

    # commit / close psql session
    say {$pgfh} "COMMIT;";
    close($pgfh);

    say "done - $entries_counter new entries added";
}

#
# Wrap $text in a PostgreSQL dollar-quoted string constant.
# The tag starts as 'JOURNAL2PG' and is extended with '_' until the closing
# delimiter cannot be matched anywhere inside (or overlapping the end of)
# $text, so journal content can never terminate the literal early.
#
sub _dollar_quote {
    my ($text) = @_;

    my $tag = 'JOURNAL2PG';
    $tag .= '_' while index("$text\$$tag\$", "\$$tag\$") != length($text);

    return "\$$tag\$$text\$$tag\$";
}
# | |
# | |
# | |
#
# Ensure the database server uses UTF8 encoding.
# Queries the server via psql() and aborts with a hint when the reported
# encoding is anything other than "UTF8".
#
sub verify_db_encoding {
    my $server_encoding = psql("SHOW SERVER_ENCODING;");
    return if $server_encoding eq "UTF8";

    abort("unsupported server encoding", "
expected: UTF8, actual: $server_encoding
HINT:
create a database with UTF8 encoding:
'pg_createcluster 9.4 <DB_NAME> --start-conf=auto --start -- -E UTF8 --data-checksums'
");
}
# | |
# | |
# | |
#
# Check that psql can connect with the configured settings.
# Runs psql with only the '\q' meta-command and aborts - showing the captured
# psql output (stdout and stderr) - when it exits with a non-zero status.
#
sub verify_db_login {
    my $captured_output = qx{$psql_cmd -c "\\q" 2>&1};
    return if $? == 0;

    abort("database login failed", "
PSQL Message:
$captured_output
");
}
# | |
# | |
# | |
#
# Abort with a hint unless a relation named $db_table_name exists.
#
# The lookup is wrapped in EXISTS so that exactly one 'OK' row comes back even
# when pg_class holds several relations with that name (e.g. in different
# schemas); several rows would make the string comparison fail.
#
sub verify_db_is_initialized {
    my $probe = psql(
        "SELECT 'OK' WHERE EXISTS (SELECT 1 FROM pg_class WHERE relname = '$db_table_name');"
    );
    return if $probe eq "OK";

    abort("database not initialized", "
HINT:
initialize the database with:
'$0 --init-db'
");
}
# ============================================================================= | |
# helper functions | |
# ============================================================================= | |
# | |
# | |
# | |
#
# Print an abort banner with the given reason and details to STDOUT,
# then terminate the script with exit status 1.
#
#   $reason  - short one-line cause
#   $details - free-form text printed below the banner
#
sub abort {
    my ($reason, $details) = @_;

    print "\n! ABORT ! - $reason\n$details\n";
    exit 1;
}
# | |
# | |
# | |
#
# Run the given SQL through the psql command line ($psql_cmd) and return its
# output with leading / trailing whitespace removed.
#
# The SQL is handed to psql on stdin via an unquoted shell heredoc, preceded by
# the '\t' meta-command and followed by a ';'. Because the heredoc is unquoted,
# the shell still processes '$' and backslash escapes inside the SQL.
#
sub psql {
    my ($statement) = @_;

    # assemble: "<psql cmd> <<EOF", "\t", "<sql>;", "EOF"
    my $shell_cmd = join "\n",
        "$psql_cmd <<EOF",
        "\\t",
        "$statement;",
        "EOF",
        "";

    # execute psql and catch output
    my $output = qx{$shell_cmd};

    # trim spaces on both ends
    $output =~ s/^\s+//;
    $output =~ s/\s+$//;

    return $output;
}
# | |
# | |
# | |
#
# Assemble the psql command line from the configured connection settings.
# '-h' and '-p' are only included when $db_ip / $db_port are defined;
# '-U' and '-d' are always passed.
#
sub build_psql_cmd {
    my $host_arg = defined($db_ip)   ? " -h $db_ip "   : "";
    my $port_arg = defined($db_port) ? " -p $db_port " : "";
    my $connection_args = $host_arg . $port_arg;

    return "psql -q $connection_args -U $db_user -d $db_name";
}
# | |
# | |
# | |
#
# Return the login name that getpwuid reports for the real user id ($<)
# of the running process.
#
sub current_os_user {
    return scalar getpwuid($<);
}
# | |
# | |
# | |
#
# Return the whole content of the file at $path with one trailing newline
# removed.
#
#   $path - file to read
#
# Dies when the file cannot be opened (the file is compiled under autodie).
#
sub read_file {
    my ($path) = @_;

    open my $fh, '<', $path;

    # slurp mode is confined to the do-block, so the chomp below
    # still sees the regular "\n" record separator
    my $content = do { local $/; <$fh> };
    close($fh);

    chomp $content;
    return $content;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment