Created
July 5, 2025 18:30
-
-
Save theSoberSobber/9ac1be5d369369b998602d2acca5117d to your computer and use it in GitHub Desktop.
Fully Passive Bitcoin node that connects to the testnet peers by dns seeds and does handshake + asks for mempool + responds to ping's
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket, time, struct, hashlib, threading | |
from io import BytesIO | |
import dns.resolver | |
TESTNET_MAGIC = b'\x0b\x11\x09\x07' | |
PORT = 18333 | |
# DNS seeds | |
DNS_SEEDS = [ | |
"testnet-seed.bitcoin.jonasschnelli.ch", | |
"testnet-seed.bluematt.me", | |
"testnet-seed.bitcoin.schildbach.de", | |
"seed.tbtc.petertodd.org" | |
] | |
def sha256(x): return hashlib.sha256(x).digest() | |
def checksum(x): return sha256(sha256(x))[:4] | |
def encode_varint(i): | |
if i < 0xfd: return struct.pack('B', i) | |
elif i <= 0xffff: return b'\xfd' + struct.pack('<H', i) | |
elif i <= 0xffffffff: return b'\xfe' + struct.pack('<I', i) | |
else: return b'\xff' + struct.pack('<Q', i) | |
def read_varint(s): | |
i = s.read(1)[0] | |
if i == 0xfd: return struct.unpack('<H', s.read(2))[0] | |
elif i == 0xfe: return struct.unpack('<I', s.read(4))[0] | |
elif i == 0xff: return struct.unpack('<Q', s.read(8))[0] | |
else: return i | |
def make_message(command, payload): | |
return ( | |
TESTNET_MAGIC + | |
command.ljust(12, b'\x00') + | |
struct.pack('<I', len(payload)) + | |
checksum(payload) + | |
payload | |
) | |
def make_version_payload(): | |
version = 70015 | |
services = 0 | |
timestamp = int(time.time()) | |
addr_recv = b'\x00' * 26 | |
addr_from = b'\x00' * 26 | |
nonce = b'\x00' * 8 | |
user_agent = b'\x00' | |
start_height = struct.pack('<i', 0) | |
relay = b'\x00' | |
return struct.pack('<iQQ26s26s8sB', version, services, timestamp, addr_recv, addr_from, nonce, 0) + user_agent + start_height + relay | |
def recv_all(sock, n): | |
buf = b'' | |
while len(buf) < n: | |
chunk = sock.recv(n - len(buf)) | |
if not chunk: | |
raise ConnectionError("Socket closed") | |
buf += chunk | |
return buf | |
def handle_peer(ip): | |
try: | |
s = socket.create_connection((ip, PORT), timeout=10) | |
print(f"[+] Connected to {ip}") | |
s.sendall(make_message(b'version', make_version_payload())) | |
def read_message(): | |
header = recv_all(s, 24) | |
magic, command, length, cksum = struct.unpack('<4s12sI4s', header) | |
payload = recv_all(s, length) | |
return command.strip(b'\x00'), payload | |
# Wait for handshake | |
while True: | |
cmd, payload = read_message() | |
if cmd == b'verack': | |
break | |
print(f"[✓] Handshake complete with {ip}") | |
s.sendall(make_message(b'mempool', b'')) | |
print(f"[>] Requested mempool from {ip}") | |
while True: | |
cmd, payload = read_message() | |
if cmd == b'inv': | |
stream = BytesIO(payload) | |
count = read_varint(stream) | |
print(f"[<] {ip} inv({count})") | |
txids = [] | |
for _ in range(count): | |
typ = struct.unpack('<I', stream.read(4))[0] | |
h = stream.read(32)[::-1].hex() | |
if typ == 1: | |
print(f" {ip} TXID: {h}") | |
txids.append(h) | |
if txids: | |
getdata = encode_varint(len(txids)) | |
for txid in txids: | |
getdata += struct.pack('<I', 1) + bytes.fromhex(txid)[::-1] | |
s.sendall(make_message(b'getdata', getdata)) | |
elif cmd == b'tx': | |
print(f"[<] {ip} tx ({len(payload)} bytes)") | |
elif cmd == b'ping': | |
s.sendall(make_message(b'pong', payload)) | |
print(f"[<] {ip} ping/pong") | |
elif cmd == b'pong': | |
pass # optionally log pong | |
else: | |
print(f"[<] {ip} {cmd.decode()} ({len(payload)} bytes)") | |
except Exception as e: | |
print(f"[!] {ip} error: {e}") | |
def resolve_seeds(seeds, limit=10): | |
ips = set() | |
for seed in seeds: | |
try: | |
answers = dns.resolver.resolve(seed, "A") | |
for rdata in answers: | |
ips.add(rdata.to_text()) | |
except Exception as e: | |
print(f"[!] Failed to resolve {seed}: {e}") | |
return list(ips)[:limit] | |
if __name__ == '__main__': | |
print("[*] Resolving DNS seeds...") | |
all_ips = resolve_seeds(DNS_SEEDS) | |
print(f"[✓] Got {len(all_ips)} peers:") | |
for ip in all_ips: | |
print(f" {ip}") | |
threads = [] | |
for ip in all_ips: | |
t = threading.Thread(target=handle_peer, args=(ip,)) | |
t.start() | |
threads.append(t) | |
for t in threads: | |
t.join() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment