Last active
October 19, 2023 02:38
-
-
Save s1037989/1c8d219d74fb7a9bb93c72bbc97edb02 to your computer and use it in GitHub Desktop.
diode
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# perl diode recv ring | |
# perl diode send cc00 | |
# perl diode send ccff 0003 610062 | |
# perl diode send ff01 0 0003 610062 | |
# perl diode sendfile file [name] | |
use 5.010; | |
use strict; | |
use warnings; | |
use Errno; | |
use Digest::MD5; | |
use Fcntl qw(:DEFAULT :mode :seek); | |
use Time::HiRes qw(nanosleep time); | |
use IO::Socket::Packet; # TODO: how to set this up without an object? | |
use Socket qw(SOCK_RAW); | |
use Socket::Packet qw( | |
PACKET_OUTGOING | |
PF_PACKET AF_PACKET | |
ETH_P_ALL | |
pack_sockaddr_ll unpack_sockaddr_ll recv_len | |
); | |
use constant DEBUG => $ENV{DIODE_DEBUG} // 0; | |
use constant DIODE => $ENV{DIODE_IFNAME} || 'diode'; # TODO: test between machines; TODO: how to work use a virtual interface "pair"? | |
use constant { | |
PROTOCOL => ETH_P_ALL, # TODO: how to use a custom protocol? | |
ETH => 0x0000, | |
}; | |
use constant { | |
FRAME_SIZE => 2048, | |
FRAME_NR => 256000, | |
KERNEL_BLOCK_SIZE => 16384, | |
}; | |
use constant { | |
NS_RECV => 1, | |
NS_SEND => 1, # TODO: this should be calculated based on the speed of the link and how well the receiver can keep up (10 is not that much faster than 100) | |
# rather than just a constant pause between packets; perhaps also a larger pause between current bytes written or evenly every n seconds? | |
}; | |
use constant VALID_CMDS => { | |
"00" => [], # ping | |
"01" => [], # stop | |
"10" => [ # start sending file | |
"[Aa]{1,4}", | |
], | |
"11" => [ # stop sending file | |
"[Aa]{1,4}", | |
], | |
"ff" => [ # custom command | |
"a (abc|[ab])", | |
"Z (ZYX|[ZX])", | |
], | |
}; | |
my $queue = {}; | |
my $subcommand = \&{'cmd_'.shift @ARGV}; | |
# pre-set dst, src, eth, and id; take op, len, and payload from args | |
@ARGV = (('00'x6), ('00'x6), ('00'x2), $ARGV[0], (length($ARGV[1])?sprintf('%032s', $ARGV[1]):''), map { $_||'' } @ARGV[2,3]) if $subcommand eq \&cmd_send; | |
$subcommand->(@ARGV); | |
### subcommands | |
sub cmd_recv { | |
my $recv_type = shift // 'normal'; | |
my $sock = $recv_type eq 'ring' ? &_sockring : &_sock; | |
while (1) { | |
nanosleep NS_RECV; | |
my $packet; | |
my $rv = _sockread($sock, \$packet, \my %info); | |
next if !defined($rv) && $!{EAGAIN}; | |
my ($dst, $src, $eth, $op1, $op2, $id, $payload_len, $payload) = unpack 'a6a6a2a1a1a16a2a*', $packet; | |
next unless $dst eq pack 'H*', sprintf '%012x', 0; # TODO: use a custom dst | |
next unless $src eq pack 'H*', sprintf '%012x', 0; # TODO: use a custom src | |
# next unless $eth eq pack 'H*', sprintf '%04x', ETH; # TODO: how to use a custom eth? | |
next unless $op1 eq "\xff" || $op1 eq "\xcc"; # valid op1 is cc (command) or ff (file) | |
$payload_len = unpack "S", pack "S", hex unpack 'H*', $payload_len; | |
if ($payload_len != length $payload) { | |
warn sprintf "%s: %s\n", 'length mismatch', join ' ', map { unpack 'H*', $_ } unpack 'a6a6a2a*', $packet; | |
$sock->done_ring_frame if $sock->can('done_ring_frame'); | |
next; | |
} | |
$id = unpack "H*", $id; | |
if ($op1 eq "\xff") { | |
my $addl_data = $op2; | |
_file($id, $addl_data, $payload); | |
} | |
elsif ($op1 eq "\xcc") { | |
$op1 = unpack 'H*', $op1; | |
$op2 = unpack 'H*', $op2; | |
my $command = \&{"_command_$op2"}; | |
if (_is_valid_cmd($op1.$op2, $payload)) { | |
$command->($id, $payload); | |
} | |
else { | |
warn "invalid command $payload\n"; | |
} | |
} | |
$sock->done_ring_frame if $sock->can('done_ring_frame'); | |
} | |
} | |
sub cmd_sendfile { # TODO: allow sending multiple files from the cmdline, but they should be done in serial, not parallel | |
my $subcall = ref $_[0] ? 1 : 0; | |
my $sock = ref $_[0] ? shift : &_sock; | |
$_[2] = sprintf '%04x', PROTOCOL; | |
my ($file, $name) = (shift, shift); | |
$name ||= $file; | |
warn "executing file callback script [$file $name]\n"; # TODO: implement file callback script | |
die "file $file does not exist\n" unless -e $file; | |
open my $fh, '<', $file; | |
sysseek($fh, 0, SEEK_SET); | |
my $digest = Digest::MD5->new->addfile($fh)->digest; | |
my $hexdigest = unpack('H*', $digest); | |
my @cmd = (qw(000000000000 000000000000 0000 cc10), $hexdigest, sprintf('%04x', hex length $name), unpack('H*', $name)); # TODO: refactor so that a command like this is just 2 commands | |
my $ret = ''; | |
my @alarm_cmd = (@cmd[0..2], 'cc00', $cmd[4], '0000'); | |
$SIG{ALRM} = sub { alarm 1; cmd_send($sock, @alarm_cmd); } unless $SIG{ALRM}; | |
alarm 1; | |
my $read_bytes = my $write_bytes = 0; | |
my $start_time = time; | |
cmd_send($sock, @cmd); | |
sysseek($fh, 0, SEEK_SET); | |
while ($ret = sysread($fh, my $read_buffer, 131072, 0)) { | |
$read_bytes += $ret; | |
my @read_buffer = unpack '(a1400)*', $read_buffer; | |
undef $read_buffer; | |
while (my $write_buffer = shift @read_buffer) { | |
my $addl_data = $read_bytes < -s $file || scalar @read_buffer ? "\x01" : "\x00"; | |
$write_buffer = ("\x00"x6).("\x00"x6).("\x00"x2).("\xff".$addl_data).$digest.pack('H*', sprintf('%04x', length $write_buffer)).$write_buffer; | |
nanosleep NS_SEND; | |
my $rv = syswrite($sock, $write_buffer, length $write_buffer); # TODO: need a timeout here | |
$write_bytes += $rv; | |
if (!defined($rv) && $!{EAGAIN}) { | |
say "EAGAIN"; | |
} elsif ($rv != length $write_buffer) { | |
say "incomplete write"; | |
} else { | |
my ($dst, $src, $eth, $op1, $op2, $id, $payload_len, $payload) = unpack 'a6a6a2a1a1a16a2a*', $write_buffer; | |
$queue->{$hexdigest} += length $payload; | |
say join ' ', (map { unpack 'H*', $_ } $dst, $src, $eth, $op1.$op2, $id), _trunc(unpack('H*', $payload), 8) if DEBUG > 0; | |
} | |
} | |
} | |
say "total: $queue->{$hexdigest}" if $queue->{$hexdigest}; | |
say time - $start_time unless $subcall; | |
} | |
sub cmd_send { | |
my $subcall = ref $_[0] ? 1 : 0; | |
my $sock = ref $_[0] ? shift : &_sock; | |
# Command Validation: | |
my $is_valid_cmd = _is_valid_cmd($_[3], pack 'H*', ($_[6]//'')); | |
@_[5,6] = ('')x2 if $is_valid_cmd == -1; # remove args from cmds that don't accept args | |
die sprintf "invalid command %s\n", join ' ', split /\x00/, pack 'H*', ($_[6]//'') if !$is_valid_cmd; | |
$_[2] = sprintf '%04x', PROTOCOL; | |
my $loop = 1; | |
$loop = $_[5] // $loop if $_[3] eq 'ff01'; | |
@_[5,6] = ('0578', "61" x 1400) if $_[3] eq 'ff01'; | |
my $c = 1; | |
my @alarm_cmd = (@_[0..2], 'cc00', $_[4], '0000'); | |
$SIG{ALRM} = sub { alarm 1; cmd_send($sock, @alarm_cmd); } unless $SIG{ALRM}; | |
alarm 1; | |
for (1..$loop) { | |
nanosleep NS_SEND; | |
$_[3] = 'ff00' if $_[3] eq 'ff01' && $c == $loop; | |
my $buffer = pack 'H*', join '', @_; | |
my $rv = syswrite($sock, $buffer, length $buffer); # TODO: need a timeout here | |
if (!defined($rv) && $!{EAGAIN}) { | |
say "EAGAIN"; | |
} elsif ($rv != length $buffer) { | |
say "incomplete write"; | |
} else { | |
my ($dst, $src, $eth, $op1, $op2, $id, $payload_len, $payload) = unpack 'a6a6a2a1a1a16a2a*', $buffer; | |
$queue->{$_[4]} += length $payload if $op1 eq "\xff"; | |
say join ' ', (map { unpack 'H*', $_ } $dst, $src, $eth, $op1.$op2, $id), _trunc(unpack('H*', $payload), 8) if DEBUG > 0; | |
} | |
} continue { | |
$c++; | |
} | |
say "total: $queue->{$_[4]}" if $_[4] && $queue->{$_[4]}; | |
} | |
# private subs | |
sub _is_valid_cmd { | |
my ($op1, $op2) = unpack 'a2a2', $_[0]; | |
return 0 unless $op1 eq 'cc'; | |
return 0 unless exists VALID_CMDS->{$op2}; | |
my $valid_cmd = VALID_CMDS->{$op2}; | |
return -1 unless scalar @$valid_cmd; | |
$_[1] =~ s/\x00/ /g; | |
grep { $_[1] =~ $_ } map { s/\s+/ /g; qr(^$_$) } @$valid_cmd; | |
} | |
sub _sock { | |
my $ifname = DIODE; | |
open IFINDEX, "/sys/class/net/$ifname/ifindex"; | |
my $ifindex = <IFINDEX>; | |
close IFINDEX; | |
socket(my $sock, AF_PACKET, SOCK_RAW, 0) or die "Cannot socket() - $!\n"; | |
bind($sock, pack_sockaddr_ll(PROTOCOL, $ifindex, 0, 0, "")) or die "Cannot bind() - $!\n"; | |
my $flags = fcntl($sock, F_GETFL, 0) or die "Couldn't get flags for \$sock : $!\n"; | |
$flags |= O_NONBLOCK; | |
fcntl($sock, F_SETFL, $flags) or die "Couldn't set flags for \$sock: $!\n"; | |
return $sock; | |
} | |
sub _sockread { | |
my ($sock, $packet, $info) = @_; | |
if ($sock->can('wait_ring_frame')) { | |
$sock->wait_ring_frame($$packet, $info); | |
} | |
else { | |
sysread($sock, $$packet, 1500); | |
} | |
} | |
sub _sockring { | |
my $ifname = DIODE; | |
my $sock = IO::Socket::Packet->new(Type => SOCK_RAW, IfName => $ifname, Protocol => PROTOCOL) | |
or die "Cannot create PF_PACKET socket - $!"; | |
unless( eval { $sock->setup_rx_ring(FRAME_SIZE, FRAME_NR, KERNEL_BLOCK_SIZE) } ) { | |
die "Cannot setup PACKET_RX_RING - $@" if $@; | |
die "Cannot setup PACKET_RX_RING - $!"; | |
} | |
$sock->blocking(0); | |
return $sock; | |
} | |
sub _trunc { | |
my ($string, $keep_length) = @_; | |
$keep_length //= 4; | |
my $length = length($string); | |
# If the string is shorter or equal to 8 characters, no truncation is needed | |
return $string if $length <= $keep_length * 2; | |
# Calculate the length of the parts to keep | |
my $keep_start = substr($string, 0, $keep_length); | |
my $keep_end = substr($string, -$keep_length); | |
# Construct the truncated string | |
my $truncated = $keep_start . "..." . $keep_end . " (${\($length/2)} bytes)"; | |
return $truncated; | |
} | |
# files and commands | |
sub _command_00 { say 'ping' } | |
sub _command_01 { say 'stop' and exit } | |
sub _command_10 { | |
my ($id, $payload) = @_; | |
my $file = $payload; | |
$queue->{$id} = {meta => {file => $file}, data => ''}; | |
my $fh = $queue->{$id}->{fh}; | |
close $fh if $fh; | |
delete $queue->{$id}->{fh} if $fh; | |
open $queue->{$id}->{fh}, '>', './'.$file; # TODO: use cmdline argument for outdir; TODO: create a lockfile for writing to the file | |
say "receiving file $file ($id)"; | |
} | |
sub _command_11 { | |
my ($id, $payload) = @_; | |
my $file = $payload; | |
my $fh = $queue->{$id}->{fh}; | |
close $fh if $fh; | |
delete $queue->{$id}->{fh} if $fh; | |
say "closed file $file ($id)"; | |
} | |
sub _command_ff { | |
my ($id, $payload) = @_; | |
my @args = split /\x00/, $payload; | |
warn "executing command callback script [@args]\n"; # TODO: implement command callback script | |
} | |
sub _file { | |
my ($id, $addl_data, $payload) = @_; | |
warn "file $id not initialized\n" and return unless my $fh = $queue->{$id}->{fh}; | |
$queue->{$id}->{meta}->{_start_time} //= time; | |
# uncomment to not write to disk and go faster for testing | |
# return if $addl_data eq "\x01"; | |
# die time - $queue->{$id}->{meta}->{_start_time}; | |
$queue->{$id}->{meta}->{data} .= $payload; | |
my $data = $queue->{$id}->{meta}->{data}; | |
my $file = $queue->{$id}->{meta}->{file}; | |
say sprintf "received %d bytes (%d total) of data to file %s (%s)", length $payload, (length($data) + -s $file) , $file, $id if DEBUG > 1; | |
return unless length $data >= 131072 || $addl_data eq "\x00"; | |
(syswrite($fh, $data) // -1) == length $data or die "failed to write to file $file: $!\n"; | |
$queue->{$id}->{meta}->{data} = ''; | |
say sprintf "added %d bytes (%d total) of data to file %s (%s)", length $data, -s $file, $file, $id if DEBUG > 0; | |
return unless $addl_data eq "\x00"; | |
$queue->{$id}->{meta}->{end_time} = time; | |
close $fh; | |
delete $queue->{$id}->{fh}; | |
open $fh, '<', $file; | |
sysseek($fh, 0, SEEK_SET); | |
my $hexdigest = Digest::MD5->new->addfile($fh)->hexdigest; | |
close $fh; | |
$queue->{$id}->{meta}->{start_time} = delete $queue->{$id}->{meta}->{_start_time}; | |
my $duration = ($queue->{$id}->{meta}->{end_time} - $queue->{$id}->{meta}->{start_time}); | |
if ($hexdigest eq $id) { | |
warn sprintf "closed file $id (%d bytes) in %f seconds (%0.3fMbps)\n", -s $file, $duration, ((8 * -s $file) / $duration / 1_000_000); | |
warn "executing file callback script on $file\n"; # TODO: implement file callback script | |
} | |
else { | |
warn sprintf "closed file $id (%d bytes) in %f seconds (%0.3fMbps) (checksum mismatch)\n", -s $file, $duration, ((8 * -s $file) / $duration / 1_000_000); | |
unlink $file; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment