Skip to content

Instantly share code, notes, and snippets.

@faried
Created August 13, 2014 10:30
Show Gist options
  • Save faried/a75c30ae246e3287d884 to your computer and use it in GitHub Desktop.
Save faried/a75c30ae246e3287d884 to your computer and use it in GitHub Desktop.
changes against nictuku/dht to support the dht store extension (incomplete)
diff --git a/dht.go b/dht.go
index 9671b0b..53e42b9 100644
--- a/dht.go
+++ b/dht.go
@@ -144,14 +144,18 @@ type DHT struct {
peersRequest chan ihReq
nodesRequest chan ihReq
pingRequest chan *remoteNode
+ getRequest chan string
+ putRequest chan string
portRequest chan int
stop chan bool
clientThrottle *nettools.ClientThrottle
store *dhtStore
+ gpstore GPStore
tokenSecrets []string
// Public channels:
PeersRequestResults chan map[InfoHash][]string // key = infohash, v = slice of peers.
+ GetRequestResults chan map[InfoHash]string // key = infohash, v = bencoded value
}
// New creates a DHT node. If config is nil, DefaultConfig will be used.
@@ -169,6 +173,7 @@ func New(config *Config) (node *DHT, err error) {
routingTable: newRoutingTable(),
peerStore: newPeerStore(cfg.MaxInfoHashes, cfg.MaxInfoHashPeers),
PeersRequestResults: make(chan map[InfoHash][]string, 1),
+ GetRequestResults: make(chan map[InfoHash]string, 1),
stop: make(chan bool),
exploredNeighborhood: false,
// Buffer to avoid blocking on sends.
@@ -177,12 +182,15 @@ func New(config *Config) (node *DHT, err error) {
peersRequest: make(chan ihReq, 100),
nodesRequest: make(chan ihReq, 100),
pingRequest: make(chan *remoteNode),
+ getRequest: make(chan string),
+ putRequest: make(chan string),
portRequest: make(chan int),
clientThrottle: nettools.NewThrottler(),
tokenSecrets: []string{newTokenSecret(), newTokenSecret()},
}
c := openStore(cfg.Port, cfg.SaveRoutingTable)
node.store = c
+ node.gpstore = make(GPStore, 1)
if len(c.Id) != 20 {
c.Id = randNodeId()
log.V(4).Infof("Using a new random node ID: %x %d", c.Id, len(c.Id))
@@ -402,6 +410,10 @@ func (d *DHT) Run() error {
go pingSlowly(d.pingRequest, needPing, d.config.CleanupPeriod, d.stop)
case node := <-d.pingRequest:
d.pingNode(node)
+ case target := <-d.getRequest:
+ d.get(target)
+ case bmessage := <-d.putRequest:
+ d.put(bmessage)
case <-secretRotateTicker:
d.tokenSecrets = []string{newTokenSecret(), d.tokenSecrets[0]}
case d.portRequest <- d.config.Port:
@@ -518,6 +530,10 @@ func (d *DHT) processPacket(p packetType) {
d.processFindNodeResults(node, r)
case "announce_peer":
// Nothing to do. In the future, update counters.
+ case "get":
+ d.processGetResults(node, r)
+ case "put":
+ d.processPutResults(node, r)
default:
log.V(3).Infof("DHT: Unknown query type: %v from %v", query.Type, addr)
}
@@ -551,6 +567,10 @@ func (d *DHT) processPacket(p packetType) {
d.replyFindNode(p.raddr, r)
case "announce_peer":
d.replyAnnouncePeer(p.raddr, node, r)
+ case "get":
+ d.replyGet(p.raddr, r)
+ case "put":
+ d.replyPut(p.raddr, node, r)
default:
log.V(3).Infof("DHT: non-implemented handler for type %v", r.Q)
}
@@ -949,5 +969,11 @@ var (
totalRecvFindNodeReply = expvar.NewInt("totalRecvFindNodeReply")
totalPacketsFromBlockedHosts = expvar.NewInt("totalPacketsFromBlockedHosts")
totalDroppedPackets = expvar.NewInt("totalDroppedPackets")
+ totalSentGet = expvar.NewInt("totalSentGet")
+ totalSentPut = expvar.NewInt("totalSentPut")
+ totalRecvGet = expvar.NewInt("totalRecvGet")
+ totalRecvGetReply = expvar.NewInt("totalRecvGetReply")
+ totalRecvPut = expvar.NewInt("totalRecvPut")
+ totalRecvPutReply = expvar.NewInt("totalRecvPutReply")
totalRecv = expvar.NewInt("totalRecv")
)
diff --git a/krpc.go b/krpc.go
index 7e9f655..2424b55 100644
--- a/krpc.go
+++ b/krpc.go
@@ -120,12 +120,17 @@ func (r *remoteNode) wasContactedRecently(ih InfoHash) bool {
return false
}
-type getPeersResponse struct {
+type responseRType struct {
// TODO: argh, values can be a string depending on the client (e.g: original bittorrent).
Values []string "values"
Id string "id"
Nodes string "nodes"
Token string "token"
+ Nodes6 string "nodes6" // not used right now
+ V string "v" // any bencoded data whose SHA-1 hash matches "target"
+ K string "k" // optional, for mutable get responses.
+ Seq int "seq"
+ Sig string "sig"
}
type answerType struct {
@@ -134,16 +139,17 @@ type answerType struct {
InfoHash InfoHash "info_hash" // should probably be a string.
Port int "port"
Token string "token"
+ V string "v" // put request doesn't have an info hash
}
// Generic stuff we read from the wire, not knowing what it is. This is as generic as can be.
type responseType struct {
- T string "t"
- Y string "y"
- Q string "q"
- R getPeersResponse "r"
- E []string "e"
- A answerType "a"
+ T string "t"
+ Y string "y"
+ Q string "q"
+ R responseRType "r"
+ E []string "e"
+ A answerType "a"
// Unsupported mainline extension for client identification.
// V string(?) "v"
}
// Support for the dht_store proposed extension.
// This lets the user find and store both mutable and immutable data
// in the DHT.
package dht
// Summary from the libtorrent storage specification:
//
// RPCs:
// get:
// query DHT nodes to find some data.
// put:
// store data in DHT nodes.
// Reference:
// http://libtorrent.org/dht_store.html
// arvid's comment at http://forum.bittorrent.com/topic/30932-open-standard/
import (
"crypto/sha1"
"fmt"
"io"
"net"
"strings"
"time"
log "github.com/golang/glog"
"github.com/nictuku/nettools"
)
// Object store. For lack of a better name, GP = GetPut.
// Not persisted to disk right now and doesn't handle mutable data
// For mutable data, the value needs to be a struct, not a string.
type GPStore map[string]string
// public api
func (d *DHT) Get(target string) {
d.getRequest <- target
}
func (d *DHT) Put(bmessage string) {
d.putRequest <- bmessage
}
// Process another node's response to a get query. If the response
// contains a "v" key, send it to the Torrent engine, our client, using
// DHT.GetRequestResults channel.
// TODO
func (d *DHT) processGetResults(node *remoteNode, resp responseType) {
totalRecvGetReply.Add(1)
}
// Process another node's response to a put query.
// TODO
func (d *DHT) processPutResults(node *remoteNode, resp responseType) {
totalRecvPutReply.Add(1)
}
func (d *DHT) replyGet(addr net.UDPAddr, r responseType) {
totalRecvGet.Add(1)
if log.V(3) {
log.Infof("DHT get. Host: %v , nodeID: %x , InfoHash: %x , distance to me: %x",
addr, r.A.Id, InfoHash(r.A.InfoHash), hashDistance(r.A.InfoHash, InfoHash(d.nodeId)))
}
node := InfoHash(r.A.Target)
val := d.gpstore[r.A.Target]
r0 := map[string]interface{}{
"id": d.nodeId,
"token": d.hostToken(addr, d.tokenSecrets[0]),
"nodes6": "",
}
reply := replyMessage{
T: r.T,
Y: "r",
R: r0,
}
neighbors := d.routingTable.lookup(node)
n := make([]string, 0, kNodes)
for _, r := range neighbors {
n = append(n, r.id+r.addressBinaryFormat)
}
if val != "" {
log.V(3).Infof("replyGet: found %v for %v", val, r.A.Target)
reply.R["v"] = val
} else {
log.V(3).Infof("replyGet: Nodes only. Giving %d", len(n))
}
reply.R["nodes"] = strings.Join(n, "")
sendMsg(d.conn, addr, reply)
}
func (d *DHT) replyPut(addr net.UDPAddr, node *remoteNode, r responseType) {
totalRecvPut.Add(1)
ih := InfoHash(r.A.InfoHash)
if log.V(3) {
log.Infof("DHT put. Host: %v , nodeID: %x , InfoHash: %x , distance to me: %x",
addr, r.A.Id, InfoHash(r.A.InfoHash), hashDistance(r.A.InfoHash, InfoHash(d.nodeId)))
}
// Fail early: spec says we don't have to store anything >= 1000 bytes
if len(r.A.V) >= 1000 {
return
}
// node can be nil if, for example, the server just restarted and received an announce_peer
// from a node it doesn't yet know about.
if node != nil && d.checkToken(addr, r.A.Token) {
peerAddr := net.TCPAddr{IP: addr.IP, Port: r.A.Port}
d.peerStore.addContact(ih, nettools.DottedPortToBinary(peerAddr.String()))
node.lastResponseTime = time.Now().Add(-searchRetryPeriod)
}
// Store the message and reply positively.
h := sha1.New()
io.WriteString(h, r.A.V)
d.gpstore[fmt.Sprintf("%x", h)] = r.A.V
log.V(4).Infof("put stored for %v", r.A.V)
reply := replyMessage{
T: r.T,
Y: "r",
R: map[string]interface{}{"id": d.nodeId},
}
sendMsg(d.conn, addr, reply)
}
func (d *DHT) get(target string) {
log.V(3).Infof("DHT: get => %+v", target)
ih, err := DecodeInfoHash(target)
if err != nil {
// handle this in a better way?
log.Warningf("get decode failure: %v", err)
return
}
closest := d.routingTable.lookupFiltered(ih)
if len(closest) == 0 {
// handle this in a better way?
log.Warning("get: no nodes?!")
return
}
r := closest[0]
t := r.newQuery("get")
queryArguments := map[string]interface{}{"id": d.nodeId, "target": target}
query := queryMessage{t, "q", "get", queryArguments}
sendMsg(d.conn, r.address, query)
totalSentGet.Add(1)
}
func (d *DHT) put(bmessage string) {
log.V(3).Infof("DHT: put => %+v", bmessage)
h := sha1.New()
io.WriteString(h, bmessage)
hash := fmt.Sprintf("%x", h)
ih, err := DecodeInfoHash(hash)
if err != nil {
// handle this in a better way?
log.Warningf("put decode failure: %v", err)
return
}
closest := d.routingTable.lookupFiltered(ih)
if len(closest) == 0 {
// handle this in a better way?
log.Warning("put: no nodes?!")
return
}
r := closest[0]
t := r.newQuery("put")
queryArguments := map[string]interface{}{"id": d.nodeId, "v": bmessage}
query := queryMessage{t, "q", "put", queryArguments}
sendMsg(d.conn, r.address, query)
totalSentPut.Add(1)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment