Skip to content

Instantly share code, notes, and snippets.

@vshymanskyy
Last active December 17, 2015 08:19
Show Gist options
  • Save vshymanskyy/5579584 to your computer and use it in GitHub Desktop.
Save vshymanskyy/5579584 to your computer and use it in GitHub Desktop.
Kademlia DHT implementation in Perl + AnyEvent.
#!/usr/bin/perl
# Kademlia DHT implementation in Perl + AnyEvent.
# Copyright © 2013 PE Volodymyr Shymanskyy. All rights reserved.
# License: GNU GPL v2 (http://www.gnu.org/licenses/gpl-2.0.txt)
# Debug switches shared by every package in this file; they are filled in
# from the --dbg / --dump command line options further down.
our $DBG;
our $DUMP;
# Data::Dumper output settings used by the "dump" console command.
($Data::Dumper::Terse, $Data::Dumper::Indent) = (1, 1);
# Kad: protocol-wide settings. Other packages read them as $Kad::Config{NAME}.
package Kad {
    use strict;
    use warnings;
    use Readonly;

    # Declared with "our" so the package compiles under strict; the hash is
    # still %Kad::Config and is read-only after this statement.
    # NOTE(review): ID_HASH presumably has to produce exactly ID_BITS bits,
    # since Kad::Id pads ids to ID_BITS -- confirm before changing either.
    Readonly::Hash our %Config => (
        ID_BITS     => 160,      # Node ID size
        ID_HASH     => 'SHA-1',  # Hash function
        ALPHA       => 3,        # Maximum number of concurrent messages in transit
        STALE       => 5,        # TODO: Number of times a node can be marked as stale before it is actually removed
        MIN_PORT    => 1024,     # Port restriction
        TIMEOUT_REQ => 1.0,      # Single request timeout
        RT_K        => 20,       # K-Bucket size (K)
    );
}
# Kad::Id: a node/key identifier. It is a Math::BigInt that stringifies to
# URL-safe base64 and is always rendered zero-padded to $Kad::Config{ID_BITS}.
package Kad::Id {
    use base 'Math::BigInt';
    use strict;
    use warnings;
    use Digest;
    use MIME::Base64 qw(encode_base64url decode_base64url);
    use overload '""' => sub { $_[0]->as_b64() };

    # new(CLASS, $b64): ids travel as base64url text, so that is the default form.
    sub new { return from_b64(@_) }

    # from_hex(CLASS, $hex): build an id from a hexadecimal string.
    sub from_hex {
        my ($class, $hex) = @_;
        my $id = $class->SUPER::from_hex($hex);
        return bless $id, (ref($class) || $class);
    }

    # from_b64(CLASS, $b64): build an id from its base64url text form.
    sub from_b64 {
        my (undef, $b64) = @_;
        my $raw = decode_base64url($b64);
        return Kad::Id->from_raw($raw);
    }

    # from_raw(CLASS, $bytes): build an id from its packed byte string.
    sub from_raw {
        my (undef, $raw) = @_;
        my $hex = unpack 'H*', $raw;
        return Kad::Id->from_hex($hex);
    }

    # from_hash(CLASS, $data): id = digest of $data using the configured hash.
    sub from_hash {
        my (undef, $data) = @_;
        my $digest = Digest->new($Kad::Config{ID_HASH});
        return Kad::Id->from_hex($digest->add($data)->hexdigest());
    }

    # Packed bytes of the zero-padded hex form.
    sub as_raw {
        my ($self) = @_;
        return pack 'H*', $self->as_hex();
    }

    # Binary digits, without the "0b" prefix, zero-padded to ID_BITS characters.
    sub as_bin {
        my ($self) = @_;
        my $bits = $self->SUPER::as_bin();
        $bits =~ s/^0b//;
        return sprintf '%0' . $Kad::Config{ID_BITS} . 's', $bits;
    }

    # Hex digits, without the "0x" prefix, zero-padded to ID_BITS/4 characters.
    sub as_hex {
        my ($self) = @_;
        my $hex = $self->SUPER::as_hex();
        $hex =~ s/^0x//;
        return sprintf '%0' . ($Kad::Config{ID_BITS} / 4) . 's', $hex;
    }

    # base64url text of the packed bytes (this is also the "" overload).
    sub as_b64 {
        my ($self) = @_;
        return encode_base64url($self->as_raw());
    }

    # random(CLASS): an id made of ID_BITS/4 random hex digits.
    sub random {
        my @digits = ('0' .. '9', 'a' .. 'f');
        my $hex = join '', map { $digits[rand 16] } 1 .. $Kad::Config{ID_BITS} / 4;
        return Kad::Id->from_hex($hex);
    }

    # Number of leading zero bits in the padded binary form.
    sub prefixlen {
        my ($self) = @_;
        my ($zeros) = $self->as_bin() =~ /(^0*)/;
        return length($zeros);
    }
}
# Kad::DataStore::Hash: in-memory key/value store.
# Each stored record is a hashref { val, time, pub_time, pub_id }.
package Kad::DataStore::Hash {
    use strict;
    use warnings;

    # new(CLASS): create an empty store.
    sub new {
        my ($class) = @_;
        my $self = { data => {} };
        bless ($self, ref($class) || $class);
    }

    # set($key, $value, $publisher_id): store a value.
    #   - undefined $value removes the key;
    #   - re-storing the same value only refreshes "time";
    #   - a new or changed value starts a fresh record.
    sub set {
        my ($s, $k, $v, $id) = @_;
        if (defined $v) {
            my $rec = $s->{data}{$k};
            if (defined $rec and $rec->{val} eq $v) { # already exists and same
                $rec->{time} = time;
            } else {
                $s->{data}{$k} = {
                    val      => $v,
                    time     => time,
                    pub_time => time,
                    pub_id   => $id,
                };
            }
        } else {
            delete $s->{data}{$k};
        }
    }

    # get($key): the stored record (hashref), or undef when absent.
    sub get { my ($s, $k) = @_; return $s->{data}{$k}; }

    # has($key): true when a record exists for $key.
    sub has { my ($s, $k) = @_; return defined $s->{data}{$k}; }

    # unset($key): remove the key; returns the removed record, if any.
    # (This used to unpack its arguments and do nothing.)
    sub unset { my ($s, $k) = @_; return delete $s->{data}{$k}; }

    # keys(): list of stored keys. The hash is dereferenced explicitly because
    # keys() on a reference is a fatal error on perl 5.24 and later; CORE::
    # makes it unmistakable that the builtin is meant, not this method.
    sub keys { my ($s) = @_; return CORE::keys %{ $s->{data} }; }
}
package Kad::Node {
use strict;
use warnings;
use feature qw(switch);
use Readonly;
use AnyEvent;
use JSON::XS;
use List::Util qw(min);
use List::MoreUtils qw(firstidx);
use IO::Socket::INET;
use Data::Dumper;
# Node life-cycle states (kept in $self->{state})
Readonly::Hash our %STATE => (
    WAIT    => 0,
    JOINING => 1,
    JOINED  => 2,
    LEFT    => 3,
);
# Values of the "type" field for the RPC requests handled in on_msg
Readonly::Hash our %RPC => (
    INFO  => 'info',
    FIND  => 'find',
    STORE => 'store',
);
# new(CLASS, $name, $bind, $port)
#   $name        - hashed into the local node id
#   $bind, $port - local UDP address to listen on
# Opens the UDP socket and registers an AnyEvent read watcher that feeds
# udp_recv(). Dies when the socket or the watcher cannot be created.
sub new {
    my ($class, $name, $bind, $port) = @_;

    my $self = bless {
        state => $STATE{WAIT},
        lid   => Kad::Id->from_hash($name),
        bind  => $bind,
        port  => $port,
        rt    => [],                            # routing table
        ds    => Kad::DataStore::Hash->new(),
        reqs  => {},                            # current requests (tag => callback)
    }, (ref($class) || $class);

    # create UDP socket
    my $sock = IO::Socket::INET->new(
        LocalAddr  => "$bind:$port",
        Proto      => 'udp',
        ReuseAddr  => 1,
        Broadcast  => 1,
        MultiHomed => 1,
    );
    $self->{sock} = $sock;
    die "Couldn't create socket: $!\n" unless $sock;

    # set socket read handler
    my $watcher = AnyEvent->io(
        fh   => $sock,
        poll => 'r',
        cb   => sub { $self->udp_recv() },
    );
    $self->{_ae_recv} = $watcher;
    die "Couldn't set event: $!\n" unless $watcher;

    return $self;
}
################################
# Routing table
################################
# rt_update($contact): record a peer in the routing table.
#   $contact - hashref { id => Kad::Id, addr => "host:port" }
# Returns nothing for our own id; the already-stored entry when the peer is
# known (it is moved to the front of its bucket); otherwise $contact itself
# (whether or not there was room to add it).
# Side effect: while the node is still in WAIT state, any contact schedules
# a join through that contact's address.
sub rt_update {
    my ($self, $contact) = @_;
    return if $contact->{id} == $self->{lid}; # don't add myself

    if ($STATE{WAIT} == $self->{state}) {
        AnyEvent::postpone {
            $self->join(sub {
                my ($ok) = @_;
                print "Joined on incoming, to $contact->{addr}\n" if $ok;
            }, $contact->{addr});
        };
    }

    # Bucket index = number of leading bits shared with the local id.
    my $bn = ($contact->{id} ^ $self->{lid})->prefixlen();
    unless (defined $self->{rt}[$bn]) {
        $self->{rt}[$bn] = [ $contact ];
        return $contact;
    }

    # Named $bucket rather than $b so sort's package variable is not shadowed.
    my $bucket = $self->{rt}[$bn];
    my $idx = firstidx { $_->{id} == $contact->{id} } @{$bucket};

    if ($idx >= 0) { # exists
        # Explicit @{...}: unshift/splice on a reference is fatal since 5.24.
        unshift @{$bucket}, splice(@{$bucket}, $idx, 1) if $idx != 0; # move to top
        return $bucket->[0]; # the entry now sits at the front, not at $idx
    }

    if (@{$bucket} >= $Kad::Config{RT_K}) {
        # Bucket already holds K entries (">" used to allow K+1).
        # TODO: replacement cache
        return $contact;
    }

    print "New node: $contact->{id} [$contact->{addr}]\n" if $DBG;
    unshift @{$bucket}, $contact; # add to top
    return $contact;
}
# rt_find_closest($target, $qty, $skip)
#   $target - Kad::Id to search around
#   $qty    - maximum number of contacts to return
#   $skip   - optional Kad::Id to leave out of the result (e.g. the requester)
# Returns a list of at most $qty contacts from the routing table, sorted by
# XOR distance to $target (closest first).
sub rt_find_closest {
    my ($self, $target, $qty, $skip) = @_;
    my $buckets = $self->{rt};
    my $bn      = ($target ^ $self->{lid})->prefixlen();

    # Start with the target's own bucket, then widen one bucket at a time in
    # both directions until enough candidates are collected or both ends of
    # the table have been passed.
    my @res;
    push @res, @{ $buckets->[$bn] } if defined $buckets->[$bn];
    for my $step (1 .. $Kad::Config{ID_BITS}) {
        my ($below, $above) = ($bn - $step, $bn + $step);
        push @res, @{ $buckets->[$below] } if $below >= 0 and defined $buckets->[$below];
        push @res, @{ $buckets->[$above] } if defined $buckets->[$above];
        last if $#res > $qty or ($below < 0 and $above >= $Kad::Config{ID_BITS});
    }

    # $skip is optional; only filter when it was given, so an undefined value
    # is never fed into the overloaded id comparison.
    @res = grep { $_->{id} != $skip } @res if defined $skip;

    @res = sort { ($a->{id} ^ $target) <=> ($b->{id} ^ $target) } @res;
    @res = @res[0 .. min($qty - 1, $#res)];
    return @res;
}
################################
# Helpers
################################
# _endpoint: convert between textual and packed socket addresses.
#   _endpoint($host, $port) -> packed sockaddr_in, or nothing when $host
#                              cannot be resolved
#   _endpoint($packed)      -> ($ip_string, $port)
#   _endpoint()             -> nothing
sub _endpoint {
    my ($addr, $port) = @_;
    return unless defined $addr;

    if (defined $port) {
        my $ip = inet_aton($addr);
        return unless defined $ip;  # packing an undefined address would croak
        # pack_/unpack_ are used instead of sockaddr_in(), whose direction
        # depends on the caller's context.
        return pack_sockaddr_in($port, $ip);
    }

    my ($peer_port, $peer_ip) = unpack_sockaddr_in($addr);
    return (inet_ntoa($peer_ip), $peer_port);
}
# on_msg($obj, $addr, $port): handle one decoded datagram.
#   $obj         - decoded JSON message (hashref)
#   $addr, $port - sender address
# A message with no "type" is a reply and is routed to the pending request
# with the same tag. Otherwise "type" selects the handler: console input
# ('stdin') or one of the %RPC requests. Plain if/elsif with "eq" is used
# instead of given/when, which relies on deprecated smartmatch.
sub on_msg {
    my ($self, $obj, $addr, $port) = @_;
    my ($type, $tag) = @{$obj}{qw(type tag)};
    my $from = "$addr:$port";

    return if $type and $tag and $self->{reqs}{$tag}; # Request from myself?

    # Update RT on each message that names its sender
    $self->rt_update({
        id   => Kad::Id->new($obj->{id}),
        addr => $from,
    }) if $obj->{id};

    if (!defined $type) { # Reply?
        if ($tag and my $req = delete $self->{reqs}{$tag}) {
            $req->{cbk}->($obj, $addr, $port);
        }
        elsif ($DBG) {
            print "Unhandled rsp: [$from] tag: " . ($tag // '') . "\n";
        }
    }
    elsif ($type eq 'stdin') { # Input from keyboard
        return unless $addr eq "127.0.0.1"; # Small protection ;)
        $self->_on_stdin($obj->{text});
    }
    elsif ($type eq $RPC{INFO}) { # Kad PING
        $self->udp_send({ tag => $tag }, $from);
    }
    elsif ($type eq $RPC{FIND}) { # Kad FIND_NODE and FIND_VALUE (tgt or key)
        my ($tgt, $key) = @{$obj}{qw(tgt key)};
        my $have = defined $key && $self->{ds}->has($key);
        if (defined $tgt or (defined $key and not $have)) {
            # Node lookup, or a value lookup for a key we do not hold:
            # answer with the closest contacts we know, minus the requester.
            my @closest = $self->rt_find_closest(
                Kad::Id->new($tgt or $key),
                $Kad::Config{RT_K}, Kad::Id->new($obj->{id}));
            my @nodes = map { +{ id => "$_->{id}", addr => $_->{addr} } } @closest;
            $self->udp_send({ nodes => \@nodes, tag => $tag }, $from);
        }
        elsif ($have) {
            $self->udp_send({ value => $self->{ds}->get($key), tag => $tag }, $from);
        }
    }
    elsif ($type eq $RPC{STORE}) { # Kad STORE
        my ($key, $val) = @{$obj}{qw(key val)};
        if (defined $key and defined $val) {
            print "Set $key to $val\n" if $DBG;
            $self->{ds}->set($key, $val, $obj->{id});
            $self->udp_send({ tag => $tag }, $from);
        }
    }
    else {
        print "Unhandled req: [$from] type: $type, tag: " . ($tag // '') . "\n";
    }
}

# _on_stdin($text): run one console command received from the local
# stdin helper. Unknown commands are ignored.
sub _on_stdin {
    my ($self, $text) = @_;
    $text //= '';

    if ($text =~ /^exit$/) {
        exit;
    }
    elsif ($text =~ /^ping (.*)$/) {
        my $peer = $1;
        print "Ping to $peer...\n";
        $self->ping($peer, sub { print $_[0] ? "OK!\n" : "Fail\n"; });
    }
    elsif ($text =~ /^join (.*)$/) {
        my $peer = $1;
        print "Joining to $peer...\n";
        $self->join(sub { print $_[0] ? "OK!\n" : "Fail ;(\n"; }, $peer);
    }
    elsif ($text =~ /^find (.*)$/) {
        $self->find(Kad::Id->from_hash($1), sub {
            print $_[0] ? "Addr: $_[0]->{addr}\n" : "Not found\n";
        });
    }
    elsif ($text =~ /^set (\S*) (.*)$/) {
        # Copy the captures before calling anything that may run a regex.
        my ($name, $value) = ($1, $2);
        $self->store(Kad::Id->from_hash($name), $value, sub {
            print $_[0] ? "Stored to $_[0] nodes\n" : "Failed: Stored only locally\n";
        });
    }
    elsif ($text =~ /^get (\S*)$/) {
        $self->retrieve(Kad::Id->from_hash($1), sub {
            print $_[0] ? "Value: $_[0]{val}\nAge: " . (time - $_[0]{pub_time}) . "s\n" : "Not found\n";
        });
    }
    elsif ($text =~ /^hash (\S*)$/) {
        print Kad::Id->from_hash($1);
    }
    elsif ($text =~ /^dump$/) {
        print "State: $self->{state}\n";
        print Data::Dumper->Dump([$self->{rt}, $self->{ds}{data}], [qw(rt ds)]);
    }
    return;
}
# udp_recv(): read one datagram from the node socket and hand it to on_msg().
# Datagrams are dropped when they are empty, come from a port below
# MIN_PORT, or do not decode to a JSON object. A decode failure is trapped
# so that one bad packet cannot take the event loop down.
sub udp_recv {
    my ($self) = @_;
    $self->{sock}->recv(my $msg, 4096);
    return unless $msg;

    my ($addr, $port) = _endpoint($self->{sock}->peername);
    return if $port < $Kad::Config{MIN_PORT};

    my $obj = eval { decode_json($msg) };
    unless (ref($obj) eq 'HASH') {
        # on_msg() indexes the message as a hash, so anything else is refused.
        print "<= udp[$addr:$port] dropped malformed message\n" if $DUMP;
        return;
    }

    my $len = length $msg;
    print "<= udp[$addr:$port] ($len) $msg\n" if $DUMP;
    $self->on_msg($obj, $addr, $port);
}
# udp_send($obj, $to, $cbk): send one JSON message.
#   $obj - message hashref; "id" is always set to the local id and a random
#          "tag" is generated unless the key already exists (replies pass
#          the request's tag through)
#   $to  - destination "host:port"
#   $cbk - optional; registers the message as a request. It is called with
#          ($reply, $addr, $port) by on_msg, or with no arguments after
#          TIMEOUT_REQ seconds or when the destination is refused.
# Dies when $obj or $to is missing.
sub udp_send {
    my ($self, $obj, $to, $cbk) = @_;
    die unless $obj and $to;

    my ($addr, $port) = split /:/, $to;
    if (!defined $port or $port !~ /^\d+$/ or $port < $Kad::Config{MIN_PORT}) {
        # Refused destination: still report failure (asynchronously, like a
        # timeout) so that callers counting pending requests do not hang.
        AnyEvent::postpone { $cbk->() } if $cbk;
        return;
    }

    $obj->{id} = "$self->{lid}"; # set id

    # generate tag: 12 characters from a 64-symbol alphabet
    unless (exists $obj->{tag}) {
        my @alphabet = ('0'..'9', 'a'..'z', 'A'..'Z', '-', '_');
        $obj->{tag} = CORE::join('', map { $alphabet[rand @alphabet] } 1 .. 12);
    }

    # register callbacks
    if ($cbk) {
        $self->{reqs}{$obj->{tag}} = {
            cbk => $cbk,
            tmr => AnyEvent->timer (after => $Kad::Config{TIMEOUT_REQ}, cb => sub {
                print "Timeout: $obj->{tag}\n" if $DBG;
                delete $self->{reqs}{$obj->{tag}};
                $cbk->();
            }),
        };
    }

    # cleanup undefs (explicit %{...}: keys on a reference is fatal since 5.24)
    delete @{$obj}{ grep { !defined $obj->{$_} } keys %{$obj} };

    # actually send
    my $msg = encode_json($obj);
    my $len = length($msg);
    print "=> udp[$to] ($len) $msg\n" if $DUMP;

    # An unresolvable host must not kill the event loop; a registered
    # request then simply runs into its timeout.
    my $dest = eval { _endpoint($addr, $port) };
    unless (defined $dest) {
        print "Cannot resolve $to\n" if $DBG;
        return;
    }
    return send($self->{sock}, $msg, 0, $dest);
}
# find_iter($tgt, $mode, $cbk): iterative lookup towards $tgt (a Kad::Id).
#   $mode 'one' - stop as soon as the closest known contact has id == $tgt
#   $mode 'all' - collect the closest contacts
#   $mode 'val' - ask for the value stored under $tgt (sent as "key")
# $cbk is called once: in 'val' mode with the value from a reply or undef,
# in the other modes with an arrayref of contacts (\@res).
# Up to ALPHA requests are kept in flight; each reply's "nodes" are merged
# into @res, which is then re-sorted by distance and cut to RT_K entries.
sub find_iter {
my ($self, $tgt, $mode, $cbk) = @_; # mode can be qw(one all val)
# context of async operation
# Seed the search with the closest contacts already in the routing table.
my @res = $self->rt_find_closest($tgt, $Kad::Config{RT_K});
# Nothing known locally: report "not found" / an empty list right away.
unless (scalar @res) { $cbk->(($mode eq 'val') ? undef : \@res); return }
if ($mode eq 'one' and $res[0]->{id} == $tgt) { $cbk->(\@res); return } # found in local table -> finish
# $pend - requests in flight; %sent - ids already asked; %seen - ids already
# merged into @res (both keyed by the stringified id).
my $pend = 0; my %sent; my %seen;
$seen{$self->{lid}} = 1; # treat myself as seen
$seen{$_->{id}} = 1 foreach @res;
# helpers
# $ask sends FIND to not-yet-asked contacts in @res until ALPHA are pending.
# NOTE(review): the closure refers to $ask itself, which looks like a
# reference cycle that is never broken -- confirm whether that matters here.
my $ask;
$ask = sub {
print "Res: ", scalar @res, ", Pending: $pend, Seen: ", scalar keys %seen, ", Sent: ", scalar keys %sent, "\n" if $DBG;
foreach my $c (grep { !$sent{$_->{id}} } @res) {
last if $pend >= $Kad::Config{ALPHA};
$pend++;
print "Asking: $c->{id}\n" if $DBG;
$sent{$c->{id}} = 1;
$self->udp_send({
type => $RPC{FIND}, (($mode eq 'val')?'key':'tgt') => "$tgt"
}, $c->{addr}, sub {
# Reply handler; $obj is undefined when the request timed out.
my ($obj) = @_;
# $tgt doubles as the "still running" flag: it is set to undef on finish.
return unless $tgt; # operation already finished
$pend--;
if ($obj and $obj->{nodes}) {
my $added = 0; # add non-seen items to list
foreach my $c (grep { !$seen{$_->{id}} } @{$obj->{nodes}}) {
# Ids arrive as text; turn them into Kad::Id before storing the contact.
$c->{id} = Kad::Id->new($c->{id});
$seen{$c->{id}} = 1; $added++; push @res, $c; $self->rt_update($c);
}
# unless found new nodes -> finish
unless ($added) { $tgt = undef; $cbk->(($mode eq 'val')?undef:\@res); return }
# Keep only the RT_K contacts closest to the target.
@res = sort { ($a->{id} ^ $tgt) <=> ($b->{id} ^ $tgt) } @res;
@res = @res[0 .. min($Kad::Config{RT_K}-1, $#res)];
if ($DBG) { print "Res distances:\n"; print ' ', ($_->{id} ^ $tgt)->as_hex(), "\n" foreach @res; }
if ($mode eq 'one' and $res[0]->{id} == $tgt) {
$tgt = undef; $cbk->(\@res); return; # if found required node -> finish
}
} elsif ($obj and $obj->{value} and $mode eq 'val') {
$tgt = undef; $cbk->($obj->{value}); return; # if found required value -> finish
}
# Timeout or a reply that did not finish the search: ask further contacts.
$ask->();
# if no new requests -> finish
unless ($pend) { $tgt = undef; $cbk->(($mode eq 'val') ? undef : \@res); return }
});
}
};
# Kick off the first round of requests.
$ask->();
}
################################
# API
################################
# ping($addr, $cbk): send an INFO request to $addr ("host:port").
# $cbk is handed over to udp_send() as the request callback.
sub ping {
    my ($self, $addr, $cbk) = @_;
    my %request = (type => $RPC{INFO});
    return $self->udp_send(\%request, $addr, $cbk);
}
# join($cbk, @addrs): join the network through the given peers ("host:port").
# Every peer is pinged; once all pings have been answered or timed out, and
# at least one answered, a lookup of our own id is run and the node becomes
# JOINED. $cbk is called with 1 on success and with no arguments on failure
# (no usable address given, or no peer answered); on failure the previous
# state is kept or restored.
sub join {
    my ($self, $cbk, @addrs) = @_;

    # With nothing to ping the reply counter would never reach zero: the
    # callback would never fire and the node would stay in JOINING.
    @addrs = grep { defined } @addrs;
    unless (@addrs) { $cbk->(); return }

    my $pend  = scalar @addrs;
    my $found = 0;
    my $prev  = $self->{state};
    $self->{state} = $STATE{JOINING};

    # ping all peers
    for my $peer (@addrs) {
        $self->ping($peer, sub {
            my ($obj) = @_;
            $found++ if $obj;
            return if --$pend;  # wait for the remaining pings
            unless ($found) { # no peers responded ;(
                $self->{state} = $prev;
                $cbk->();
                return;
            }
            # find myself
            $self->find_iter($self->{lid}, 'all', sub {
                # TODO: Refresh buckets
                $self->{state} = $STATE{JOINED};
                $cbk->(1);
            });
        });
    }
}
# find($id, $cbk): look up the node whose id is exactly $id.
# $cbk is called with the contact hashref ({ id, addr }) when found, and with
# no arguments when not found or when this node has not joined yet.
sub find {
    my ($self, $id, $cbk) = @_;
    unless ($STATE{JOINED} == $self->{state}) { $cbk->(); return }

    $self->find_iter($id, 'one', sub {
        my ($contacts) = @_;
        # The lookup can hand back an empty list (empty routing table), so
        # the first entry is only inspected when there is one.
        my $best = ($contacts and @{$contacts}) ? $contacts->[0] : undef;
        if ($best and $best->{id} == $id) {
            $cbk->($best);
            return;
        }
        $cbk->();
    });
}
# store($key, $val, $cbk): send STORE for ($key => $val) to the contacts
# closest to $key.
# $cbk is called once with the number of contacts that acknowledged; that is
# 0 when the node has not joined or no contact is known, and the call is
# made without arguments when the lookup reported nothing at all.
sub store {
    my ($self, $key, $val, $cbk) = @_;
    unless ($STATE{JOINED} == $self->{state}) { $cbk->(0); return }

    $self->find_iter($key, 'all', sub {
        my ($contacts) = @_;
        unless ($contacts) { $cbk->(); return }
        # No contacts means no acknowledgement would ever arrive; report at
        # once instead of leaving the caller waiting forever.
        unless (@{$contacts}) { $cbk->(0); return }

        my ($pend, $done) = (scalar @{$contacts}, 0);
        # TODO: Store to myself if I'm closer than farthest
        foreach my $c (@{$contacts}) {
            $self->udp_send ({
                type => $RPC{STORE}, key => "$key", val => $val
            }, $c->{addr}, sub {
                my ($obj) = @_;
                $done++ if $obj;
                $cbk->($done) unless --$pend;  # last answer or timeout reports
            });
        }
    });
}
# retrieve($key, $cbk): fetch the record stored under $key.
# A record in the local data store is returned directly; otherwise a 'val'
# lookup is started and $cbk is passed on to it. $cbk is called with no
# arguments when the node has not joined.
sub retrieve {
    my ($self, $key, $cbk) = @_;
    if ($STATE{JOINED} != $self->{state}) { $cbk->(); return }

    my $local = $self->{ds};
    return $self->find_iter($key, 'val', $cbk) unless $local->has($key);
    return $cbk->($local->get($key));
}
}
package main;
use warnings;
use strict;
use threads;
use AnyEvent;
use Getopt::Long;
use List::Util qw(shuffle);
use IO::Socket::INET;
use JSON::XS;
# usage(): print example invocations and the port/name notes, then exit.
sub usage {
    my $min_port = $Kad::Config{MIN_PORT};
    my @lines = (
        "Example usage:",
        "$0 --name=node1 -p 1111",
        "$0 --name=node2 -p 2222 -d 127.0.0.1:1111",
        "$0 --name=node3 -p 3333 -d 127.0.0.1:1111",
        "Note:",
        "Port address should not be less than $min_port.",
        "Node id is derived from name.",
    );
    print map { "$_\n" } @lines;
    exit;
}
# ---- Command line ---------------------------------------------------------
# The port is parsed as an integer ("=i") because it is compared and added
# to numerically below; a non-numeric value now fails option parsing.
GetOptions(
    "b|bind=s" => \(my $BIND = '0.0.0.0'),
    "p|port=i" => \(my $PORT = 1111),
    "d|dest=s" => \(my $DEST),
    "name=s"   => \(my $NAME),
    "dbg"      => \$DBG,
    "dump"     => \$DUMP,
) or usage();
$| = 1; # unbuffered output, so the prompt and async messages show immediately
usage() if $PORT < $Kad::Config{MIN_PORT};
usage() unless $NAME;

# ---- Node(s) ----------------------------------------------------------------
my $node;
my @helpers; # keeps the simulated nodes alive
if ($NAME eq 'simul') {
    # Simulation: one main node plus 50 helpers on the following ports, each
    # joining through the main node.
    $node = Kad::Node->new(Kad::Id->random(), $BIND, $PORT);
    for my $port ($PORT+1 .. $PORT+50) {
        my $n = Kad::Node->new(Kad::Id->random(), $BIND, $port);
        # One report per line (the newline used to be missing).
        $n->join(sub { print "$port: ", ((shift) ? "ok" : "fail"), "\n" }, "localhost:$PORT");
        push @helpers, $n;
    }
} else {
    $node = Kad::Node->new($NAME, $BIND, $PORT);
}
print "Kad ID: $node->{lid}\n";

# Join through --dest once the event loop is running.
AnyEvent::postpone {
    $node->join(sub {
        print ((shift) ? 'Joined to ' : 'Could not join to ', $DEST, "\n");
    }, $DEST);
} if $DEST;

# ---- Console ----------------------------------------------------------------
# A detached thread reads commands from STDIN and forwards each one to the
# node's own UDP port as a {type => 'stdin'} message.
async {
    my $in_sock = IO::Socket::INET->new(
        LocalAddr => 'localhost', PeerAddr => "localhost:$PORT", Proto => 'udp'
    ) or die "Couldn't create socket: $!\n";
    while(1) {
        print "${NAME}> "; my $cmd = <STDIN>;
        last unless defined $cmd;
        chomp $cmd; next unless $cmd;
        $in_sock->send(encode_json({ type => 'stdin', text => $cmd }));
    }
}->detach();

# Run the event loop forever.
AnyEvent->condvar->recv;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment