Created
May 29, 2022 07:02
-
-
Save kalloc/0f6c480b102c93d7d9723c203a10d84f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2 | |
# encoding: utf-8 | |
import math | |
import socket | |
import threading | |
from time import sleep, time | |
from hashlib import sha1 | |
from random import randint | |
from struct import pack, unpack | |
from socket import inet_ntoa | |
from threading import Timer, Thread | |
from time import sleep | |
from collections import deque | |
from bencode import bencode, bdecode | |
from Queue import Queue | |
import base64 | |
import json | |
import urllib2 | |
import traceback | |
import gc | |
import thread | |
import sys | |
import os | |
import signal | |
infos = [] | |
BOOTSTRAP_NODES = ( | |
("router.bittorrent.com", 6881), | |
("dht.transmissionbt.com", 6881), | |
("router.utorrent.com", 6881) | |
) | |
TID_LENGTH = 2 | |
RE_JOIN_DHT_INTERVAL = 3 | |
TOKEN_LENGTH = 2 | |
BT_PROTOCOL = "BitTorrent protocol" | |
BT_MSG_ID = 20 | |
EXT_HANDSHAKE_ID = 0 | |
def help(): | |
print './dht_spider.py [option]' | |
print ' [-s]: This option will not storage any file on your computer.Just print the informations on terminal.' | |
print ' [-p:filename]: Where you want to storage magnets.' | |
print ' [-h]: Print this help and exit.' | |
print ' [-t:thread num]: How many thread to get the torrents metadata.' | |
print ' [-b:(0|1)]: 0-Do not save the torrent file. 1-Save the torrent file.' | |
def get_option(): | |
s=0 | |
global options | |
global path | |
global thread_num | |
global save_seed | |
print 'This project is using AGPL-3.0+' | |
print '(C) 2017-2017 LEXUGE.' | |
print ' ' | |
if sys.argv[1]=="-h": | |
help() | |
sys.exit(0) | |
while True: | |
s=s+1 | |
try: | |
options.append(sys.argv[s]) | |
except: | |
break | |
for i in options: | |
if i=="-s": | |
path="-s" | |
save_seed=0 | |
if i[:3]=="-p:": | |
path=i[3:] | |
if i[:3]=="-t:": | |
try: | |
thread_num=int(i[3:]) | |
except: | |
pass | |
if i[:3]=="-b:": | |
try: | |
save_seed=int(i[3:]) | |
except: | |
pass | |
if path=="": | |
path="hash.log" | |
if thread_num==0: | |
thread_num=100 | |
if (save_seed!=0)and(save_seed!=1): | |
save_seed=1 | |
#class watch start | |
class Watcher: | |
def __init__(self): | |
self.child = os.fork() | |
if self.child == 0: | |
return | |
else: | |
self.watch() | |
def watch(self): | |
try: | |
os.wait() | |
except KeyboardInterrupt: | |
print '[EXIT] Control-C' | |
self.kill() | |
sys.exit() | |
def kill(self): | |
try: | |
os.kill(self.child, signal.SIGKILL) | |
except OSError: pass | |
#class watch end | |
def storage_info(info,metadate_old,address): | |
global infos | |
global save_seed | |
global path | |
count=0 | |
flag=0 | |
if path!="-s": | |
try: | |
test_fo=open(path,"r") | |
except: | |
test_fo=open(path,"w") | |
test=test_fo.read() | |
if info['hash_id'] in test: | |
flag=1 | |
test_fo.close() | |
if (not(info['hash_id'] in infos))and(flag==0): | |
if path=="-s": | |
pass | |
else: | |
fo=open(path,"a") | |
fo.write("BT Name:"+str(info['hash_name'])+"\n") | |
fo.write("Sender:"+str(address)+"\n") | |
fo.write("infohash:"+str(info['hash_id'])+"\n") | |
fo.write("magnet:?xt=urn:btih:"+str(info['hash_id'])+"\n") | |
print "BT Name:"+str(info['hash_name']) | |
print "Sender:"+str(address) | |
print "infohash:"+str(info['hash_id']) | |
print "magnet:?xt=urn:btih:"+str(info['hash_id']) | |
for i in info['files']: | |
if count==10: | |
break | |
print " "+str(i['path'][0]),str(i['length']) | |
if path!="-s": | |
fo.write(" "+str(i['path'][0])+" "+str(i['length'])+"\n") | |
count=count+1 | |
if path!="-s": | |
fo.write("\n\n") | |
fo.close() | |
if save_seed==1: | |
ftest=open("BT/"+info['hash_name']+".torrent","wb") | |
ftest.write(metadate_old) | |
ftest.close() | |
print "\r\n\r\n" | |
infos.append(info['hash_id']) | |
def entropy(length): | |
return "".join(chr(randint(0, 255)) for _ in xrange(length)) | |
def random_id(): | |
h = sha1() | |
h.update(entropy(20)) | |
return h.digest() | |
def decode_nodes(nodes): | |
n = [] | |
length = len(nodes) | |
if (length % 26) != 0: | |
return n | |
for i in range(0, length, 26): | |
nid = nodes[i:i+20] | |
ip = inet_ntoa(nodes[i+20:i+24]) | |
port = unpack("!H", nodes[i+24:i+26])[0] | |
n.append((nid, ip, port)) | |
return n | |
def timer(t, f): | |
Timer(t, f).start() | |
def get_neighbor(target, nid, end=10): | |
return target[:end]+nid[end:] | |
def send_packet(the_socket, msg): | |
the_socket.send(msg) | |
def send_message(the_socket, msg): | |
msg_len = pack(">I", len(msg)) | |
send_packet(the_socket, msg_len + msg) | |
def send_handshake(the_socket, infohash): | |
bt_header = chr(len(BT_PROTOCOL)) + BT_PROTOCOL | |
ext_bytes = "\x00\x00\x00\x00\x00\x10\x00\x00" | |
peer_id = random_id() | |
packet = bt_header + ext_bytes + infohash + peer_id | |
send_packet(the_socket, packet) | |
def check_handshake(packet, self_infohash): | |
try: | |
bt_header_len, packet = ord(packet[:1]), packet[1:] | |
if bt_header_len != len(BT_PROTOCOL): | |
return False | |
except TypeError: | |
return False | |
bt_header, packet = packet[:bt_header_len], packet[bt_header_len:] | |
if bt_header != BT_PROTOCOL: | |
return False | |
packet = packet[8:] | |
infohash = packet[:20] | |
if infohash != self_infohash: | |
return False | |
return True | |
def send_ext_handshake(the_socket): | |
msg = chr(BT_MSG_ID) + chr(EXT_HANDSHAKE_ID) + bencode({"m": {"ut_metadata": 1}}) | |
send_message(the_socket, msg) | |
def request_metadata(the_socket, ut_metadata, piece): | |
"""bep_0009""" | |
msg = chr(BT_MSG_ID) + chr(ut_metadata) + bencode({"msg_type": 0, "piece": piece}) | |
send_message(the_socket, msg) | |
def get_ut_metadata(data): | |
try: | |
ut_metadata = "_metadata" | |
index = data.index(ut_metadata) + len(ut_metadata) + 1 | |
return int(data[index]) | |
except Exception as e: | |
pass | |
def get_metadata_size(data): | |
metadata_size = "metadata_size" | |
start = data.index(metadata_size) + len(metadata_size) + 1 | |
data = data[start:] | |
return int(data[:data.index("e")]) | |
def recvall(the_socket, timeout=15): | |
the_socket.setblocking(0) | |
total_data = [] | |
data = "" | |
begin = time() | |
while True: | |
sleep(0.05) | |
if total_data and time() - begin > timeout: | |
break | |
elif time() - begin > timeout * 2: | |
break | |
try: | |
data = the_socket.recv(1024) | |
if data: | |
total_data.append(data) | |
begin = time() | |
except Exception: | |
pass | |
return "".join(total_data) | |
def ip_black_list(ipaddress): | |
black_lists = ['45.32.5.150', '45.63.4.233'] | |
if ipaddress in black_lists: | |
return True | |
return False | |
def download_metadata(address, infohash, timeout=15): | |
if ip_black_list(address[0]): | |
return | |
try: | |
the_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
the_socket.settimeout(15) | |
the_socket.connect(address) | |
# handshake | |
send_handshake(the_socket, infohash) | |
packet = the_socket.recv(4096) | |
# handshake error | |
if not check_handshake(packet, infohash): | |
try: | |
the_socket.close() | |
except: | |
return | |
return | |
# ext handshake | |
send_ext_handshake(the_socket) | |
packet = the_socket.recv(4096) | |
# get ut_metadata and metadata_size | |
ut_metadata, metadata_size = get_ut_metadata(packet), get_metadata_size(packet) | |
# print 'ut_metadata_size: ', metadata_size | |
# request each piece of metadata | |
metadate_old="" | |
metadata = [] | |
for piece in range(int(math.ceil(metadata_size / (16.0 * 1024)))): | |
request_metadata(the_socket, ut_metadata, piece) | |
packet = recvall(the_socket, timeout) # the_socket.recv(1024*17) # | |
metadata.append(packet[packet.index("ee") + 2:]) | |
metadata = "".join(metadata) | |
info = {} | |
#vierfy metadata | |
chech_metadata=sha1(str(metadata)).hexdigest() | |
if chech_metadata.upper()!=infohash.encode("hex").upper(): | |
print "[check infohash] failed.Check_sum:"+chech_metadata.upper() | |
print "[check infohash] infohash:"+infohash.encode("hex").upper() | |
try: | |
print "\r\n\r\n" | |
the_socket.close() | |
except: | |
print "\r\n\r\n" | |
return | |
return | |
else: | |
print "[check infohash] successful.Check_sum:"+chech_metadata.upper() | |
print "[check infohash] infohash:"+infohash.encode("hex").upper() | |
meta_data = bdecode(metadata) | |
#torrent file | |
metadate_old="d4:info"+str(metadata)+"e" | |
del metadata | |
info['hash_id'] = infohash.encode("hex").upper() | |
if meta_data.has_key('name'): | |
info["hash_name"] = meta_data["name"].strip() | |
else: | |
info["hash_name"] = '' | |
if meta_data.has_key('length'): | |
info['hash_size'] = meta_data['length'] | |
else: | |
info['hash_size'] = 0 | |
if meta_data.has_key('files'): | |
info['files'] = meta_data['files'] | |
for item in info['files']: | |
# print item | |
if item.has_key('length'): | |
info['hash_size'] += item['length'] | |
#info['files'] = json.dumps(info['files'], ensure_ascii=False) | |
#info['files'] = info['files'].replace("\"path\"", "\"p\"").replace("\"length\"", "\"l\"") | |
else: | |
info['files'] = '' | |
info['a_ip'] = address[0] | |
info['hash_size'] = str(info['hash_size']) | |
#print info, "\r\n\r\n" | |
storage_info(info,metadate_old,address) | |
del info | |
gc.collect() | |
except socket.timeout: | |
try: | |
the_socket.close() | |
except: | |
return | |
except socket.error: | |
try: | |
the_socket.close() | |
except: | |
return | |
except Exception, e: | |
try: | |
# print e | |
# traceback.print_exc() | |
the_socket.close() | |
except: | |
return | |
finally: | |
try: | |
the_socket.close() | |
except: | |
return | |
return | |
class KNode(object): | |
def __init__(self, nid, ip, port): | |
self.nid = nid | |
self.ip = ip | |
self.port = port | |
class DHT_Process(Thread): | |
def __init__(self, master, ip, port, max_node_qsize): | |
Thread.__init__(self) | |
self.setDaemon(True) | |
self.max_node_qsize = max_node_qsize | |
self.nid = random_id() | |
self.nodes = deque(maxlen=max_node_qsize) | |
self.master = master | |
self.ip = ip | |
self.port = port | |
self.process_request_actions = { | |
"get_peers": self.on_get_peers_request, | |
"announce_peer": self.on_announce_peer_request, | |
} | |
self.ufd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) | |
self.ufd.bind((self.ip, self.port)) | |
timer(RE_JOIN_DHT_INTERVAL, self.re_join_DHT) | |
def send_krpc(self, msg, address): | |
try: | |
self.ufd.sendto(bencode(msg), address) | |
except Exception: | |
pass | |
def send_find_node(self, address, nid=None): | |
nid = get_neighbor(nid, self.nid) if nid else self.nid | |
tid = entropy(TID_LENGTH) | |
msg = { | |
"t": tid, | |
"y": "q", | |
"q": "find_node", | |
"a": { | |
"id": nid, | |
"target": random_id() | |
} | |
} | |
self.send_krpc(msg, address) | |
def join_DHT(self): | |
for address in BOOTSTRAP_NODES: | |
self.send_find_node(address) | |
def re_join_DHT(self): | |
if len(self.nodes) == 0: | |
self.join_DHT() | |
timer(RE_JOIN_DHT_INTERVAL, self.re_join_DHT) | |
def run(self): | |
self.re_join_DHT() | |
while True: | |
try: | |
(data, address) = self.ufd.recvfrom(65536) | |
msg = bdecode(data) | |
self.on_message(msg, address) | |
except Exception: | |
pass | |
def process_find_node_response(self, msg, address): | |
nodes = decode_nodes(msg["r"]["nodes"]) | |
for node in nodes: | |
(nid, ip, port) = node | |
if len(nid) != 20: continue | |
if ip == self.ip: continue | |
if port < 1 or port > 65535: continue | |
n = KNode(nid, ip, port) | |
self.nodes.append(n) | |
def on_get_peers_request(self, msg, address): | |
try: | |
infohash = msg["a"]["info_hash"] | |
tid = msg["t"] | |
nid = msg["a"]["id"] | |
token = infohash[:TOKEN_LENGTH] | |
msg = { | |
"t": tid, | |
"y": "r", | |
"r": { | |
"id": get_neighbor(infohash, self.nid), | |
"nodes": "", | |
"token": token | |
} | |
} | |
self.send_krpc(msg, address) | |
except KeyError: | |
pass | |
def on_announce_peer_request(self, msg, address): | |
try: | |
infohash = msg["a"]["info_hash"] | |
token = msg["a"]["token"] | |
nid = msg["a"]["id"] | |
tid = msg["t"] | |
if infohash[:TOKEN_LENGTH] == token: | |
if msg["a"].has_key("implied_port") and msg["a"]["implied_port"] != 0: | |
port = address[1] | |
else: | |
port = msg["a"]["port"] | |
if port < 1 or port > 65535: return | |
self.master.log(infohash, (address[0], port)) | |
except Exception: | |
pass | |
finally: | |
self.ok(msg, address) | |
def ok(self, msg, address): | |
try: | |
tid = msg["t"] | |
nid = msg["a"]["id"] | |
msg = { | |
"t": tid, | |
"y": "r", | |
"r": { | |
"id": get_neighbor(nid, self.nid) | |
} | |
} | |
self.send_krpc(msg, address) | |
except KeyError: | |
pass | |
def on_message(self, msg, address): | |
try: | |
if msg["y"] == "r": | |
if msg["r"].has_key("nodes"): | |
self.process_find_node_response(msg, address) | |
elif msg["y"] == "q": | |
try: | |
self.process_request_actions[msg["q"]](msg, address) | |
except KeyError: | |
self.play_dead(msg, address) | |
except KeyError: | |
pass | |
def play_dead(self, msg, address): | |
try: | |
tid = msg["t"] | |
msg = { | |
"t": tid, | |
"y": "e", | |
"e": [202, "Server Error"] | |
} | |
self.send_krpc(msg, address) | |
except KeyError: | |
pass | |
def auto_send_find_node(self): | |
wait = 1.0 / self.max_node_qsize | |
while True: | |
try: | |
node = self.nodes.popleft() | |
self.send_find_node((node.ip, node.port), node.nid) | |
except IndexError: | |
pass | |
sleep(wait) | |
class Master(Thread): | |
def __init__(self): | |
Thread.__init__(self) | |
self.setDaemon(True) | |
self.queue = Queue() | |
def run(self): | |
while True: | |
self.downloadMetadata() | |
def log(self, infohash, address=None): | |
self.queue.put([address, infohash]) | |
# print "%s from %s:%s" % ( | |
# infohash.encode("hex"), address[0], address[1] | |
# ) | |
def downloadMetadata(self): | |
global thread_num | |
# 100 threads for download metadata | |
for i in xrange(0, thread_num): | |
if self.queue.qsize() == 0: | |
sleep(1) | |
continue | |
announce = self.queue.get() | |
t = threading.Thread(target=download_metadata, args=(announce[0], announce[1])) | |
t.setDaemon(True) | |
t.start() | |
#main | |
if __name__ == "__main__": | |
path="" | |
thread_num=0 | |
save_seed=-1 | |
options=[] | |
trans_queue = Queue() | |
get_option() | |
print path | |
print thread_num | |
print save_seed | |
if (path!="-s")and(save_seed==1): | |
if not os.path.exists('BT/'): | |
os.makedirs('BT/') | |
#start watcher | |
Watcher() | |
#metadata process | |
master = Master() | |
master.start() | |
#start DHT Network | |
print('Receiving datagrams on :6882') | |
dht = DHT_Process(master, "0.0.0.0", 6882, max_node_qsize=200) | |
dht.start() | |
thread.start_new_thread(dht.auto_send_find_node,()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment