Last active
August 29, 2015 14:03
-
-
Save kazeburo/4450032714d98f6a1c06 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/perl | |
package OneSteam::Agent::Collector; | |
use strict; | |
use warnings; | |
# Names of the collector methods that collect() invokes, in this order.
# Note: traffic() is defined in this package but is not listed here, so
# collect() never calls it.
my @func = qw(
    memory
    loadavg
    uptime
    sys_version
    processors
    cpu_usage
    tcp_established
    disk_usage
    disk_io
);
# Constructor.  Returns a new collector blessed into the invoking class.
#
# The "stats" and "meta" hashes are created up front so that the stats() and
# meta() accessors return hash references even before collect() has run
# (collect() replaces both with fresh hashes on every call).
sub new {
    my $class = shift;
    return bless { stats => {}, meta => {} }, $class;
}
# Accessor: returns whatever is stored under the object's "stats" key
# (collect() stores a hash reference there).
sub stats {
    my ($self) = @_;
    return $self->{stats};
}
# Accessor: returns whatever is stored under the object's "meta" key
# (collect() stores a hash reference there).
sub meta {
    my ($self) = @_;
    return $self->{meta};
}
# Run every collector listed in @func and render the results as text.
#
# The stats and meta hashes are reset first.  Each collector runs inside an
# eval, so one failing collector does not stop the others; its error message
# is reported under "warn.<collector>" instead.
#
# Returns a string of "key<TAB>value\n" lines: "stats.*" first, then
# "meta.*", then "warn.*", each group sorted by key.  The string is empty
# (never undef) when there is nothing to report.
sub collect {
    my $self = shift;
    $self->{stats} = {};
    $self->{meta}  = {};

    my %warn;
    for my $func (@func) {
        # "eval { ...; 1 } or do" also catches a failure that left $@ empty.
        eval { $self->$func(); 1 } or do {
            my $err = ( defined $@ && length $@ ) ? "$@" : 'unknown error';

            # Error messages normally end in "\n" and may span several
            # lines; flatten them so each warning stays on one output line.
            $err =~ s/\s+\z//;
            $err =~ s/\s*[\r\n]+\s*/ /g;
            $warn{$func} = $err;
        };
    }

    my $body = '';
    $body .= "stats.$_\t" . $self->{stats}->{$_} . "\n" for sort keys %{ $self->{stats} };
    $body .= "meta.$_\t" . $self->{meta}->{$_} . "\n"   for sort keys %{ $self->{meta} };
    $body .= "warn.$_\t" . $warn{$_} . "\n"             for sort keys %warn;
    return $body;
}
# Convert a size string such as "123 kB" into a number of bytes.
#
# Accepts a bare non-negative integer ("123") or an integer followed by a
# unit ("123 kB", "4MB").  The unit is matched case-insensitively; kb, mb, gb
# and tb are treated as powers of 1024.  Surrounding whitespace is ignored.
#
# Returns the byte count.  For an unknown unit a warning is emitted and the
# bare number is returned; for undefined, empty or otherwise unparsable input
# a warning is emitted and 0 is returned.
sub to_byte {
    my $s = shift;
    $s = '' unless defined $s;
    $s =~ s/^\s+//;
    $s =~ s/\s+$//;

    my %factor_for = (
        kb => 1024,
        mb => 1024 * 1024,
        gb => 1024 * 1024 * 1024,
        tb => 1024 * 1024 * 1024 * 1024,
    );

    if ( $s =~ /^[0-9]+$/ ) {
        return $s;
    }

    if ( $s =~ /^([0-9]+)\s*([a-zA-Z]+)$/ ) {
        my ( $number, $unit ) = ( $1, lc $2 );
        return $number * $factor_for{$unit} if exists $factor_for{$unit};

        # Plain warn(): no logging helper is imported into this package.
        warn sprintf( "Unknown unit: %s\n", $unit );
        return $number;
    }

    warn sprintf( "Failed to convert into as byte: %s\n", $s );
    return 0;
}
# Read /proc/meminfo and publish selected fields as gauges in the stats hash.
#
# Every "Key: value" line is converted with to_byte(); the fields named in
# %gauge_for are stored (0 when a field is absent).  Two derived gauges are
# added: memory-used (total - free - inactive) and memory-swap-used
# (swap total - swap free).  Dies with the OS error if the file cannot be
# opened.
sub memory {
    my $self = shift;

    # /proc/meminfo field name => gauge name it is published under.
    my %gauge_for = (
        'MemTotal'  => 'memory-total.gauge',
        'MemFree'   => 'memory-free.gauge',
        'Buffers'   => 'memory-buffers.gauge',
        'Cached'    => 'memory-cached.gauge',
        'SwapTotal' => 'memory-swap-total.gauge',
        'SwapFree'  => 'memory-swap-free.gauge',
        'Inactive'  => 'memory-inactive.gauge',
    );

    open my $fh, '<:utf8', '/proc/meminfo' or die "$!\n";
    my %bytes_of;
    while ( my $line = <$fh> ) {
        chomp $line;
        my ( $field, $value ) = split /[\s:]+/, $line, 2;
        next unless $field;
        $bytes_of{$field} = to_byte($value);
    }
    close $fh;

    my $stats = $self->stats;
    while ( my ( $field, $gauge ) = each %gauge_for ) {
        my $bytes = defined $bytes_of{$field} ? $bytes_of{$field} : 0;
        $stats->{$gauge} = int($bytes);
    }

    my ( $total, $free, $inactive ) =
        @{$stats}{ 'memory-total.gauge', 'memory-free.gauge', 'memory-inactive.gauge' };
    $stats->{'memory-used.gauge'} = $total - $free - $inactive;

    $stats->{'memory-swap-used.gauge'} =
        $stats->{'memory-swap-total.gauge'} - $stats->{'memory-swap-free.gauge'};
}
# Read /proc/loadavg and store its first three whitespace-separated fields
# as the loadavg-1, loadavg-5 and loadavg-15 gauges.  Only the first line
# that yields any fields is used.  Dies with the OS error if the file cannot
# be opened.
sub loadavg {
    my $self = shift;
    open my $fh, '<', '/proc/loadavg' or die "$!\n";
    LINE: while ( my $line = <$fh> ) {
        my @fields = split /\s+/, $line;
        next LINE unless @fields;

        my $stats = $self->stats;
        $stats->{'loadavg-1.gauge'}  = $fields[0];
        $stats->{'loadavg-5.gauge'}  = $fields[1];
        $stats->{'loadavg-15.gauge'} = $fields[2];
        last LINE;
    }
    close $fh;
}
# Read /proc/uptime and store the integer part of its first field under the
# "uptime" meta key.  Only the first line that yields any fields is used.
# Dies with the OS error if the file cannot be opened.
sub uptime {
    my $self = shift;
    open my $fh, '<', '/proc/uptime' or die "$!\n";
    LINE: while ( my $line = <$fh> ) {
        my @fields = split /\s+/, $line;
        next LINE unless @fields;

        $self->meta->{'uptime'} = int( $fields[0] );
        last LINE;
    }
    close $fh;
}
# Store the first line of /proc/version, without its trailing newline, under
# the "version" meta key.  Dies with the OS error if the file cannot be
# opened.
sub sys_version {
    my $self = shift;
    open my $fh, '<', '/proc/version' or die "$!\n";
    my $meta = $self->meta;
    $meta->{'version'} = <$fh>;
    chomp $meta->{'version'};
    close $fh;
}
# Count the lines of /proc/cpuinfo that start with "processor" followed by
# optional whitespace and a colon, accumulating the count in the
# processors.gauge stat.  Dies with the OS error if the file cannot be
# opened.
sub processors {
    my $self = shift;
    open my $fh, '<', '/proc/cpuinfo' or die "$!\n";
    while ( my $line = <$fh> ) {
        next unless $line =~ m!^processor\s*:!;
        $self->stats->{'processors.gauge'}++;
    }
    close $fh;
}
# Read the aggregate "cpu" line of /proc/stat and store its counters as
# "<name>.derive" stats, in the column order given by @names.  Columns the
# line does not provide are stored as 0.  Dies with the OS error if the file
# cannot be opened.
sub cpu_usage {
    my $self = shift;
    open my $fh, '<', '/proc/stat' or die "$!\n";
    my @names = qw(cpu-user cpu-nice cpu-system cpu-idle cpu-iowait cpu-irq cpu-softirq cpu-steal cpu-guest cpu-guest-nice);
    LINE: while ( my $line = <$fh> ) {
        next LINE unless $line =~ /^cpu\s+/;
        chomp $line;

        my @columns = split /\s+/, $line;
        shift @columns;    # drop the leading "cpu" label

        for my $i ( 0 .. $#names ) {
            my $value = $columns[$i];
            $value = 0 unless defined $value;
            $self->stats->{"$names[$i].derive"} = int($value);
        }
        last LINE;
    }
    close $fh;
}
# Read /proc/net/snmp and store the Tcp "CurrEstab" value as the
# tcp-established gauge.
#
# The file is expected to contain two "Tcp:" lines: the first carries the
# column labels, the second the values in the same positions.  If the
# CurrEstab column (or its value) is missing, nothing is stored instead of
# recording an undefined gauge.  Dies with the OS error if the file cannot
# be opened.
sub tcp_established {
    my $self = shift;
    open my $fh, '<', '/proc/net/snmp' or die "$!\n";
    my @labels;
    while ( my $line = <$fh> ) {
        next unless $line =~ /^Tcp:/;
        my @fields = split /\s+/, $line;

        # First Tcp: line = labels; remember them and wait for the values.
        if ( !@labels ) {
            @labels = @fields;
            next;
        }

        for my $i ( 0 .. $#labels ) {
            next unless $labels[$i] eq 'CurrEstab';
            $self->stats->{'tcp-established.gauge'} = $fields[$i]
                if defined $fields[$i];
            last;
        }
        last;
    }
    close $fh;
    return;
}
# Run an external command and capture what it writes to STDOUT.
#
# $cmdref is an array reference holding the command and its arguments; it is
# passed to exec() as a list.
#
# Returns ($output, $exit_code).  $output is '' when the command printed
# nothing.  $exit_code is the command's exit status, 128 + signal number if
# it was killed by a signal, 127 if the child could not set up or exec the
# command, and -1 if the child could not be waited for.
#
# Dies (in the parent only) if the pipe or the fork cannot be created.
sub cap_cmd {
    my ($cmdref) = @_;

    # Loaded before forking so the child only has to call POSIX::_exit.
    require POSIX;

    pipe my $logrh, my $logwh
        or die "Died: failed to create pipe:$!\n";
    my $pid = fork;
    if ( !defined $pid ) {
        die "Died: fork failed: $!\n";
    }
    elsif ( $pid == 0 ) {
        # Child.  It must never die(): callers run this inside eval, which
        # would catch the exception and let the child carry on executing the
        # parent's code.  Report on STDERR and leave with _exit instead.
        close $logrh;
        if ( !open STDOUT, '>&', $logwh ) {
            print STDERR "Died: failed to redirect STDOUT\n";
            POSIX::_exit(127);
        }
        close $logwh;
        { exec @$cmdref };
        print STDERR "Died: exec failed: $!\n";
        POSIX::_exit(127);
    }

    close $logwh;
    my $result = '';
    while ( my $line = <$logrh> ) {
        $result .= $line;
    }
    close $logrh;

    # Reap exactly our own child, retrying only when interrupted by a signal.
    my $reaped;
    do {
        $reaped = waitpid( $pid, 0 );
    } while ( $reaped == -1 && $!{EINTR} );

    my $status = $?;
    my $exit_code;
    if ( $reaped != $pid ) {
        $exit_code = -1;                        # child was reaped elsewhere
    }
    elsif ( $status & 127 ) {
        $exit_code = 128 + ( $status & 127 );   # killed by a signal
    }
    else {
        $exit_code = $status >> 8;
    }
    return ( $result, $exit_code );
}
# Record disk usage for every /dev/* filesystem listed in /proc/mounts,
# except the one mounted on /boot.
#
# For each mount point, df's third and fourth output columns are stored as
# "disk-usage-<device>-used.gauge" and "disk-usage-<device>-available.gauge"
# (Used / Available in df's default layout; presumably 1K blocks -- confirm
# for the target df), and the mount point itself under the
# "disk-usage-<device>-mount" meta key.  <device> is the device path below
# /dev with every character outside [A-Za-z0-9_-] replaced by "_".
#
# Dies if /proc/mounts cannot be opened or df exits with a non-zero status.
sub disk_usage {
    my $self = shift;
    open my $fh, '<', '/proc/mounts' or die "$!\n";
    my @mount_points;
    my %device_for;    # mount point => sanitized device name
    while ( my $line = <$fh> ) {
        next unless $line =~ m!^/dev/(.+?) (/.*?) !;
        my ( $device, $mount ) = ( $1, $2 );

        # /proc/mounts writes special characters in paths as octal escapes
        # (a space is "\040"); df needs the real path.
        $mount =~ s/\\([0-7]{3})/chr( oct($1) )/ge;

        next if $mount eq '/boot';    # not required
        $device =~ s![^A-Za-z0-9_-]!_!g;
        push @mount_points, $mount unless exists $device_for{$mount};
        $device_for{$mount} = $device;
    }
    close $fh;

    # Nothing to report; a bare "df" would only list filesystems we ignore.
    return unless @mount_points;

    my ( $result, $exit_code ) = cap_cmd( [ 'df', @mount_points ] );
    die "failed to exec df\n" if $exit_code != 0;
    $result = '' unless defined $result;

    for my $row ( split /\n/, $result ) {
        # Limit of 6 keeps a mount point containing spaces in one field.
        my @d = split /\s+/, $row, 6;
        my $mount = $d[5];
        next unless defined $mount && exists $device_for{$mount};

        my $name = $device_for{$mount};
        $self->stats->{ "disk-usage-" . $name . "-used.gauge" }      = $d[2];
        $self->stats->{ "disk-usage-" . $name . "-available.gauge" } = $d[3];
        $self->meta->{ "disk-usage-" . $name . "-mount" }            = $mount;
    }
    return;
}
# Map a kernel device-mapper name such as "dm-0" to its /dev/mapper entry.
#
# Scans the symlinks in /dev/mapper and returns the path, relative to /dev
# (e.g. "mapper/<name>"), of the first link whose target ends in the given
# dm-N name.  Entries that are not symlinks, or whose target contains no
# "dm-" component, are skipped.  Dies if no entry matches.
sub translate_device_mapper {
    my $device = shift;
    for my $entry ( glob(q!/dev/mapper/*!) ) {
        my $target = readlink($entry);
        next unless $target;

        my ($dm_name) = ( $target =~ m!(dm-.+)$! );
        next unless defined $dm_name && $dm_name eq $device;

        ( my $name = $entry ) =~ s!^/dev/!!;
        return $name;
    }
    die "cannot resolv $device\n";
}
# Record per-device I/O counters from /sys/block/*/stat as derive stats.
#
# Devices with no completed reads and no completed writes are skipped, as
# are stat files that are empty or too short.  Device-mapper devices (dm-N)
# are reported under their /dev/mapper name.  Characters outside
# [A-Za-z0-9_-] in the device name are replaced by "_".
#
# Dies if a stat file cannot be opened or a dm-N device cannot be resolved.
sub disk_io {
    my $self = shift;
    for my $stat ( glob '/sys/block/*/stat' ) {
        my ($device) = ( $stat =~ m!^/sys/block/(.+)/stat$! );
        open my $fh, '<', $stat or die "$!\n";
        my $dstat = <$fh>;
        close $fh;
        next unless defined $dstat;

        $dstat =~ s!^\s+!!;
        my @dstats = split /\s+/, $dstat;

        # read-ios  read-merges  read-sectors  read-ticks     0..3
        # write-ios write-merges write-sectors write-ticks    4..7
        # ios-in-prog tot-ticks rq-ticks                      8..10
        next if @dstats < 7;
        next if $dstats[0] == 0 && $dstats[4] == 0;

        # Resolve only devices that are actually reported, so an idle dm
        # device that cannot be resolved does not abort the whole collector.
        if ( $device =~ m!^dm-! ) {
            $device = translate_device_mapper($device);
        }
        $device =~ s![^A-Za-z0-9_-]!_!g;

        my $stats = $self->stats;
        $stats->{ "disk-io-" . $device . "-read-ios.derive" }      = $dstats[0];
        $stats->{ "disk-io-" . $device . "-read-sectors.derive" }  = $dstats[2];
        $stats->{ "disk-io-" . $device . "-write-ios.derive" }     = $dstats[4];
        $stats->{ "disk-io-" . $device . "-write-sectors.derive" } = $dstats[6];
    }
    return;
}
# Record received/transmitted byte counters per network interface from
# /proc/net/dev as "traffic-<interface>-rxbytes.derive" and
# "traffic-<interface>-txbytes.derive".
#
# Lines are expected to look like "<interface>: <counters...>"; the first
# counter is stored as rxbytes and the ninth as txbytes.  The loopback
# interface "lo" is skipped and characters outside [A-Za-z0-9_-] in the
# interface name are replaced by "_".  Dies with the OS error if the file
# cannot be opened.
#
# Note: this collector is not listed in @func, so collect() does not run it.
sub traffic {
    my $self = shift;
    open my $fh, '<', '/proc/net/dev' or die "$!\n";
    while ( my $line = <$fh> ) {
        # \s* rather than \s+: an interface name may start in column 0.
        next unless $line =~ m!^\s*([^:\s]+):\s*(.*)$!;
        my ( $interface, $counters ) = ( $1, $2 );
        next if $interface eq 'lo';    # skip loopback

        $interface =~ s![^A-Za-z0-9_-]!_!g;
        my @stats = split /\s+/, $counters;
        $self->stats->{"traffic-${interface}-rxbytes.derive"} = $stats[0];
        $self->stats->{"traffic-${interface}-txbytes.derive"} = $stats[8];
    }
    close $fh;
    return;
}
1; | |
package OneSteam::Agent::Server; | |
use strict; | |
use warnings; | |
use POSIX qw(EINTR EAGAIN EWOULDBLOCK :sys_wait_h); | |
use IO::Socket qw(:crlf IPPROTO_TCP TCP_NODELAY); | |
use IO::Socket::INET; | |
# Upper bound, in bytes, on the request data handle_connection() buffers
# for one connection (128 KiB).
our $MAX_REQUEST_SIZE = 128 * 1024;
# Constructor.  Returns a server object holding a 30-second I/O timeout and
# a freshly created OneSteam::Agent::Collector.
sub new {
    my ($class) = @_;
    my %self = (
        timeout   => 30,
        collector => OneSteam::Agent::Collector->new,
    );
    return bless \%self, $class;
}
# Open the listening TCP socket on "0:20078" and hand it to server().
# ReuseAddr is set on every platform except MSWin32.  Dies if the socket
# cannot be created.
sub run {
    my ($self) = @_;
    my $localaddr = '0:20078';

    my @reuse = $^O eq 'MSWin32' ? () : ( ReuseAddr => 1 );
    my $sock  = IO::Socket::INET->new(
        Listen    => SOMAXCONN,
        LocalAddr => $localaddr,
        Proto     => 'tcp',
        @reuse,
    );
    die "failed to listen to port $localaddr: $!" unless $sock;

    $self->server($sock);
}
# Accept loop: fork one child per incoming connection, forever.
#
# $sock is the listening socket.  Each accepted connection is switched to
# non-blocking mode with TCP_NODELAY set, then served by a forked child via
# handle_connection(); the parent closes its copy of the connection and goes
# back to accepting.  Dies if the connection cannot be configured or the
# fork fails.
sub server {
    my $self = shift;
    my $sock = shift;

    # Reap every child that has already exited, without blocking.
    # waitpid(-1, WNOHANG) returns 0 while children are still running, so
    # the loop must stop on anything <= 0; waiting for -1 would spin until
    # every child had finished.  $! and $? are preserved for the code the
    # signal interrupted.
    local $SIG{CHLD} = sub {
        local ( $!, $? );
        1 while waitpid( -1, WNOHANG ) > 0;
    };
    local $SIG{PIPE} = 'IGNORE';

    while (1) {
        # accept() fails when interrupted by a signal (e.g. SIGCHLD): retry.
        my $conn = $sock->accept or next;

        $conn->blocking(0)
            or die "failed to set socket to nonblocking mode:$!";
        $conn->setsockopt( IPPROTO_TCP, TCP_NODELAY, 1 )
            or die "setsockopt(TCP_NODELAY) failed:$!";

        my $pid = fork();
        die "cannot fork: $!" unless defined $pid;
        if ( $pid == 0 ) {
            # Child: default SIGCHLD so it can wait for its own helper
            # processes, and no need for the listening socket.
            local $SIG{CHLD} = 'DEFAULT';
            $sock->close;
            $self->handle_connection($conn);
            $conn->close;
            exit;
        }
        $conn->close;
    }
}
# Serve one client connection.
#
# Reads from $conn until the buffer contains a blank line (end of the
# request headers), then sends the collector output as a text/plain HTTP
# response and returns.  Reading is repeated as needed, so a request that
# arrives in several packets is still answered.  The connection is given up
# without a response when a read fails, times out, hits end-of-file, or the
# buffer has reached $MAX_REQUEST_SIZE (a zero-length read returns nothing).
sub handle_connection {
    my ( $self, $conn ) = @_;
    my $buf = '';
    while (1) {
        read_timeout(
            $conn, \$buf, $MAX_REQUEST_SIZE - length($buf), length($buf), $self->{timeout},
        ) or last;

        # Headers not complete yet: keep reading.
        next unless parse_read_buffer( $buf, +{} );

        my $body = $self->{collector}->collect;
        $body = '' unless defined $body;

        my @headers = ( 'HTTP/1.1 200 OK', 'Content-Type: text/plain', 'Connection: close' );
        push @headers, 'Date: ' . http_date();
        write_all( $conn, join( $CRLF, @headers, '' ) . $CRLF . $body, $self->{timeout} );
        last;
    }
    return;
}
# Format the current time, in GMT, as an HTTP date string such as
# "Sat, 29 Aug 2015 14:03:00 GMT".
sub http_date {
    my ( $sec, $min, $hour, $mday, $mon, $year, $wday ) = gmtime;
    my $day_name   = ( qw(Sun Mon Tue Wed Thu Fri Sat) )[$wday];
    my $month_name = ( qw(Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec) )[$mon];
    return sprintf(
        '%s, %02d %s %04d %02d:%02d:%02d GMT',
        $day_name, $mday, $month_name, $year + 1900, $hour, $min, $sec,
    );
}
# Report whether $buf contains a blank line, i.e. two consecutive line
# endings (LF, each optionally preceded by CR).  Returns 1 if it does and
# nothing otherwise.  $ret is accepted but not used.
sub parse_read_buffer {
    my ( $buf, $ret ) = @_;
    return unless $buf =~ /$CR?$LF$CR?$LF/;
    return 1;
}
# Read from $sock into the scalar referenced by $buf via do_io().
# Returns the (positive) number of bytes read, or undef if the socket is to
# be closed.
sub read_timeout {
    my @io_args = @_[ 0 .. 4 ];    # $sock, $buf, $len, $off, $timeout
    return do_io( undef, @io_args );
}
# Write part of $buf to $sock via do_io().
# Returns the (positive) number of bytes written, or undef if the socket is
# to be closed.
sub write_timeout {
    my @io_args = @_[ 0 .. 4 ];    # $sock, $buf, $len, $off, $timeout
    return do_io( 1, @io_args );
}
# Write the whole of $buf to $sock, calling write_timeout() repeatedly until
# nothing is left.  Returns the length of $buf on success, or nothing as
# soon as one write fails.
sub write_all {
    my ( $sock, $buf, $timeout ) = @_;
    my $total = length $buf;
    my $sent  = 0;
    while ( my $remaining = $total - $sent ) {
        my $written = write_timeout( $sock, $buf, $remaining, $sent, $timeout );
        return unless $written;
        $sent += $written;
    }
    return $total;
}
# Perform one sysread or syswrite on $sock, waiting with select() when the
# operation cannot proceed yet.
#
# $is_write selects syswrite (true) or sysread (false).  For a write, $buf is
# the data; for a read, $buf is a reference to the destination scalar.  $len
# and $off are passed straight to sysread/syswrite.  $timeout is the total
# number of seconds to wait, measured with whole-second resolution.
#
# Returns the positive byte count of the operation.  Returns nothing when
# the operation yields 0 (e.g. end-of-file), fails with an error other than
# EINTR/EAGAIN/EWOULDBLOCK, or the timeout runs out.
sub do_io {
    my ( $is_write, $sock, $buf, $len, $off, $timeout ) = @_;

    while (1) {
        # Try the I/O first; only wait if it would block.
        my $ret =
            $is_write
            ? syswrite( $sock, $buf, $len, $off )
            : sysread( $sock, $$buf, $len, $off );
        return $ret if $ret;

        # A defined result here is 0: nothing more to transfer.
        return if defined $ret;
        return unless $! == EINTR || $! == EAGAIN || $! == EWOULDBLOCK;

        # Wait until the socket is ready (or in error), then retry the I/O.
        my $ready;
        until ($ready) {
            my $fd_mask = '';
            vec( $fd_mask, fileno($sock), 1 ) = 1;
            my $read_set  = $is_write ? ''       : $fd_mask;
            my $write_set = $is_write ? $fd_mask : '';
            my $error_set = $fd_mask;

            my $began = time;
            $ready = select( $read_set, $write_set, $error_set, $timeout );
            $timeout -= ( time - $began );
            return if !$ready && $timeout <= 0;
        }
    }
}
1; | |
package main;

# Script entry point: build a server object and start it.
my $server = OneSteam::Agent::Server->new;
$server->run;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment