# Source: GitHub Gist by @ymgve (gist 425da1434743756b6095f5fb7e8ffec6),
# created October 9, 2023 14:23.
# The GitHub page chrome that surrounded the code ("Skip to content",
# "Instantly share code, notes, and snippets.", "Show Gist options",
# "Save ymgve/425da1434743756b6095f5fb7e8ffec6 to your computer and use it
# in GitHub Desktop.") is not part of the program.
import hashlib, io, socket, struct, threading, time, hmac, random
from Crypto.Cipher import AES
from user import *
from secondblob import *
def steamtime_to_unixtime(steamtime_bin):
    """Decode a packed 8-byte timestamp into whole Unix seconds.

    ``steamtime_bin`` is read as an unsigned 64-bit little-endian integer,
    reduced by a factor of 1,000,000 (floor division) and then shifted down
    by 62135596800 so the result is comparable with ``time.time()``.
    """
    (raw_value,) = struct.unpack("<Q", steamtime_bin)
    whole_seconds = raw_value // 1000000
    return whole_seconds - 62135596800
def unixtime_to_steamtime(unixtime):
    """Encode Unix seconds as a packed 8-byte timestamp.

    Inverse of ``steamtime_to_unixtime``: the value is shifted up by
    62135596800 seconds, scaled by 1,000,000, truncated to an integer and
    packed as an unsigned 64-bit little-endian number.  ``unixtime`` may be
    an int or a float (e.g. the result of ``time.time()``).
    """
    shifted_seconds = unixtime + 62135596800
    return struct.pack("<Q", int(shifted_seconds * 1000000))
def binaryxor(a, b):
    """Return the byte-wise XOR of two equally long byte strings.

    Raises ``Exception`` when the two inputs differ in length.
    """
    if len(a) == len(b):
        return bytes(left ^ right for left, right in zip(a, b))
    raise Exception("binaryxor: string lengths doesn't match!!")
def decrypt_message(msg, key):
    """Decrypt an AES-CBC framed message and strip its trailing padding.

    Layout of ``msg``: 16-byte IV, big-endian 16-bit plaintext size,
    big-endian 16-bit ciphertext size, then the ciphertext.  Anything after
    the ciphertext is ignored.  Returns the first ``ptextsize`` bytes of the
    decrypted data; the discarded tail is printed in hex for debugging.
    """
    iv = msg[:16]
    ptextsize, ctextsize = struct.unpack(">HH", msg[16:20])
    ciphertext = msg[20:20 + ctextsize]
    decrypted = AES.new(key, AES.MODE_CBC, iv).decrypt(ciphertext)
    print("removing padding at end", decrypted[ptextsize:].hex())
    return decrypted[:ptextsize]
def encrypt_with_pad(ptext, key, IV):
    """AES-CBC encrypt ``ptext`` after padding it to a 16-byte multiple.

    Between 1 and 16 padding bytes are appended, each holding the padding
    length (a full block of 16 is added when ``ptext`` is already aligned).
    Returns only the ciphertext; the caller is responsible for sending the IV.
    """
    pad_len = 16 - len(ptext) % 16
    padded = ptext + bytes([pad_len]) * pad_len
    cipher = AES.new(key, AES.MODE_CBC, IV)
    return cipher.encrypt(padded)
def blob_serialize(blobdict):
    """Serialize a (possibly nested) dict of bytes into the binary blob format.

    Output layout:
        b"\\x01\\x50"
        <u32 LE: 10 + total size of all entries>
        <u32 LE: size of the slack data>
        entries...
        slack

    Each entry is ``<u16 LE key length><u32 LE value length><key><value>``.
    Values that are dicts (including dict subclasses) are serialized
    recursively and embedded as the entry's value.  The special key
    ``b"__slack__"`` is not emitted as an entry; its value is appended verbatim
    after the entries.  Note that the first size field does not include the
    slack bytes.
    """
    entries = []
    for key, value in blobdict.items():
        if key == b"__slack__":
            continue
        # isinstance (not an exact type match) so OrderedDict and other dict
        # subclasses nest correctly instead of failing on bytes concatenation.
        if isinstance(value, dict):
            value = blob_serialize(value)
        entries.append(struct.pack("<HI", len(key), len(value)) + key + value)

    # Join once instead of growing a bytes object entry by entry.
    blobdata = b"".join(entries)
    slack = blobdict.get(b"__slack__", b"")
    header = b"\x01\x50" + struct.pack("<II", 10 + len(blobdata), len(slack))
    return header + blobdata + slack
def decodeIP(s):
    """Decode a 6-byte address into ``(dotted_quad_string, port)``.

    The first four bytes are the IPv4 octets in order; the last two are the
    port as an unsigned little-endian 16-bit integer.
    """
    *octets, port = struct.unpack("<BBBBH", s)
    dotted = ".".join(str(octet) for octet in octets)
    return dotted, port
def encodeIP(ipport):
    """Encode ``(ip, port)`` into the 6-byte form read by ``decodeIP``.

    Args:
        ipport: pair of a dotted-quad IPv4 string and a port.  The port may be
            an int or a decimal string (any ``str`` subclass is accepted).

    Returns:
        Four octet bytes followed by the port as an unsigned little-endian
        16-bit integer.

    Raises:
        ValueError: if ``ip`` does not consist of exactly four dot-separated
            parts, or a part is not a decimal number.
        struct.error: if an octet or the port is out of range.
    """
    ip, port = ipport
    if isinstance(port, str):
        port = int(port)
    octets = ip.split(".")
    # Without this check an address such as "1.2.3.4.5" would be silently
    # truncated to its first four components.
    if len(octets) != 4:
        raise ValueError("expected a dotted-quad IPv4 address, got %r" % (ip,))
    return struct.pack("<BBBBH", *(int(part) for part in octets), port)
def recv_all(s, sz):
    """Read exactly ``sz`` bytes from socket ``s``.

    Keeps calling ``s.recv`` for the outstanding byte count until everything
    has arrived.  Raises ``Exception("Socket disconnect")`` if the peer closes
    the connection (``recv`` returns no data) before that.
    """
    pieces = []
    outstanding = sz
    while outstanding > 0:
        piece = s.recv(outstanding)
        if not piece:
            raise Exception("Socket disconnect")
        pieces.append(piece)
        outstanding -= len(piece)
    return b"".join(pieces)
def recv_withlen(s):
    """Read one length-prefixed message from socket ``s``.

    The message is preceded by its size as an unsigned big-endian 32-bit
    integer; only the payload is returned.
    """
    (payload_size,) = struct.unpack(">I", recv_all(s, 4))
    return recv_all(s, payload_size)
def send_withlen(s, msg):
    """Send ``msg`` on socket ``s`` prefixed with its big-endian 32-bit length.

    Returns whatever ``s.sendall`` returns.
    """
    framed = struct.pack(">I", len(msg)) + msg
    return s.sendall(framed)
def dirserver_worker(cs, address):
    """Serve one directory-server client: tell it where another server lives.

    Protocol as implemented here:
      1. The client sends a big-endian 32-bit version, which must be 2; the
         server answers with the single byte 0x01.
      2. The client sends one length-prefixed message whose first byte selects
         the server type it is looking for.
      3. The server replies (length-prefixed) with a big-endian 16-bit count
         of 1 followed by one encoded address, or with b"\\x00\\x00" when the
         type is not implemented.

    The socket is always closed before returning, including when the request
    is malformed and an exception is raised.

    Args:
        cs: connected client socket.
        address: peer address as returned by ``accept()`` (only logged).

    Raises:
        Exception: on a wrong protocol version or a badly sized message.
    """
    print("dirserver_worker accepted", cs, address)

    # request type -> (exact message length required, port handed back)
    known_requests = {
        # Auth server lookup; the type byte is followed by 4 more bytes
        # (presumably a username hash -- TODO confirm).
        0: (5, 27032),
        3: (1, 27031),   # ConfigServer
        15: (1, 27034),  # unknown server
    }

    try:
        msg = recv_all(cs, 4)
        version, = struct.unpack(">I", msg)
        if version != 2:
            raise Exception("Bad directory query version", version)
        cs.sendall(b"\x01")

        msg = recv_withlen(cs)
        print("received dir server message |" + msg.hex() + "|")
        # A zero-length message has no type byte to dispatch on.
        if not msg:
            raise Exception("badly sized message")

        request_type = msg[0]
        if request_type in known_requests:
            expected_len, port = known_requests[request_type]
            if len(msg) != expected_len:
                raise Exception("badly sized message")
            if request_type == 0:
                print("authserver for username")
            reply = struct.pack(">H", 1) + encodeIP(("192.168.0.16", port))
        else:
            print("requested server type not implemented " + msg.hex())
            # A zero count; for a ConfigServer query the client reports this
            # as "fatal error, no configservers are online".
            reply = b"\x00\x00"
        send_withlen(cs, reply)
    finally:
        cs.close()
def configserver_worker(cs, address):
    """Serve one config-server client.

    After a version handshake (big-endian 32-bit version 2, answered with the
    byte 0x01) the client sends one length-prefixed command:

      * exactly b"\\x01": reply with a small two-entry blob;
      * first byte 2: reply with the serialized ``secondblob_text`` blob;
      * anything else: hex-dump whatever the client sends until it
        disconnects.

    The socket is always closed before returning, including when an exception
    is raised.

    Args:
        cs: connected client socket.
        address: peer address as returned by ``accept()`` (only logged).

    Raises:
        Exception: if the client announces a version other than 2.
    """
    print("configserver_worker accepted", cs, address)
    try:
        msg = recv_all(cs, 4)
        # steam 2003 has version 2 here, later is version 3?
        version, = struct.unpack(">I", msg)
        if version != 2:
            raise Exception("Bad config query version", version)
        # Answering b"\x00" instead makes the client report
        # "ConfigServer unacceptable protocol version".
        cs.sendall(b"\x01")

        msg = recv_withlen(cs)
        print("received config server message |" + msg.hex() + "|")
        if msg == b"\x01":
            blob = {
                b"\x01\x00\x00\x00": b"\x00\x00\x00\x00",
                b"\x02\x00\x00\x00": b"\x00\x00\x00\x00",
            }
            binblob = blob_serialize(blob)
            print(binblob.hex())
            send_withlen(cs, binblob)
        elif msg[:1] == b"\x02":
            # The slice (rather than msg[0]) lets an empty message fall
            # through to the dump branch instead of raising IndexError.
            # A pre-built file could be served instead:
            #cdr = open("secondblob.bin", "rb").read()
            cdr = blob_serialize(secondblob_text)
            send_withlen(cs, cdr)
        else:
            # Unknown command: just log the raw traffic until the peer leaves.
            while True:
                data = cs.recv(16384)
                if not data:
                    print("disconnect")
                    break
                print("|" + data.hex() + "|")
    finally:
        cs.close()
def _authserver_check_skew(masked_time, client_external_ip, client_internal_ip):
    """Unmask a client timestamp and raise if it is an hour or more off.

    The 8-byte timestamp arrives XORed with the first 8 bytes of
    SHA-1(external IP + internal IP).
    """
    controlhash = hashlib.sha1(client_external_ip + client_internal_ip).digest()
    client_time = steamtime_to_unixtime(binaryxor(masked_time, controlhash[0:8]))
    skew = int(time.time() - client_time)
    print("time skew", skew)
    if abs(skew) >= 3600:
        raise Exception("skew too large (bad decryption?)")


def _authserver_login(cs, bio, client_external_ip, client_internal_ip):
    """Handle command 2: verify the login proof and send back the signed ticket."""
    # The username is sent twice, each copy with a big-endian 16-bit length.
    sz1, = struct.unpack(">H", bio.read(2))
    username1 = bio.read(sz1)
    sz2, = struct.unpack(">H", bio.read(2))
    username2 = bio.read(sz2)
    remainder = bio.read()
    if len(username1) != sz1 or len(username2) != sz2 or username1 != username2 or len(remainder) != 0:
        raise Exception("bad usernames!")

    # Salt and password are fixed, so every username logs in with "test".
    salt = b"\x01\x23\x45\x67\x89\xab\xcd\xef"
    password = b"test"
    cs.sendall(salt)
    key = hashlib.sha1(salt[0:4] + password + salt[4:8]).digest()[:16]

    # The client proves it knows the password by encrypting
    # (masked timestamp, internal IP) with the derived key.
    msg = recv_withlen(cs)
    print("authserver_worker |" + msg.hex() + "|")
    ptext = decrypt_message(msg, key)
    if len(ptext) != 12:
        raise Exception("bad plaintext size")
    if ptext[8:12] != client_internal_ip:
        raise Exception("internal IP doesn't match (bad decryption?)")
    _authserver_check_skew(ptext[0:8], client_external_ip, client_internal_ip)

    # Account record: serialized, AES-encrypted with the inner key, then
    # wrapped in a b"\x01\x45" container and followed by its HMAC-SHA1.
    blob = blob_serialize(user_registry)
    bloblen = len(blob)
    innerkey = bytes.fromhex("10231230211281239191238542314233")
    innerIV = bytes.fromhex("12899c8312213a123321321321543344")
    blob_encrypted = encrypt_with_pad(blob, innerkey, innerIV)
    blob_encrypted = struct.pack("<I", bloblen) + innerIV + blob_encrypted
    blob_signature = hmac.digest(innerkey, blob_encrypted, hashlib.sha1)
    blob_encrypted_len = 10 + len(blob_encrypted) + 20
    blob_encrypted = struct.pack(">I", blob_encrypted_len) + b"\x01\x45" + struct.pack("<II", blob_encrypted_len, 0) + blob_encrypted + blob_signature

    # Sub-header encrypted with the password-derived key; it hands the client
    # the inner key.  The ticket validity window is 28 days.
    currtime = int(time.time())
    outerIV = bytes.fromhex("92183129534234231231312123123353")
    steamid = bytes.fromhex("0000" + "80808000" + "00000000")
    # Two 6-byte entries in the encodeIP layout, both with port 27034
    # (presumably placeholder addresses -- TODO confirm).
    servers = bytes.fromhex("111213149a69151617189a69")
    times = unixtime_to_steamtime(currtime) + unixtime_to_steamtime(currtime + (60*60*24*28))
    subheader = innerkey + steamid + servers + times
    subheader_encrypted = encrypt_with_pad(subheader, key, outerIV)
    # 0x0036 = 54 plaintext bytes (16 + 10 + 12 + 16), 0x0040 = 64 after padding.
    subheader_encrypted = b"\x00\x01" + outerIV + b"\x00\x36\x00\x40" + subheader_encrypted
    unknown_part = b"\x01\x68" + (b"\xff" * 0x168)
    ticket = subheader_encrypted + unknown_part + blob_encrypted
    ticket_signed = ticket + hmac.digest(innerkey, ticket, hashlib.sha1)

    tgt_command = b"\x01" # AuthenticateAndRequestTGT command
    steamtime = unixtime_to_steamtime(time.time())
    ticket_full = tgt_command + steamtime + b"\x00\xd2\x49\x6b\x00\x00\x00\x00" + struct.pack(">I", len(ticket_signed)) + ticket_signed
    cs.sendall(ticket_full)


def _authserver_content_ticket(cs, msg, bio, client_external_ip, client_internal_ip):
    """Handle command 10: verify the signed request and issue a content ticket."""
    innerkey = bytes.fromhex("10231230211281239191238542314233")
    # Everything after the command byte, minus the trailing 20-byte MAC, is
    # signed.  compare_digest avoids leaking the match position via timing.
    if hmac.compare_digest(hmac.digest(innerkey, msg[1:-20], hashlib.sha1), msg[-20:]):
        print("HMAC verified OK")
    else:
        raise Exception("BAD HMAC")

    ticketsize, = struct.unpack(">H", bio.read(2))
    ticket = bio.read(ticketsize)
    print("ticket", ticket.hex())
    ptext = decrypt_message(bio.read()[:-20], innerkey)
    print("ptext", ptext.hex())

    # Decrypted payload: username twice (little-endian 16-bit lengths this
    # time), followed by the masked client timestamp.
    fields = io.BytesIO(ptext)
    sz1, = struct.unpack("<H", fields.read(2))
    username1 = fields.read(sz1)
    sz2, = struct.unpack("<H", fields.read(2))
    username2 = fields.read(sz2)
    if len(username1) != sz1 or len(username2) != sz2 or username1 != username2:
        raise Exception("bad usernames!")
    _authserver_check_skew(fields.read(8), client_external_ip, client_internal_ip)

    # Field names / error strings noted by the original author for this reply
    # (presumably taken from the client -- TODO confirm):
    # Incompatible ContentTicket VersionNum
    # u16SizeOfPlaintextClientReadableContentTicket
    # Bad u16SizeOfAESEncryptedClientReadableContentTicket
    # u16SizeOfServerReadableContentTicket
    currtime = time.time()
    client_ticket = b"\x69" * 0x10 # key used for MAC signature
    client_ticket += unixtime_to_steamtime(currtime) # TicketCreationTime
    client_ticket += unixtime_to_steamtime(currtime + 86400) # TicketValidUntilTime
    client_ticket += random.randbytes(4) #struct.pack("<I", 1)
    client_ticket += random.randbytes(8) # struct.pack("<II", 1, 2)
    client_ticket += encodeIP(("192.168.0.16", 27033)) + b"\x00\x00" # why are there extra bytes? maybe padding to 4 byte boundary
    server_ticket = b"\x55" * 0x80

    innerIV = bytes.fromhex("12899c8312213a123321321321543344")
    client_ticket_encrypted = encrypt_with_pad(client_ticket, innerkey, innerIV)
    reply_ticket = b"\x00\x02" + innerIV + struct.pack(">HH", len(client_ticket), len(client_ticket_encrypted)) + client_ticket_encrypted
    reply_ticket += struct.pack(">H", len(server_ticket)) + server_ticket
    # Signed with the key embedded at the start of the client ticket.
    ticket_signed = reply_ticket + hmac.digest(client_ticket[0:16], reply_ticket, hashlib.sha1)
    # sendall: plain send() may transmit only part of the reply.
    cs.sendall(b"\x00\x01" + struct.pack(">I", len(ticket_signed)) + ticket_signed)
    #send_withlen(cs, b"\x00\x01" + b"\x00" * 4 + b"\x12" * 12)


def authserver_worker(cs, address):
    """Serve one auth-server client.

    Handshake: the client sends a zero byte, then 12 bytes holding a
    big-endian 32-bit version (must be 3), its internal IP (byte-reversed on
    the wire) and 4 more bytes that are not used here (a username hash
    according to the captured samples below).  The server answers with a
    status byte and the client's external IP, then reads one length-prefixed
    command:

      * 2  -- login; answered with the signed ticket (``_authserver_login``);
      * 9  -- answered with the single byte 0x01;
      * 10 -- content ticket request (``_authserver_content_ticket``).

    The socket is always closed before returning, including on errors.

    NOTE(review): the original indentation of this file was lost; this
    assumes the whole exchange is conditional on the first byte being 0 and
    that any other first byte ends the connection without a reply -- confirm.

    Args:
        cs: connected client socket.
        address: peer address from ``accept()``; ``address[0]`` is used as the
            client's external IP.

    Raises:
        Exception: on a bad version, an unknown command, or any failed
            validation in the command handlers.
    """
    print("authserver_worker accepted", cs, address)
    try:
        msg = recv_all(cs, 1)
        if msg[0] != 0:
            return

        # version, client IP, username hash
        # 00 00000003 c700a8c0 40e8aab5
        # 00 00000003 c700a8c0 29f3cc7f
        msg = recv_all(cs, 12)
        print("authserver_worker |" + msg.hex() + "|")
        version, = struct.unpack(">I", msg[0:4])
        if version != 3:
            raise Exception("Bad authserver login version", version)
        client_internal_ip = msg[4:8][::-1]
        client_external_ip = socket.inet_aton(address[0])

        # first byte here:
        # 00 - OK
        # 01 - a new version of steam is available
        # 02 - a server error occured
        # then follows client's external IP
        cs.sendall(b"\x00" + client_external_ip[::-1])

        msg = recv_withlen(cs)
        print("authserver_worker |" + msg.hex() + "|")
        bio = io.BytesIO(msg)
        # Compared as bytes so an empty message is reported as an unknown
        # command instead of raising IndexError.
        cmd = bio.read(1)
        if cmd == b"\x02":
            _authserver_login(cs, bio, client_external_ip, client_internal_ip)
        elif cmd == b"\x09":
            cs.sendall(b"\x01")
        elif cmd == b"\x0a":
            _authserver_content_ticket(cs, msg, bio, client_external_ip, client_internal_ip)
        else:
            raise Exception("unknown command")
    finally:
        cs.close()
def _read_storage_file(name):
    """Return the full contents of ``name`` inside the ``storages`` directory."""
    # A forward slash is accepted by open() on Windows as well as on POSIX;
    # the previous backslash-separated path only resolved on Windows.
    with open("storages/" + name, "rb") as handle:
        return handle.read()


def contentserver_worker(cs, address):
    """Serve one content-server client until the connection ends.

    After a version handshake (big-endian 32-bit version 5, answered with the
    byte 0x01) the client sends length-prefixed commands in a loop, selected
    by their first byte:

      * 0 -- the first time answered with 0x01 plus a length-prefixed sponsor
             URL, afterwards with 0x00;
      * 2 -- open storage: the request carries a ticket signed with the key
             b"\\x69" * 16 (the key the auth server puts at the start of the
             client ticket); answered with a connection id and 4 bytes taken
             from the manifest;
      * 4 -- answered with the manifest file;
      * 6 -- answered with the checksums file.

    The loop has no normal exit: it ends when ``recv_withlen`` raises (for
    example "Socket disconnect") or a request is rejected.  The socket is
    closed in every case.

    Args:
        cs: connected client socket.
        address: peer address as returned by ``accept()`` (only logged).

    Raises:
        Exception: on a bad version, bad HMAC, unexpected size or unknown
            command, and when the peer disconnects.
    """
    print("contentserver_worker accepted", cs, address)
    try:
        msg = recv_all(cs, 4)
        print("contentserver_worker version |" + msg.hex() + "|")
        version, = struct.unpack(">I", msg)
        if version != 5:
            raise Exception("Bad contentserver version", version)
        # 00 is reject version, all others is accept version
        cs.sendall(b"\x01")

        first = True
        while True:
            msg = recv_withlen(cs)
            print("contentserver_worker |" + msg.hex() + "|")
            # A zero-length message has no command byte.
            if not msg:
                raise Exception("unknown data")

            if msg[0] == 0:
                if len(msg) != 1:
                    raise Exception("unexpected size")
                if first:
                    # if the banner can't be rendered due to an inaccessible domain (nonexistant URL or offline PC)
                    # then steam2003 will crash when opening the statistics page
                    # setting the sponsor to about:blank will always give a page, so it doesn't crash anymore
                    sponsor_url = b"about:blank"
                    cs.sendall(b"\x01" + struct.pack(">H", len(sponsor_url)) + sponsor_url)
                    first = False
                else:
                    cs.sendall(b"\x00")

            elif msg[0] == 2:
                bio = io.BytesIO(msg[1:])
                (connid, messageid, app, version) = struct.unpack(">IIII", bio.read(16))
                print(connid, messageid, app, version)
                connid |= 0x80000000

                key = b"\x69" * 0x10
                signeddata = bio.read()
                # The last 20 bytes are an HMAC-SHA1 over everything before them.
                if hmac.compare_digest(hmac.digest(key, signeddata[:-20], hashlib.sha1), signeddata[-20:]):
                    print("HMAC verified OK")
                else:
                    raise Exception("BAD HMAC")

                bio = io.BytesIO(signeddata)
                ticketsize, = struct.unpack(">H", bio.read(2))
                bio.read(ticketsize)  # skip the ticket; only what follows is decrypted
                ptext = decrypt_message(bio.read()[:-20], key)
                print(ptext.hex())
                print(repr(ptext))

                manif = _read_storage_file("3_0.manifest")
                csum = manif[0x30:0x34]
                cs.sendall(struct.pack(">II", connid, 0))
                cs.sendall(b"\x00" + struct.pack(">I", 0) + csum[::-1])

            elif msg[0] == 4:
                bio = io.BytesIO(msg[1:])
                storageid, messageid = struct.unpack(">II", bio.read(8))
                print(storageid, messageid)
                manif = _read_storage_file("3_0.manifest")
                # Status header first, then a second header followed by the data.
                reply = struct.pack(">IIBI", storageid, messageid, 0, len(manif))
                cs.sendall(reply)
                reply = struct.pack(">III", storageid, messageid, len(manif))
                cs.sendall(reply + manif)

            elif msg[0] == 6:
                bio = io.BytesIO(msg[1:])
                storageid, messageid = struct.unpack(">II", bio.read(8))
                print(storageid, messageid)
                csums = _read_storage_file("3.checksums")
                reply = struct.pack(">IIBI", storageid, messageid, 0, len(csums))
                cs.sendall(reply)
                reply = struct.pack(">III", storageid, messageid, len(csums))
                cs.sendall(reply + csums)

            # Sample payloads recorded by the original author (hex, then repr):
            # 04007465737404007465737400008080800000000000 b3ce5ce3 1d6a571b
            # b'\x04\x00test\x04\x00test\x00\x00\x80\x80\x80\x00\x00\x00\x00\x00\xb3\xce\\\xe3\x1djW\x1b'
            # 04007465737404007465737400008080800000000000 dbd01ce0 1d6a571b
            # b'\x04\x00test\x04\x00test\x00\x00\x80\x80\x80\x00\x00\x00\x00\x00\xdb\xd0\x1c\xe0\x1djW\x1b'
            else:
                raise Exception("unknown data")
        # The raw recv/hex-dump loop that used to follow was unreachable (the
        # loop above never breaks) and has been removed.
    finally:
        cs.close()
def server1_worker(cs, address):
    """Log everything a client of the not-yet-understood server sends.

    Received data is printed as hex until the peer disconnects.  The socket
    is closed before returning so finished connections do not leak file
    descriptors.

    Args:
        cs: connected client socket.
        address: peer address as returned by ``accept()`` (only logged).
    """
    print("server1_worker accepted", cs, address)
    try:
        while True:
            data = cs.recv(16384)
            if not data:
                print("disconnect")
                break
            print("server1 unknown data |" + data.hex() + "|")
    finally:
        cs.close()
def dirserver_listener():
    """Listen on TCP port 27030 (all interfaces) for directory-server clients.

    Runs forever; every accepted connection is handed to ``dirserver_worker``
    on its own thread.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("", 27030))
    listener.listen(5)
    print("dirserver_listener listening")
    while True:
        client, peer = listener.accept()
        threading.Thread(target=dirserver_worker, args=(client, peer)).start()
def configserver_listener():
    """Listen on TCP port 27031 (all interfaces) for config-server clients.

    Runs forever; every accepted connection is handed to
    ``configserver_worker`` on its own thread.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("", 27031))
    listener.listen(5)
    print("configserver_listener listening")
    while True:
        client, peer = listener.accept()
        threading.Thread(target=configserver_worker, args=(client, peer)).start()
def authserver_listener():
    """Listen on TCP port 27032 (all interfaces) for auth-server clients.

    Runs forever; every accepted connection is handed to ``authserver_worker``
    on its own thread.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("", 27032))
    listener.listen(5)
    print("authserver_listener listening")
    while True:
        client, peer = listener.accept()
        threading.Thread(target=authserver_worker, args=(client, peer)).start()
def contentserver_listener():
    """Listen on TCP port 27033 (all interfaces) for content-server clients.

    Runs forever; every accepted connection is handed to
    ``contentserver_worker`` on its own thread.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("", 27033))
    listener.listen(5)
    print("contentserver_listener listening")
    while True:
        client, peer = listener.accept()
        threading.Thread(target=contentserver_worker, args=(client, peer)).start()
def server1_listener():
    """Listen on TCP port 27034 (all interfaces) for the unidentified server type.

    Runs forever; every accepted connection is handed to ``server1_worker``
    on its own thread.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("", 27034))
    listener.listen(5)
    print("server1_listener listening")
    while True:
        client, peer = listener.accept()
        threading.Thread(target=server1_worker, args=(client, peer)).start()
def sign_message(key, message):
    """Return the 20-byte HMAC-SHA1 of ``message`` under ``key``.

    This used to be a hand-written HMAC (key zero-padded to 64 bytes, XORed
    with 0x36 / 0x5c, two nested SHA-1 passes) that only worked for 16-byte
    keys.  The standard-library implementation yields the same digest for
    those keys and additionally accepts keys of any length.

    Args:
        key: secret key as bytes.
        message: data to authenticate, as bytes.

    Returns:
        The raw 20-byte digest.
    """
    return hmac.digest(key, message, hashlib.sha1)
if __name__ == "__main__":
    # A commented-out experiment used to live here: for a captured message it
    # tried every start offset i and printed the one for which
    # HMAC-SHA1(key = b"\x69" * 0x10, msg[i:-20]) equals the trailing 20 bytes.

    # Start every listener on its own thread, in this order.
    listener_threads = {}
    for entry_point in (
        dirserver_listener,
        configserver_listener,
        authserver_listener,
        contentserver_listener,
        server1_listener,
    ):
        thread = threading.Thread(target=entry_point)
        thread.start()
        listener_threads[entry_point] = thread

    # Block the main thread on the content server listener (which never
    # returns under normal operation).
    listener_threads[contentserver_listener].join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment