Skip to content

Instantly share code, notes, and snippets.

@cretz
Created January 26, 2017 12:44
Show Gist options
  • Save cretz/386858e98eb4194524580961d87deaf9 to your computer and use it in GitHub Desktop.
Save cretz/386858e98eb4194524580961d87deaf9 to your computer and use it in GitHub Desktop.
Early Kademlia sketch
package dht
import (
	"container/heap"
	"fmt"
	"math/big"
	"net"
	"sort"
	"sync"
)
// Tuning constants for the DHT.
const (
	// bitCount sizes dhtImpl.buckets: one bucket slot per bit (presumably the
	// width of a node ID in bits).
	bitCount = 160

	// k bounds the size of the node list that lookup returns.
	k = 20

	// alpha is the maximum number of nodes lookup queries in parallel during
	// a single round.
	alpha = 3
)
// nodeID identifies a node, and is also used as the key type of the local
// key/value store. It is a pointer to an arbitrary-precision integer.
//
// NOTE(review): because nodeID is a pointer type, using it as a map key
// compares pointer identity, not numeric value; two distinct *big.Int values
// holding the same number are different keys. Confirm that is intended.
type nodeID *big.Int
// dhtImpl holds the local state of this DHT participant: its connection, its
// routing buckets and its locally stored key/value pairs.
type dhtImpl struct {
// conn is the connection wrapper; wrappedConn is declared outside the
// visible source.
conn wrappedConn
// buckets has bitCount slots of *kBucket. getClosestNodesLocal walks every
// slot when collecting candidate nodes.
buckets [bitCount]*kBucket
// kv is the local store written by storeLocalValue and read by
// fetchLocalValue.
kv map[nodeID][]byte
}
// onPing is an empty stub: its body does nothing yet. Presumably it is meant
// to handle an incoming ping request (TODO confirm and implement).
func (d *dhtImpl) onPing() {
}
// storeLocalValue saves val under key in this node's local store, replacing
// any value previously stored under the same key. It always returns nil.
//
// The store is created on first use so that a dhtImpl whose kv map was never
// initialised does not panic here (assigning into a nil map panics in Go).
//
// NOTE(review): key is a pointer type, so a later fetch only finds the value
// when it is given the very same pointer, not merely an equal number.
func (d *dhtImpl) storeLocalValue(key nodeID, val []byte) error {
	if d.kv == nil {
		d.kv = make(map[nodeID][]byte)
	}
	d.kv[key] = val
	return nil
}
// getClosestNodesLocal scans every routing bucket and returns at most amount
// of the collected nodes, ordered closest-to-target first. The returned error
// is always nil.
//
// The result has length min(amount, number of nodes collected) and capacity
// amount+1. An ascending slice is also a valid min-heap, so callers may keep
// feeding it to HeapPushOverrideLast.
func (d *dhtImpl) getClosestNodesLocal(target nodeID, amount int) (nodesByDistance, error) {
	// We have to go through them all here; candidates is the running
	// bounded collection, with one slot more than the caller asked for.
	candidates := make(nodesByDistance, 0, amount+1)
	for _, bucket := range d.buckets {
		// Bucket slots are pointers and may never have been populated.
		if bucket == nil {
			continue
		}
		for _, nd := range bucket.nodes {
			candidates.HeapPushOverrideLast(target, nd)
		}
	}

	// The collection is heap-ordered, so its trailing element is not
	// necessarily the furthest one. Sort before truncating so the extra
	// entry that gets dropped really is the furthest.
	sort.Slice(candidates, func(i, j int) bool {
		return candidates[i].dist.Cmp(candidates[j].dist) < 0
	})

	// Never reslice past the populated length: that would hand nil entries
	// to the caller when fewer than amount nodes are known.
	if len(candidates) > amount {
		candidates = candidates[:amount]
	}
	return candidates, nil
}
// fetchLocalValue looks key up in this node's local store. It returns the
// stored bytes and true when the key is present, or nil and false when it is
// not. The returned error is always nil.
func (d *dhtImpl) fetchLocalValue(key nodeID) ([]byte, bool, error) {
	if val, found := d.kv[key]; found {
		return val, true, nil
	}
	return nil, false, nil
}
// lookup iteratively searches the network for the nodes closest to target.
//
// It seeds a shortlist from the local routing buckets, then works in rounds:
// each round asks up to alpha not-yet-queried shortlist nodes (closest first)
// for the nodes they know, in parallel, and merges the answers into the
// shortlist. The search stops once the closest node on the shortlist has
// already been queried. At most k nodes are returned, closest first.
//
// An error is returned when the local seed query fails or yields no nodes.
// Failures of individual remote queries are deliberately ignored: that node
// simply contributes nothing to the round.
func (d *dhtImpl) lookup(target nodeID) (nodesByDistance, error) {
	seed, err := d.getClosestNodesLocal(target, alpha+1)
	if err != nil {
		return nil, err
	}
	// Must have some node
	if len(seed) == 0 {
		return nil, fmt.Errorf("No nodes to lookup")
	}

	// IDs are pointers, so they are tracked by numeric value rather than
	// used as map keys directly; otherwise the same ID arriving in two
	// different responses would be treated as two different nodes.
	idKey := func(id nodeID) string { return (*big.Int)(id).String() }

	// The shortlist needs room for k results plus one spare slot; the seed
	// list alone is far too small to ever hold k nodes.
	shortlist := append(make(nodesByDistance, 0, k+1), seed...)

	// known holds every ID ever placed on the shortlist (prevents
	// duplicates); queried holds the IDs we have already asked.
	known := make(map[string]struct{}, k+1)
	for _, entry := range shortlist {
		known[idKey(entry.id)] = struct{}{}
	}
	queried := map[string]struct{}{}

	// mu guards shortlist and known while a round's goroutines are running.
	var mu sync.Mutex

	// nextToQuery picks up to alpha unqueried nodes in shortlist order and
	// marks them as queried. It only runs between rounds, so it needs no lock.
	nextToQuery := func() []*node {
		batch := make([]*node, 0, alpha)
		for _, cand := range shortlist {
			key := idKey(cand.id)
			if _, done := queried[key]; done {
				continue
			}
			queried[key] = struct{}{}
			batch = append(batch, cand.node)
			if len(batch) == alpha {
				break
			}
		}
		return batch
	}

	for {
		// Order closest-first so that shortlist[0] is the closest node and
		// nextToQuery hands out the closest unqueried ones. A sorted slice
		// is still a valid heap for the pushes below.
		sort.Slice(shortlist, func(i, j int) bool {
			return shortlist[i].dist.Cmp(shortlist[j].dist) < 0
		})

		// Keep going until we've already asked the closest
		if _, done := queried[idKey(shortlist[0].id)]; done {
			break
		}

		// Run them in parallel
		var wg sync.WaitGroup
		for _, n := range nextToQuery() {
			wg.Add(1)
			go func(n *node) {
				defer wg.Done()
				found, err := n.findNodes(target)
				if err != nil {
					// Best effort: an unreachable node is skipped.
					return
				}
				mu.Lock()
				defer mu.Unlock()
				for _, nd := range found {
					key := idKey(nd.id)
					if _, dup := known[key]; dup {
						continue
					}
					known[key] = struct{}{}
					shortlist.HeapPushOverrideLast(target, nd)
				}
			}(n)
		}
		// Wait for them all to finish
		wg.Wait()
	}

	// Slicing to k unconditionally would exceed the populated length (or
	// even the capacity) whenever fewer than k nodes were found.
	if len(shortlist) > k {
		shortlist = shortlist[:k]
	}
	return shortlist, nil
}
// kBucket is one routing-table bucket: a list of known nodes.
type kBucket struct {
// minDist is not read anywhere in the visible code. Presumably it is the
// smallest distance covered by this bucket (TODO confirm).
minDist *big.Int
// nodes are the nodes currently held in this bucket.
nodes []*node
}
// node is a remote DHT participant as seen by this process.
type node struct {
// id is the node's identifier; distances to a target are computed from it
// with XOR.
id nodeID
// addr is the node's network address. It is not used by the visible code.
addr net.Addr
}
// findNodes is not implemented yet and always panics.
//
// lookup calls it with the lookup target and merges the returned nodes into
// its shortlist, so it is presumably meant to ask this node for the nodes it
// knows closest to id (TODO confirm when implementing).
func (n *node) findNodes(id nodeID) ([]*node, error) {
// TODO
panic("Not impld")
}
// findNodesOrValue is not implemented yet and always panics.
//
// Intended contract: it returns either a node list or a value, never both.
// A nil byte slice can legitimately represent an empty value, so callers must
// check the node slice, not the value, to tell the two outcomes apart.
func (n *node) findNodesOrValue(id nodeID) ([]*node, []byte, error) {
// TODO
panic("Not impld")
}
// nodeWithDistance pairs a node with its distance to some target ID.
type nodeWithDistance struct {
// The embedded *node promotes the node's fields and methods (id, addr,
// findNodes, ...) onto this type.
*node
// dist is the XOR of the target ID and the node's id, as computed by
// HeapPushOverrideLast.
dist *big.Int
}
// nodesByDistance is a list of nodes kept as a min-heap on dist, so index 0
// is always the entry closest to the target.
//
// Implements heap.Interface. Push and Pop use a pointer receiver because they
// change the slice length, so it is *nodesByDistance that satisfies the
// interface.
type nodesByDistance []*nodeWithDistance

// Len reports the number of entries.
func (n nodesByDistance) Len() int { return len(n) }

// Less reports whether entry i is strictly closer to the target than entry j.
func (n nodesByDistance) Less(i, j int) bool { return n[i].dist.Cmp(n[j].dist) < 0 }

// Swap exchanges entries i and j.
func (n nodesByDistance) Swap(i, j int) { n[i], n[j] = n[j], n[i] }

// Push appends x, which must be a *nodeWithDistance. It is meant to be called
// by package heap, not directly.
func (n *nodesByDistance) Push(x interface{}) { *n = append(*n, x.(*nodeWithDistance)) }

// Pop removes and returns the last entry. It is meant to be called by package
// heap, which moves the minimum to the end before calling it.
func (n *nodesByDistance) Pop() interface{} {
	old := *n
	last := len(old) - 1
	ret := old[last]
	// Clear the vacated slot so the backing array does not keep the entry
	// alive.
	old[last] = nil
	*n = old[:last]
	return ret
}
// HeapPushOverrideLast offers nd to the bounded heap n, whose maximum size is
// cap(*n). The distance stored for nd is the XOR of target and nd.id.
//
//   - While there is spare capacity, nd is pushed onto the heap.
//   - Once the heap is full, nd replaces the current furthest entry if nd is
//     strictly closer to target; otherwise nd is ignored.
//
// The receiver is a pointer so that the grown slice is visible to the caller
// and so that the value satisfies heap.Interface. Call sites of the form
// list.HeapPushOverrideLast(...) on an addressable variable are unaffected.
//
// The furthest entry is located with a linear scan: in a min-heap the last
// slot is only some leaf, not necessarily the maximum, so overwriting it
// blindly could evict a close node and keep a far one.
func (n *nodesByDistance) HeapPushOverrideLast(target nodeID, nd *node) {
	dist := new(big.Int).Xor(target, nd.id)

	list := *n
	if len(list) < cap(list) {
		heap.Push(n, &nodeWithDistance{node: nd, dist: dist})
		return
	}
	if len(list) == 0 {
		// Zero-capacity list: there is nothing to replace.
		return
	}

	furthest := 0
	for i := 1; i < len(list); i++ {
		if list[i].dist.Cmp(list[furthest].dist) > 0 {
			furthest = i
		}
	}
	if dist.Cmp(list[furthest].dist) >= 0 {
		// Not an improvement over anything we already hold.
		return
	}

	// Store a fresh entry instead of mutating the old one in place; the old
	// entry may still be referenced from a slice handed out earlier.
	list[furthest] = &nodeWithDistance{node: nd, dist: dist}
	heap.Fix(n, furthest)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment