Last active
December 17, 2015 08:19
-
-
Save vshymanskyy/5579584 to your computer and use it in GitHub Desktop.
Kademlia DHT implementation in Perl + AnyEvent.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/perl
# Kademlia DHT implementation in Perl + AnyEvent.
# Copyright © 2013 PE Volodymyr Shymanskyy. All rights reserved.
# License: GNU GPL v2 (http://www.gnu.org/licenses/gpl-2.0.txt)
# Global switches set from the command line (--dbg / --dump) and read by
# every package in this file through the file-scoped `our` aliases.
our $DBG;
our $DUMP;

# Data::Dumper output format used by the console "dump" command.
($Data::Dumper::Terse, $Data::Dumper::Indent) = (1, 1);
package Kad {
    use strict;
    use warnings;
    use Readonly;

    # Protocol-wide tunables. Other packages read them through the fully
    # qualified name $Kad::Config{...}; the hash is read-only after this point.
    Readonly::Hash our %Config => (
        ID_BITS     => 160,        # Node ID size
        ID_HASH     => 'SHA-1',    # Hash function
        ALPHA       => 3,          # Maximum number of concurrent messages in transit
        STALE       => 5,          # TODO: Number of times a node can be marked as stale before it is actually removed
        MIN_PORT    => 1024,       # Port restriction
        TIMEOUT_REQ => 1.0,        # Single request timeout
        RT_K        => 20,         # K-Bucket size (K)
    );
}
package Kad::Id {
    use strict;
    use warnings;
    use base 'Math::BigInt';
    use Digest;
    use MIME::Base64 qw(encode_base64url decode_base64url);

    # An ID stringifies to its URL-safe base64 form (used on the wire and as
    # hash keys).
    use overload '""' => sub { $_[0]->as_b64() };

    # Kad::Id->new($b64): the default constructor parses the base64url form.
    sub new { return from_b64(@_) }

    # Build an ID from a hexadecimal string and bless it into this class.
    sub from_hex {
        my ($class, $hex) = @_;
        my $id = $class->SUPER::from_hex($hex);
        return bless $id, ref($class) || $class;
    }

    # Build an ID from its base64url text form.
    sub from_b64 {
        my (undef, $b64) = @_;
        my $raw = decode_base64url($b64);
        return Kad::Id->from_raw($raw);
    }

    # Build an ID from raw bytes.
    sub from_raw {
        my (undef, $raw) = @_;
        my $hex = join('', unpack('H*', $raw));
        return Kad::Id->from_hex($hex);
    }

    # Derive an ID by hashing arbitrary data with the configured digest.
    sub from_hash {
        my (undef, $data) = @_;
        my $digest = Digest->new($Kad::Config{ID_HASH});
        return Kad::Id->from_hex($digest->add($data)->hexdigest());
    }

    # Raw byte string of the zero-padded hex form.
    sub as_raw {
        my ($self) = @_;
        return pack 'H*', $self->as_hex();
    }

    # Binary digits without the "0b" prefix, left-padded with zeros to ID_BITS.
    sub as_bin {
        my ($self) = @_;
        my $digits = $self->SUPER::as_bin() =~ s/^0b//r;
        return sprintf '%0' . $Kad::Config{ID_BITS} . 's', $digits;
    }

    # Hex digits without the "0x" prefix, left-padded with zeros to ID_BITS/4.
    sub as_hex {
        my ($self) = @_;
        my $digits = $self->SUPER::as_hex() =~ s/^0x//r;
        return sprintf '%0' . ($Kad::Config{ID_BITS} / 4) . 's', $digits;
    }

    # URL-safe base64 of the raw bytes.
    sub as_b64 {
        my ($self) = @_;
        return encode_base64url($self->as_raw());
    }

    # Random ID: one random hex digit per 4 bits of ID_BITS.
    sub random {
        my @digits = ('0' .. '9', 'a' .. 'f');
        my $count  = $Kad::Config{ID_BITS} / 4;
        return Kad::Id->from_hex(join('', map { $digits[rand 16] } (1 .. $count)));
    }

    # Number of leading zero bits in the padded binary form.
    sub prefixlen {
        my ($self) = @_;
        my ($zeros) = $self->as_bin() =~ /(^0*)/;
        return length($zeros);
    }
}
package Kad::DataStore::Hash {
    use strict;
    use warnings;

    # In-memory key/value store. Each entry is a hash with:
    #   val      - the stored value
    #   time     - time of the last set() for this key
    #   pub_time - time the current value was first stored
    #   pub_id   - id passed to set() when the current value was first stored

    # Create an empty store.
    sub new {
        my ($class) = @_;
        my $self = { data => {} };
        return bless $self, ref($class) || $class;
    }

    # Store $v under $k on behalf of $id. Re-storing an identical value only
    # refreshes `time`; a new value replaces the whole entry. An undefined $v
    # removes the key.
    sub set {
        my ($s, $k, $v, $id) = @_;
        unless (defined $v) {
            delete $s->{data}{$k};
            return;
        }
        my $now   = time;
        my $entry = $s->{data}{$k};
        if (defined $entry and $entry->{val} eq $v) {    # already exists and same
            $entry->{time} = $now;
            return;
        }
        $s->{data}{$k} = {
            val      => $v,
            time     => $now,
            pub_time => $now,
            pub_id   => $id,
        };
        return;
    }

    # Return the entry hash for $k, or undef if it is absent.
    sub get {
        my ($s, $k) = @_;
        return $s->{data}{$k};
    }

    # True when $k has an entry.
    sub has {
        my ($s, $k) = @_;
        return defined $s->{data}{$k};
    }

    # Remove $k from the store (previously a no-op).
    sub unset {
        my ($s, $k) = @_;
        delete $s->{data}{$k};
        return;
    }

    # List of stored keys. CORE::keys is spelled out because this package
    # defines its own `keys`, and the hash is dereferenced explicitly because
    # keys-on-a-reference no longer exists in current perls.
    sub keys {
        my ($s) = @_;
        return CORE::keys %{ $s->{data} };
    }
}
package Kad::Node { | |
use strict; | |
use warnings; | |
use feature qw(switch); | |
use Readonly; | |
use AnyEvent; | |
use JSON::XS; | |
use List::Util qw(min); | |
use List::MoreUtils qw(firstidx); | |
use IO::Socket::INET; | |
use Data::Dumper; | |
# Node lifecycle states (stored in $self->{state})
Readonly::Hash our %STATE => (
    WAIT    => 0,
    JOINING => 1,
    JOINED  => 2,
    LEFT    => 3,
);

# Values of the "type" field in request messages
Readonly::Hash our %RPC => (
    INFO  => 'info',
    FIND  => 'find',
    STORE => 'store',
);
# Kad::Node->new($name, $bind, $port)
# Builds a node whose ID is the hash of $name, binds a UDP socket to
# "$bind:$port" and registers an AnyEvent read watcher that feeds incoming
# datagrams to udp_recv(). Dies if the socket or the watcher cannot be created.
sub new {
    my ($class, $name, $bind, $port) = @_;

    my %fields = (
        state => $STATE{WAIT},
        lid   => Kad::Id->from_hash($name),
        bind  => $bind,
        port  => $port,
        rt    => [],                           # routing table
        ds    => Kad::DataStore::Hash->new(),
        reqs  => {},                           # current requests (tag => callback)
    );
    my $self = bless { %fields }, ref($class) || $class;

    # create UDP socket
    my $sock = IO::Socket::INET->new(
        LocalAddr  => "$bind:$port",
        Proto      => 'udp',
        ReuseAddr  => 1,
        Broadcast  => 1,
        MultiHomed => 1,
    );
    $self->{sock} = $sock;
    die "Couldn't create socket: $!\n" unless $sock;

    # set socket read handler
    my $watcher = AnyEvent->io(
        fh   => $self->{sock},
        poll => 'r',
        cb   => sub { $self->udp_recv() },
    );
    $self->{_ae_recv} = $watcher;
    die "Couldn't set event: $!\n" unless $watcher;

    return $self;
}
################################ | |
# Routing table | |
################################ | |
# $self->rt_update($contact)
# Records $contact ({ id => Kad::Id, addr => "host:port" }) in the routing
# table bucket selected by the length of the common ID prefix with this node.
#  - A contact with this node's own ID is ignored (returns nothing).
#  - While the node is still in the WAIT state, a join through the contact's
#    address is scheduled on the next event-loop iteration.
#  - A known contact is moved to the front of its bucket and the stored
#    record is returned.
#  - A new contact is put at the front unless the bucket already holds RT_K
#    entries; $contact is returned either way.
sub rt_update {
    my ($self, $contact) = @_;
    return if $contact->{id} == $self->{lid};    # don't add myself

    if ($STATE{WAIT} == $self->{state}) {
        AnyEvent::postpone {
            $self->join(sub {
                my ($c) = @_;
                print "Joined on incoming, to $contact->{addr}\n" if $c;
            }, $contact->{addr});
        };
    }

    my $bn     = ($contact->{id} ^ $self->{lid})->prefixlen();
    my $bucket = $self->{rt}[$bn] //= [];

    my $idx = firstidx { $_->{id} == $contact->{id} } @{$bucket};
    if ($idx >= 0) {    # exists
        # move to top; after the move the record lives at index 0
        unshift @{$bucket}, splice(@{$bucket}, $idx, 1) if $idx != 0;
        return $bucket->[0];
    }

    if (scalar @{$bucket} >= $Kad::Config{RT_K}) {
        # Bucket is full: the newcomer is dropped.
        # TODO: replacement cache
        return $contact;
    }

    print "New node: $contact->{id} [$contact->{addr}]\n" if $DBG;
    unshift @{$bucket}, $contact;    # add to top
    return $contact;
}
# $self->rt_find_closest($target, $qty [, $skip])
# Returns up to $qty known contacts ordered by XOR distance to $target.
# Candidates are gathered from the bucket $target maps to and then from
# buckets at growing offsets on both sides, until more than $qty + 1
# candidates were collected or both sides ran past the table bounds.
# When $skip (a Kad::Id) is given, the contact with that ID is left out;
# when it is omitted no filtering takes place.
sub rt_find_closest {
    my ($self, $target, $qty, $skip) = @_;
    my $buckets = $self->{rt};
    my $bn      = ($target ^ $self->{lid})->prefixlen();

    my @res;
    push @res, @{ $buckets->[$bn] } if defined $buckets->[$bn];
    for my $off (1 .. $Kad::Config{ID_BITS}) {
        my ($lo, $hi) = ($bn - $off, $bn + $off);
        push @res, @{ $buckets->[$lo] } if $lo >= 0 and defined $buckets->[$lo];
        push @res, @{ $buckets->[$hi] } if defined $buckets->[$hi];
        last if $#res > $qty or ($lo < 0 and $hi >= $Kad::Config{ID_BITS});
    }

    # Only compare against $skip when the caller actually supplied one.
    @res = grep { $_->{id} != $skip } @res if defined $skip;
    @res = sort { ($a->{id} ^ $target) <=> ($b->{id} ^ $target) } @res;
    @res = @res[0 .. min($qty - 1, $#res)];
    return @res;
}
################################ | |
# Helpers | |
################################ | |
# _endpoint($addr, $port) -> packed sockaddr_in for that host and port
# _endpoint($packed)      -> ($addr, $port) with $addr in dotted-quad form
# _endpoint()             -> nothing
sub _endpoint {
    my ($addr, $port) = @_;
    return unless defined $addr;

    # Host and port given: pack them.
    return sockaddr_in($port, inet_aton($addr)) if defined $port;

    # Single argument: treat it as a packed address and unpack it.
    my ($peer_port, $peer_ip) = sockaddr_in($addr);
    return (inet_ntoa($peer_ip), $peer_port);
}
# $self->on_msg($obj, $addr, $port)
# Dispatches one decoded message ($obj, a hash) received from $addr:$port.
#  - A message carrying both a type and the tag of one of our own pending
#    requests is dropped (request from myself).
#  - Any message carrying an id refreshes the routing table.
#  - No "type" field: a reply; the pending request with the same tag is
#    removed and its callback is invoked with ($obj, $addr, $port).
#  - "stdin": console command, accepted from 127.0.0.1 only.
#  - info / find / store: the three RPCs, each answered with the request tag.
# Dispatch is done with plain string comparisons rather than given/when.
sub on_msg {
    my ($self, $obj, $addr, $port) = @_;
    return if $obj->{type} and $obj->{tag} and $self->{reqs}{$obj->{tag}};    # Request from myself?

    # Update RT on each message
    $self->rt_update({
        id   => Kad::Id->new($obj->{id}),
        addr => "$addr:$port",
    }) if $obj->{id};

    my $type = $obj->{type};
    my $peer = "$addr:$port";

    if (!defined $type) {    # Reply?
        my $req = $obj->{tag} ? delete $self->{reqs}{$obj->{tag}} : undef;
        if ($req) {
            $req->{cbk}->($obj, $addr, $port);
        } elsif ($DBG) {
            my $tag = $obj->{tag} // '';
            print "Unhandled rsp: [$addr:$port] tag: $tag\n";
        }
    } elsif ($type eq 'stdin') {    # Input from keyboard
        return unless $addr eq "127.0.0.1";    # Small protection ;)
        $self->_on_console($obj->{text});
    } elsif ($type eq $RPC{INFO}) {    # Kad PING
        $self->udp_send({ tag => $obj->{tag} }, $peer);
    } elsif ($type eq $RPC{FIND}) {    # Kad FIND_NODE and FIND_VALUE (tgt or key)
        my $has_key = (defined $obj->{key} and $self->{ds}->has($obj->{key}));
        if (defined $obj->{tgt} or (defined $obj->{key} and not $has_key)) {
            # Answer with the closest known contacts, leaving out the asker.
            my $asker = $obj->{id} ? Kad::Id->new($obj->{id}) : undef;
            my @res   = $self->rt_find_closest(
                Kad::Id->new($obj->{tgt} || $obj->{key}),
                $Kad::Config{RT_K}, $asker);
            my @nodes = map { +{ id => "$_->{id}", addr => $_->{addr} } } @res;
            $self->udp_send({ nodes => \@nodes, tag => $obj->{tag} }, $peer);
        } elsif ($has_key) {
            $self->udp_send({ value => $self->{ds}->get($obj->{key}), tag => $obj->{tag} }, $peer);
        }
    } elsif ($type eq $RPC{STORE}) {    # Kad STORE
        if (defined $obj->{key} and defined $obj->{val}) {
            print "Set $obj->{key} to $obj->{val}\n" if $DBG;
            $self->{ds}->set($obj->{key}, $obj->{val}, $obj->{id});
            $self->udp_send({ tag => $obj->{tag} }, $peer);
        }
    } else {
        my $tag = $obj->{tag} // '';
        print "Unhandled req: [$addr:$port] type: $type, tag: $tag\n";
    }
    return;
}

# Executes one console command line (the text of a 'stdin' message).
# Unknown or missing text is ignored.
sub _on_console {
    my ($self, $text) = @_;
    return unless defined $text;

    if ($text =~ /^exit$/) {
        exit;
    } elsif ($text =~ /^ping (.*)$/) {
        my $to = $1;
        print "Ping to $to...\n";
        $self->ping($to, sub { print $_[0] ? "OK!\n" : "Fail\n"; });
    } elsif ($text =~ /^join (.*)$/) {
        my $to = $1;
        print "Joining to $to...\n";
        $self->join(sub { print $_[0] ? "OK!\n" : "Fail ;(\n"; }, $to);
    } elsif ($text =~ /^find (.*)$/) {
        $self->find(Kad::Id->from_hash($1), sub {
            print $_[0] ? "Addr: $_[0]->{addr}\n" : "Not found\n";
        });
    } elsif ($text =~ /^set (\S*) (.*)$/) {
        my ($name, $value) = ($1, $2);
        $self->store(Kad::Id->from_hash($name), $value, sub {
            print $_[0] ? "Stored to $_[0] nodes\n" : "Failed: Stored only locally\n";
        });
    } elsif ($text =~ /^get (\S*)$/) {
        $self->retrieve(Kad::Id->from_hash($1), sub {
            print $_[0] ? "Value: $_[0]{val}\nAge: " . (time - $_[0]{pub_time}) . "s\n" : "Not found\n";
        });
    } elsif ($text =~ /^hash (\S*)$/) {
        print Kad::Id->from_hash($1);
    } elsif ($text =~ /^dump$/) {
        print "State: $self->{state}\n";
        print Data::Dumper->Dump([$self->{rt}, $self->{ds}{data}], [qw(rt ds)]);
    }
    return;
}
# $self->udp_recv()
# Read handler for the node socket: reads one datagram (up to 4096 bytes),
# drops it when empty or sent from a port below MIN_PORT, decodes it as JSON
# and hands the resulting object to on_msg(). Datagrams that are not valid
# JSON, or whose top-level value is not an object, are discarded instead of
# raising an exception inside the event loop.
sub udp_recv {
    my ($self) = @_;
    $self->{sock}->recv(my $msg, 4096);
    return unless $msg;

    my ($addr, $port) = _endpoint($self->{sock}->peername);
    return if $port < $Kad::Config{MIN_PORT};

    my $len = length $msg;
    print "<= udp[$addr:$port] ($len) $msg\n" if $DUMP;

    # The payload comes straight from the network: never trust its shape.
    my $obj = eval { decode_json $msg };
    unless (ref $obj eq 'HASH') {
        print "Malformed message from [$addr:$port] dropped\n" if $DBG;
        return;
    }

    $self->on_msg($obj, $addr, $port);
    return;
}
# $self->udp_send($obj, $to [, $cbk])
# Sends the hash $obj as JSON to $to ("host:port").
#  - The sender id is always added; a random tag is generated unless the
#    caller supplied a 'tag' key (replies reuse the request tag).
#  - Keys with undefined values are stripped before encoding.
#  - With $cbk the message is registered as a pending request: $cbk receives
#    the reply through on_msg(), or is called with no arguments after
#    TIMEOUT_REQ seconds.
#  - If the destination is unusable (missing/non-numeric port, port below
#    MIN_PORT, host that cannot be resolved) nothing is sent and $cbk is
#    called with no arguments on the next event-loop iteration, so callers
#    counting pending requests are always answered.
# Dies when $obj or $to is missing. Returns the result of send().
sub udp_send {
    my ($self, $obj, $to, $cbk) = @_;
    die unless $obj and $to;

    my ($addr, $port) = split /:/, $to;
    my $dest;
    if (defined $port and $port =~ /^\d+$/ and $port >= $Kad::Config{MIN_PORT}) {
        # sockaddr_in croaks when the host does not resolve
        $dest = eval { _endpoint($addr, $port) };
    }
    unless (defined $dest) {
        AnyEvent::postpone { $cbk->() } if $cbk;
        return;
    }

    $obj->{id} = "$self->{lid}";    # set id

    # generate tag
    unless (exists $obj->{tag}) {
        my @alphabet = ('0' .. '9', 'a' .. 'z', 'A' .. 'Z', '-', '_');
        # CORE::join: this package defines its own join()
        $obj->{tag} = CORE::join('', map { $alphabet[rand 64] } (1 .. 12));
    }

    # register callbacks
    if ($cbk) {
        my $tag = $obj->{tag};
        $self->{reqs}{$tag} = {
            cbk => $cbk,
            tmr => AnyEvent->timer(after => $Kad::Config{TIMEOUT_REQ}, cb => sub {
                print "Timeout: $tag\n" if $DBG;
                delete $self->{reqs}{$tag};
                $cbk->();
            }),
        };
    }

    # cleanup undefs
    delete @{$obj}{ grep { !defined $obj->{$_} } keys %{$obj} };

    # actually send
    my $msg = encode_json($obj);
    my $len = length($msg);
    print "=> udp[$to] ($len) $msg\n" if $DUMP;
    return send($self->{sock}, $msg, 0, $dest);
}
# $self->find_iter($tgt, $mode, $cbk)
# Iterative lookup of $tgt (a Kad::Id). $mode is one of:
#   'one' - looking for the node with exactly this ID
#   'all' - looking for the closest nodes
#   'val' - looking for a stored value (FIND with 'key' instead of 'tgt')
# $cbk is called exactly once: with the value entry in 'val' mode when a
# peer returned one (undef otherwise), or with a reference to the list of
# closest contacts in the other modes.
#
# Starting from the closest locally known contacts, at most ALPHA requests
# are kept in flight. The lookup ends when the wanted node/value shows up,
# when a reply brings no unseen contacts, or when nothing is left to ask.
sub find_iter {
    my ($self, $tgt, $mode, $cbk) = @_;    # mode can be qw(one all val)
    my $want_val = ($mode eq 'val');

    my @res = $self->rt_find_closest($tgt, $Kad::Config{RT_K});
    unless (scalar @res) { $cbk->($want_val ? undef : \@res); return }
    if ($mode eq 'one' and $res[0]->{id} == $tgt) { $cbk->(\@res); return }    # found in local table -> finish

    # context of async operation
    my $pend = 0;
    my $done = 0;
    my (%sent, %seen);
    $seen{$self->{lid}} = 1;    # treat myself as seen
    $seen{$_->{id}} = 1 foreach @res;

    my $ask;

    # Ends the lookup: late replies are ignored from now on, and $ask is
    # cleared so the closure that refers to itself can be freed.
    my $finish = sub {
        my ($result) = @_;
        $done = 1;
        undef $ask;
        $cbk->($result);
        return;
    };

    $ask = sub {
        print "Res: ", scalar @res, ", Pending: $pend, Seen: ", scalar keys %seen, ", Sent: ", scalar keys %sent, "\n" if $DBG;
        foreach my $c (grep { !$sent{$_->{id}} } @res) {
            last if $pend >= $Kad::Config{ALPHA};
            $pend++;
            print "Asking: $c->{id}\n" if $DBG;
            $sent{$c->{id}} = 1;
            $self->udp_send({
                type => $RPC{FIND}, ($want_val ? 'key' : 'tgt') => "$tgt"
            }, $c->{addr}, sub {
                my ($obj) = @_;
                return if $done;    # operation already finished
                $pend--;
                if ($obj and ref $obj->{nodes} eq 'ARRAY') {
                    my $added = 0;    # add non-seen items to list
                    foreach my $n (@{ $obj->{nodes} }) {
                        # replies are untrusted: skip entries of the wrong shape
                        next unless ref $n eq 'HASH' and defined $n->{id} and defined $n->{addr};
                        next if $seen{ $n->{id} };
                        my $nid = Kad::Id->new($n->{id});
                        next if $nid->is_nan() or $seen{$nid};
                        $n->{id} = $nid;
                        $seen{$nid} = 1; $added++; push @res, $n; $self->rt_update($n);
                    }
                    # unless found new nodes -> finish
                    unless ($added) { $finish->($want_val ? undef : \@res); return }
                    @res = sort { ($a->{id} ^ $tgt) <=> ($b->{id} ^ $tgt) } @res;
                    @res = @res[0 .. min($Kad::Config{RT_K} - 1, $#res)];
                    if ($DBG) { print "Res distances:\n"; print ' ', ($_->{id} ^ $tgt)->as_hex(), "\n" foreach @res; }
                    if ($mode eq 'one' and $res[0]->{id} == $tgt) {
                        $finish->(\@res); return;    # if found required node -> finish
                    }
                } elsif ($obj and $obj->{value} and $want_val) {
                    $finish->($obj->{value}); return;    # if found required value -> finish
                }
                $ask->();
                # if no new requests -> finish
                unless ($pend) { $finish->($want_val ? undef : \@res); return }
            });
        }
    };
    $ask->();
    return;
}
################################ | |
# API | |
################################ | |
# $self->ping($addr, $cbk)
# Sends an INFO request to $addr ("host:port"); $cbk is handed to udp_send()
# as the reply/timeout callback.
sub ping {
    my ($self, $addr, $cbk) = @_;
    my %request = (type => $RPC{INFO});
    return $self->udp_send(\%request, $addr, $cbk);
}
# $self->join($cbk, @addrs)
# Joins the network through the bootstrap peers in @addrs ("host:port").
# All peers are pinged; once every ping has been answered or has timed out:
#  - if none responded, the previous state is restored and $cbk is called
#    with no arguments;
#  - otherwise a lookup of this node's own ID is run, the state becomes
#    JOINED and $cbk is called with 1.
# With an empty @addrs there is nobody to ask: $cbk is called with no
# arguments right away and the state is left untouched.
sub join {
    my ($self, $cbk, @addrs) = @_;
    unless (@addrs) { $cbk->(); return }

    my $pend  = scalar @addrs;
    my $found = 0;
    my $prev  = $self->{state};
    $self->{state} = $STATE{JOINING};

    # ping all peers
    foreach my $peer (@addrs) {
        $self->ping($peer, sub {
            my ($obj) = @_;
            $found++ if $obj;
            return if --$pend;    # still waiting for other peers
            unless ($found) { $self->{state} = $prev; $cbk->(); return }    # no peers responded ;(
            # find myself
            $self->find_iter($self->{lid}, 'all', sub {
                # TODO: Refresh buckets
                $self->{state} = $STATE{JOINED};
                $cbk->(1);
            });
        });
    }
    return;
}
# $self->find($id, $cbk)
# Looks up the node whose ID is exactly $id. $cbk receives the contact
# ({ id, addr }) when it was found, and no arguments otherwise (also when
# this node has not joined yet or the lookup produced no contacts).
sub find {
    my ($self, $id, $cbk) = @_;
    unless ($STATE{JOINED} == $self->{state}) { $cbk->(); return }

    $self->find_iter($id, 'one', sub {
        my ($ct) = @_;
        # Guard against an empty result list before touching its first entry.
        my $best = (ref $ct eq 'ARRAY' and @{$ct}) ? $ct->[0] : undef;
        if ($best and $best->{id} == $id) {
            $cbk->($best);
            return;
        }
        $cbk->();
    });
    return;
}
# $self->store($key, $val, $cbk)
# Stores $val under $key (a Kad::Id) on the closest nodes found by a lookup.
# $cbk is called once with the number of nodes that acknowledged the STORE;
# that number is 0 when this node has not joined yet or when the lookup
# returned no contacts at all.
sub store {
    my ($self, $key, $val, $cbk) = @_;
    unless ($STATE{JOINED} == $self->{state}) { $cbk->(0); return }

    $self->find_iter($key, 'all', sub {
        my ($ct) = @_;
        my @targets = (ref $ct eq 'ARRAY') ? @{$ct} : ();
        # Nothing to send to: report right away instead of waiting for
        # replies that will never come.
        unless (@targets) { $cbk->(0); return }

        my ($pend, $done) = (scalar @targets, 0);
        # TODO: Store to myself if I'm closer than farthest
        foreach my $c (@targets) {
            $self->udp_send({
                type => $RPC{STORE}, key => "$key", val => $val
            }, $c->{addr}, sub {
                my ($obj) = @_;
                $done++ if $obj;
                $cbk->($done) unless --$pend;
            });
        }
    });
    return;
}
# $self->retrieve($key, $cbk)
# Fetches the entry stored under $key. Not joined: $cbk is called with no
# arguments. Present in the local store: $cbk gets the local entry.
# Otherwise a 'val' lookup is started with $cbk as its callback.
sub retrieve {
    my ($self, $key, $cbk) = @_;

    if ($self->{state} != $STATE{JOINED}) {
        $cbk->();
        return;
    }

    my $local = $self->{ds};
    return $cbk->($local->get($key)) if $local->has($key);
    return $self->find_iter($key, 'val', $cbk);
}
} | |
package main; | |
use warnings; | |
use strict; | |
use threads; | |
use AnyEvent; | |
use Getopt::Long; | |
use List::Util qw(shuffle); | |
use IO::Socket::INET; | |
use JSON::XS; | |
# Print command-line help and terminate the program.
sub usage {
    my @help = (
        "Example usage:",
        "$0 --name=node1 -p 1111",
        "$0 --name=node2 -p 2222 -d 127.0.0.1:1111",
        "$0 --name=node3 -p 3333 -d 127.0.0.1:1111",
        "Note:",
        "Port address should not be less than $Kad::Config{MIN_PORT}.",
        "Node id is derived from name.",
    );
    print map { "$_\n" } @help;
    exit;
}
# Command line:
#   -b/--bind  address to bind to (default 0.0.0.0)
#   -p/--port  UDP port, must be an integer (default 1111)
#   -d/--dest  "host:port" of a node to join through
#   --name     node name; the node ID is derived from it. The special name
#              'simul' starts one node plus 50 helper nodes on the next ports.
#   --dbg, --dump  enable debug messages / raw message dumps
GetOptions(
    "b|bind=s" => \(my $BIND = '0.0.0.0'),
    "p|port=i" => \(my $PORT = 1111),    # =i: rejected by GetOptions unless numeric
    "d|dest=s" => \(my $DEST),
    "name=s"   => \(my $NAME),
    "dbg"      => \$DBG,
    "dump"     => \$DUMP,
) or usage();

$| = 1;    # unbuffered output: replies are printed from event callbacks

usage() if $PORT < $Kad::Config{MIN_PORT};
usage() unless $NAME;

my $node;
my @helpers;    # keeps simulation nodes alive
if ($NAME eq 'simul') {
    $node = Kad::Node->new(Kad::Id->random(), $BIND, $PORT);
    for my $port ($PORT + 1 .. $PORT + 50) {
        my $n = Kad::Node->new(Kad::Id->random(), $BIND, $port);
        $n->join(sub { print "$port: ", ((shift) ? "ok" : "fail"), "\n" }, "localhost:$PORT");
        push @helpers, $n;
    }
} else {
    $node = Kad::Node->new($NAME, $BIND, $PORT);
}
print "Kad ID: $node->{lid}\n";

# Join through the given peer once the event loop is running.
AnyEvent::postpone {
    $node->join(sub {
        print(((shift) ? 'Joined to ' : 'Could not join to '), $DEST, "\n");
    }, $DEST);
} if $DEST;

# Console thread: reads commands from STDIN and forwards each one to the
# node's own UDP port as a 'stdin' message, so that it is handled inside
# the event loop by on_msg().
threads->create(sub {
    my $in_sock = IO::Socket::INET->new(
        LocalAddr => 'localhost', PeerAddr => "localhost:$PORT", Proto => 'udp'
    ) or die "Couldn't create socket: $!\n";
    while (1) {
        print "${NAME}> ";
        my $cmd = <STDIN>;
        last unless defined $cmd;
        chomp $cmd;
        next unless $cmd;
        $in_sock->send(encode_json({ type => 'stdin', text => $cmd }));
    }
})->detach();

# Run the event loop forever.
AnyEvent->condvar->recv;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment