Created
August 13, 2014 10:30
-
-
Save faried/a75c30ae246e3287d884 to your computer and use it in GitHub Desktop.
changes against nictuku/dht to support the dht store extension (incomplete)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/dht.go b/dht.go | |
index 9671b0b..53e42b9 100644 | |
--- a/dht.go | |
+++ b/dht.go | |
@@ -144,14 +144,18 @@ type DHT struct { | |
peersRequest chan ihReq | |
nodesRequest chan ihReq | |
pingRequest chan *remoteNode | |
+ getRequest chan string | |
+ putRequest chan string | |
portRequest chan int | |
stop chan bool | |
clientThrottle *nettools.ClientThrottle | |
store *dhtStore | |
+ gpstore GPStore | |
tokenSecrets []string | |
// Public channels: | |
PeersRequestResults chan map[InfoHash][]string // key = infohash, v = slice of peers. | |
+ GetRequestResults chan map[InfoHash]string // key = infohash, v = bencoded value | |
} | |
// New creates a DHT node. If config is nil, DefaultConfig will be used. | |
@@ -169,6 +173,7 @@ func New(config *Config) (node *DHT, err error) { | |
routingTable: newRoutingTable(), | |
peerStore: newPeerStore(cfg.MaxInfoHashes, cfg.MaxInfoHashPeers), | |
PeersRequestResults: make(chan map[InfoHash][]string, 1), | |
+ GetRequestResults: make(chan map[InfoHash]string, 1), | |
stop: make(chan bool), | |
exploredNeighborhood: false, | |
// Buffer to avoid blocking on sends. | |
@@ -177,12 +182,15 @@ func New(config *Config) (node *DHT, err error) { | |
peersRequest: make(chan ihReq, 100), | |
nodesRequest: make(chan ihReq, 100), | |
pingRequest: make(chan *remoteNode), | |
+ getRequest: make(chan string), | |
+ putRequest: make(chan string), | |
portRequest: make(chan int), | |
clientThrottle: nettools.NewThrottler(), | |
tokenSecrets: []string{newTokenSecret(), newTokenSecret()}, | |
} | |
c := openStore(cfg.Port, cfg.SaveRoutingTable) | |
node.store = c | |
+ node.gpstore = make(GPStore, 1) | |
if len(c.Id) != 20 { | |
c.Id = randNodeId() | |
log.V(4).Infof("Using a new random node ID: %x %d", c.Id, len(c.Id)) | |
@@ -402,6 +410,10 @@ func (d *DHT) Run() error { | |
go pingSlowly(d.pingRequest, needPing, d.config.CleanupPeriod, d.stop) | |
case node := <-d.pingRequest: | |
d.pingNode(node) | |
+ case target := <-d.getRequest: | |
+ d.get(target) | |
+ case bmessage := <-d.putRequest: | |
+ d.put(bmessage) | |
case <-secretRotateTicker: | |
d.tokenSecrets = []string{newTokenSecret(), d.tokenSecrets[0]} | |
case d.portRequest <- d.config.Port: | |
@@ -518,6 +530,10 @@ func (d *DHT) processPacket(p packetType) { | |
d.processFindNodeResults(node, r) | |
case "announce_peer": | |
// Nothing to do. In the future, update counters. | |
+ case "get": | |
+ d.processGetResults(node, r) | |
+ case "put": | |
+ d.processPutResults(node, r) | |
default: | |
log.V(3).Infof("DHT: Unknown query type: %v from %v", query.Type, addr) | |
} | |
@@ -551,6 +567,10 @@ func (d *DHT) processPacket(p packetType) { | |
d.replyFindNode(p.raddr, r) | |
case "announce_peer": | |
d.replyAnnouncePeer(p.raddr, node, r) | |
+ case "get": | |
+ d.replyGet(p.raddr, r) | |
+ case "put": | |
+ d.replyPut(p.raddr, node, r) | |
default: | |
log.V(3).Infof("DHT: non-implemented handler for type %v", r.Q) | |
} | |
@@ -949,5 +969,11 @@ var ( | |
totalRecvFindNodeReply = expvar.NewInt("totalRecvFindNodeReply") | |
totalPacketsFromBlockedHosts = expvar.NewInt("totalPacketsFromBlockedHosts") | |
totalDroppedPackets = expvar.NewInt("totalDroppedPackets") | |
+ totalSentGet = expvar.NewInt("totalSentGet") | |
+ totalSentPut = expvar.NewInt("totalSentPut") | |
+ totalRecvGet = expvar.NewInt("totalRecvGet") | |
+ totalRecvGetReply = expvar.NewInt("totalRecvGetReply") | |
+ totalRecvPut = expvar.NewInt("totalRecvPut") | |
+ totalRecvPutReply = expvar.NewInt("totalRecvPutReply") | |
totalRecv = expvar.NewInt("totalRecv") | |
) | |
diff --git a/krpc.go b/krpc.go | |
index 7e9f655..2424b55 100644 | |
--- a/krpc.go | |
+++ b/krpc.go | |
@@ -120,12 +120,17 @@ func (r *remoteNode) wasContactedRecently(ih InfoHash) bool { | |
return false | |
} | |
-type getPeersResponse struct { | |
+type responseRType struct { | |
// TODO: argh, values can be a string depending on the client (e.g: original bittorrent). | |
Values []string "values" | |
Id string "id" | |
Nodes string "nodes" | |
Token string "token" | |
+ Nodes6 string "nodes6" // not used right now | |
+ V string "v" // any bencoded data whose SHA-1 hash matches "target" | |
+ K string "k" // optional, for mutable get responses. | |
+ Seq int "seq" | |
+ Sig string "sig" | |
} | |
type answerType struct { | |
@@ -134,16 +139,17 @@ type answerType struct { | |
InfoHash InfoHash "info_hash" // should probably be a string. | |
Port int "port" | |
Token string "token" | |
+ V string "v" // put request doesn't have an info hash | |
} | |
// Generic stuff we read from the wire, not knowing what it is. This is as generic as can be. | |
type responseType struct { | |
- T string "t" | |
- Y string "y" | |
- Q string "q" | |
- R getPeersResponse "r" | |
- E []string "e" | |
- A answerType "a" | |
+ T string "t" | |
+ Y string "y" | |
+ Q string "q" | |
+ R responseRType "r" | |
+ E []string "e" | |
+ A answerType "a" | |
// Unsupported mainline extension for client identification. | |
// V string(?) "v" | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Support for the dht_store proposed extension. | |
// This lets the user find and store both mutable and immutable data | |
// in the DHT. | |
package dht | |
// Summary from the libtorrent storage specification: | |
// | |
// RPCs: | |
// get: | |
// query DHT nodes to find some data. | |
// put: | |
// store data in DHT nodes. | |
// Reference: | |
// http://libtorrent.org/dht_store.html | |
// arvid's comment at http://forum.bittorrent.com/topic/30932-open-standard/ | |
import ( | |
"crypto/sha1" | |
"fmt" | |
"io" | |
"net" | |
"strings" | |
"time" | |
log "github.com/golang/glog" | |
"github.com/nictuku/nettools" | |
) | |
// GPStore is the in-memory object store behind the get/put RPCs; for lack of
// a better name, GP = GetPut. It maps a key string to the stored value.
// It is not persisted to disk right now and doesn't handle mutable data.
// For mutable data, the value needs to be a struct, not a string.
type GPStore map[string]string
// public api | |
// Get asks the DHT to look up the value stored under target. The request is
// handed to the main loop over the unbuffered getRequest channel, so the call
// blocks until Run receives it. target is later parsed with DecodeInfoHash by
// get — presumably a hex-encoded 20-byte hash; TODO confirm.
func (d *DHT) Get(target string) {
	d.getRequest <- target
}
// Put asks the DHT to store bmessage on a remote node. The request is handed
// to the main loop over the unbuffered putRequest channel, so the call blocks
// until Run receives it.
func (d *DHT) Put(bmessage string) {
	d.putRequest <- bmessage
}
// processGetResults handles another node's response to a get query.
// For now it only increments the totalRecvGetReply counter.
// TODO: if the response contains a "v" key, send it to the Torrent engine,
// our client, using the DHT.GetRequestResults channel.
func (d *DHT) processGetResults(node *remoteNode, resp responseType) {
	totalRecvGetReply.Add(1)
}
// processPutResults handles another node's response to a put query.
// For now it only increments the totalRecvPutReply counter.
// TODO: act on the response.
func (d *DHT) processPutResults(node *remoteNode, resp responseType) {
	totalRecvPutReply.Add(1)
}
func (d *DHT) replyGet(addr net.UDPAddr, r responseType) { | |
totalRecvGet.Add(1) | |
if log.V(3) { | |
log.Infof("DHT get. Host: %v , nodeID: %x , InfoHash: %x , distance to me: %x", | |
addr, r.A.Id, InfoHash(r.A.InfoHash), hashDistance(r.A.InfoHash, InfoHash(d.nodeId))) | |
} | |
node := InfoHash(r.A.Target) | |
val := d.gpstore[r.A.Target] | |
r0 := map[string]interface{}{ | |
"id": d.nodeId, | |
"token": d.hostToken(addr, d.tokenSecrets[0]), | |
"nodes6": "", | |
} | |
reply := replyMessage{ | |
T: r.T, | |
Y: "r", | |
R: r0, | |
} | |
neighbors := d.routingTable.lookup(node) | |
n := make([]string, 0, kNodes) | |
for _, r := range neighbors { | |
n = append(n, r.id+r.addressBinaryFormat) | |
} | |
if val != "" { | |
log.V(3).Infof("replyGet: found %v for %v", val, r.A.Target) | |
reply.R["v"] = val | |
} else { | |
log.V(3).Infof("replyGet: Nodes only. Giving %d", len(n)) | |
} | |
reply.R["nodes"] = strings.Join(n, "") | |
sendMsg(d.conn, addr, reply) | |
} | |
func (d *DHT) replyPut(addr net.UDPAddr, node *remoteNode, r responseType) { | |
totalRecvPut.Add(1) | |
ih := InfoHash(r.A.InfoHash) | |
if log.V(3) { | |
log.Infof("DHT put. Host: %v , nodeID: %x , InfoHash: %x , distance to me: %x", | |
addr, r.A.Id, InfoHash(r.A.InfoHash), hashDistance(r.A.InfoHash, InfoHash(d.nodeId))) | |
} | |
// Fail early: spec says we don't have to store anything >= 1000 bytes | |
if len(r.A.V) >= 1000 { | |
return | |
} | |
// node can be nil if, for example, the server just restarted and received an announce_peer | |
// from a node it doesn't yet know about. | |
if node != nil && d.checkToken(addr, r.A.Token) { | |
peerAddr := net.TCPAddr{IP: addr.IP, Port: r.A.Port} | |
d.peerStore.addContact(ih, nettools.DottedPortToBinary(peerAddr.String())) | |
node.lastResponseTime = time.Now().Add(-searchRetryPeriod) | |
} | |
// Store the message and reply positively. | |
h := sha1.New() | |
io.WriteString(h, r.A.V) | |
d.gpstore[fmt.Sprintf("%x", h)] = r.A.V | |
log.V(4).Infof("put stored for %v", r.A.V) | |
reply := replyMessage{ | |
T: r.T, | |
Y: "r", | |
R: map[string]interface{}{"id": d.nodeId}, | |
} | |
sendMsg(d.conn, addr, reply) | |
} | |
func (d *DHT) get(target string) { | |
log.V(3).Infof("DHT: get => %+v", target) | |
ih, err := DecodeInfoHash(target) | |
if err != nil { | |
// handle this in a better way? | |
log.Warningf("get decode failure: %v", err) | |
return | |
} | |
closest := d.routingTable.lookupFiltered(ih) | |
if len(closest) == 0 { | |
// handle this in a better way? | |
log.Warning("get: no nodes?!") | |
return | |
} | |
r := closest[0] | |
t := r.newQuery("get") | |
queryArguments := map[string]interface{}{"id": d.nodeId, "target": target} | |
query := queryMessage{t, "q", "get", queryArguments} | |
sendMsg(d.conn, r.address, query) | |
totalSentGet.Add(1) | |
} | |
func (d *DHT) put(bmessage string) { | |
log.V(3).Infof("DHT: put => %+v", bmessage) | |
h := sha1.New() | |
io.WriteString(h, bmessage) | |
hash := fmt.Sprintf("%x", h) | |
ih, err := DecodeInfoHash(hash) | |
if err != nil { | |
// handle this in a better way? | |
log.Warningf("put decode failure: %v", err) | |
return | |
} | |
closest := d.routingTable.lookupFiltered(ih) | |
if len(closest) == 0 { | |
// handle this in a better way? | |
log.Warning("put: no nodes?!") | |
return | |
} | |
r := closest[0] | |
t := r.newQuery("put") | |
queryArguments := map[string]interface{}{"id": d.nodeId, "v": bmessage} | |
query := queryMessage{t, "q", "put", queryArguments} | |
sendMsg(d.conn, r.address, query) | |
totalSentPut.Add(1) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment