-
-
Save lukegb/d2997a5fc7970ce6e1e1 to your computer and use it in GitHub Desktop.
import struct | |
import json | |
import copy | |
import gzip | |
try: | |
from cStringIO import StringIO | |
except: | |
from StringIO import StringIO | |
ROFL_MAGIC = 'RIOT' + chr(0) * 2 | |
FILENAME = '20130523_0935_HA_PBE1_26967171.rofl' | |
FILENAME2 = '20130523_0702_HA_PBE1_26961185.rofl' | |
# With thanks to https://github.com/robertabcd/lol-ob | |
class Struct(object): | |
format = None | |
extradata = None | |
@classmethod | |
def get_extradata(cls, fileobj): | |
return [None] * len(cls.get_format(fileobj, None)) | |
@classmethod | |
def get_format(cls, fileobj, extradata): | |
return cls.format | |
@classmethod | |
def read(cls, fh, fileobj, extradata=None): | |
format = cls.get_format(fileobj, extradata=extradata) | |
f_str = fh.read(struct.calcsize(format)) | |
res = struct.unpack(format, f_str) | |
me = cls() | |
me.unpack_tuple(res, fileobj, extradata) | |
return me | |
def unpack_tuple(self, res, fileobj, extradata): | |
for field_name, field_value in zip(self.fields, res): | |
custom_func = getattr(self, "unpack_{}".format(field_name), None) | |
if custom_func is not None: | |
custom_func(field_name, field_value, fileobj, extradata) | |
else: | |
setattr(self, field_name, field_value) | |
def pack_tuple(self, fileobj, extradata): | |
out = [] | |
for field_name in self.fields: | |
custom_func = getattr(self, "pack_{}".format(field_name), None) | |
if custom_func is not None: | |
val = custom_func(field_name, fileobj, extradata) | |
else: | |
val = getattr(self, field_name) | |
out.push(val) | |
return out | |
def write(self, fh, fileobj, extradata=None): | |
out_tuple = self.pack_tuple(fileobj, extradata) | |
fh.write(struct.pack(self.get_format(fileobj, extradata), *out_tuple)) | |
class CompositeStruct(Struct): | |
@classmethod | |
def read(cls, fh, fileobj, extradata=None): | |
self = cls() | |
for clazz, field in zip(cls.get_format(fileobj), cls.fields): | |
setattr(self, field, clazz.read(fh, self, extradata=extradata)) | |
return self | |
class CompositeStructList(Struct): | |
@classmethod | |
def read(cls, fh, fileobj, extradata=None): | |
self = cls() | |
self.outer = fileobj | |
self.data = [] | |
for clazz, ed in zip( | |
cls.get_format(fileobj, extradata=extradata), | |
cls.get_extradata(fileobj) | |
): | |
self.data.append(clazz.read(fh, self, extradata=ed)) | |
return self | |
def __str__(self): | |
return "[{}]".format(', '.join((str(s) for s in self.data))) | |
class RoflHeader(Struct): | |
format = '6s256sHIIIIII' | |
fields = [ | |
'magic', 'signature', 'header_len', 'file_len', | |
'metadata_offset', 'metadata_len', 'payload_header_offset', | |
'payload_header_len', 'payload_offset' | |
] | |
def __str__(self): | |
return "<RoflHeader - magic: {}>".format(self.magic) | |
class RoflMetadata(Struct): | |
fields = ['json'] | |
@classmethod | |
def get_format(cls, fileobj, extradata): | |
return "{}s".format(fileobj.header.metadata_len) | |
def unpack_json(self, field_name, field_value, fileobj, extradata): | |
self.json = json.loads(field_value) | |
self.json['stats'] = json.loads(self.json['statsJSON']) | |
del self.json['statsJSON'] | |
def pack_json(self, field_name, fileobj, extradata): | |
tmpj = copy.deepcopy(self.json) | |
tmpj['statsJSON'] = json.dumps(tmpj['stats']) | |
del tmpj['stats'] | |
return json.dumps(tmpj) | |
def as_json(self): | |
return json.dumps(self.json, indent=4) | |
def __str__(self): | |
return "<RoflMetadata>" | |
class RoflPayloadHeader(Struct): | |
format = 'QIIIIIIH' | |
fields = [ | |
'game_id', 'game_length', 'keyframe_count', 'chunk_count', | |
'end_startup_chunk_id', 'start_game_chunk_id', 'keyframe_interval', | |
'encryption_key_length' | |
] | |
def __str__(self): | |
return "<RoflPayloadHeader - game ID: {} - game length: {} - " + \ | |
"keyframe count: {} - chunk count: {}>".format( | |
self.game_id, self.game_length, | |
self.keyframe_count, self.chunk_count | |
) | |
class RoflEncryptionKey(Struct): | |
@classmethod | |
def get_format(cls, fileobj, extradata): | |
return '{}s'.format(fileobj.payload_header.encryption_key_length) | |
def __str__(self): | |
return "<RoflEncryptionKey - key as hex: {}>".format( | |
self.encryption_key.encode('hex') | |
) | |
fields = ['encryption_key'] | |
def unpack_encryption_key(self, field_name, field_value, *args, **kwargs): | |
self.encryption_key = field_value.decode('base64') | |
def pack_encryption_key(self, field_name, *args, **kwargs): | |
return self.encryption_key.encode('base64') | |
class RoflChunkHeader(Struct): | |
format = '<IBIII' | |
fields = ['id', 'type', 'length', 'next_chunk_id', 'offset'] | |
def __str__(self): | |
return "<RoflChunkHeader - id: {}, type: {}, length: {}>".format( | |
self.id, 'CHUNK' if self.type == 1 else 'KEYFRAME', self.length | |
) | |
class RoflChunkPayload(Struct): | |
@classmethod | |
def get_format(cls, outer, extradata): | |
return '{}s'.format(extradata) | |
def unpack_chunk(self, field_name, field_value, outer, extradata): | |
self.length = len(field_value) | |
if False: | |
self.chunk = field_value | |
else: | |
crypto = outer.outer.crypto | |
# we need to decrypt this | |
chunk = StringIO(crypto.decrypt(crypto.keyframe_key, field_value)) | |
# now decompress | |
chunkgz = gzip.GzipFile(fileobj=chunk) | |
self.chunk = chunkgz.read() | |
fields = ['chunk'] | |
def __str__(self): | |
return "<RoflChunkPayload - length: {}>".format(self.length) | |
class RoflChunkHeaders(CompositeStructList): | |
@classmethod | |
def get_format(cls, fileobj, extradata): | |
return [RoflChunkHeader] * fileobj.payload_header.chunk_count | |
class RoflKeyframeHeaders(CompositeStructList): | |
@classmethod | |
def get_format(cls, fileobj, extradata): | |
return [RoflChunkHeader] * fileobj.payload_header.keyframe_count | |
class RoflChunks(CompositeStructList): | |
@classmethod | |
def get_format(cls, fileobj, extradata): | |
return [RoflChunkPayload] * ( | |
len(fileobj.chunk_headers.data) + | |
len(fileobj.keyframe_headers.data) | |
) | |
@classmethod | |
def get_extradata(cls, fileobj): | |
out = [] | |
for h in (fileobj.chunk_headers.data + fileobj.keyframe_headers.data): | |
out.append(h.length) | |
return out | |
class RoflCrypto(object): | |
def __init__(self, roflfile): | |
self.file = roflfile | |
def make_crypto(self, key): | |
from Crypto.Cipher import Blowfish | |
return Blowfish.new(key, Blowfish.MODE_ECB) | |
def decrypt(self, key, data): | |
crypto = self.make_crypto(key) | |
datawpadding = crypto.decrypt(data) | |
# how much padding do we need to remove? | |
paddingbytes = ord(datawpadding[-1]) | |
return datawpadding[:-paddingbytes] | |
@property | |
def keyframe_key(self): | |
key = self.decrypt( | |
str(self.file.metadata.json['gameId']), | |
self.file.encryption_key.encryption_key | |
) | |
return key | |
class RoflFile(object): | |
@classmethod | |
def read(cls, fh): | |
self = cls() | |
self.header = RoflHeader.read(fh, self) | |
if self.header.magic != ROFL_MAGIC: | |
raise Exception("Decoding error - magic invalid") | |
self.metadata = RoflMetadata.read(fh, self) | |
self.payload_header = RoflPayloadHeader.read(fh, self) | |
self.encryption_key = RoflEncryptionKey.read(fh, self) | |
self.chunk_headers = RoflChunkHeaders.read(fh, self) | |
self.keyframe_headers = RoflKeyframeHeaders.read(fh, self) | |
self.chunks = RoflChunks.read(fh, self) | |
self.chunk_pairs = zip( | |
self.chunk_headers.data + self.keyframe_headers.data, | |
self.chunks.data | |
) | |
return self | |
@property | |
def crypto(self): | |
return RoflCrypto(self) | |
def __str__(self): | |
return "<RoflFile - header: {} - metadata: {} - payload header: {}" + \ | |
" - encryption key: {}>".format( | |
self.header, self.metadata, | |
self.payload_header, self.encryption_key | |
) | |
def unpack_rofl_to_directory(rofl_file, directory): | |
import os | |
keyframe_dir = os.path.join(directory, 'keyframe') | |
chunk_dir = os.path.join(directory, 'chunk') | |
os.makedirs(keyframe_dir) | |
os.makedirs(chunk_dir) | |
with open(rofl_file, 'rb') as f: | |
roflfile = RoflFile.read(f) | |
for header, chunk in roflfile.chunk_pairs: | |
print header, chunk | |
base_dir = keyframe_dir if header.type == 2 else chunk_dir | |
with open(os.path.join(base_dir, str(header.id)), 'wb') as c: | |
c.write(chunk.chunk) | |
with open(os.path.join(directory, 'meta.json'), 'wb') as f: | |
roflfile.metadata.json['gameKey'] = { | |
'gameId': roflfile.metadata.json['gameId'] | |
} | |
roflfile.metadata.json['key'] = roflfile.encryption_key.\ | |
encryption_key.encode('base64') | |
f.write(roflfile.metadata.as_json()) | |
if __name__ == '__main__': | |
import sys | |
import os.path | |
if len(sys.argv) != 3: | |
print "{} <ROFL file> <directory>".format(sys.argv[0]) | |
sys.exit(0) | |
my_name, rofl_file, output_dir = sys.argv | |
if not os.path.exists(rofl_file): | |
print "ROFL file specified does not exist" | |
sys.exit(1) | |
if os.path.exists(output_dir): | |
print "Output directory exists - won't unpack" | |
sys.exit(1) | |
unpack_rofl_to_directory(rofl_file, output_dir) |
@lukegb @blackjack4494 what changed? Did they completely change the format or just some workable changes which caused the script to not work out of the box? Is there any recent work on extracting the data?
It's been 7 years since I looked at this; I'm pretty sure quite a bit has changed...
I get that I was just wondering if you looked into it since then or know about a project which followed up. Did you manage at the time to find out the meaning of the values in the chunks and keyframes?
I've been looking into this myself. If you don't mind Dominik, could you please update if you find something? I'll do the same if I can get get back into the actual keyframes and chunks
Hey @mlobl. I'm trying to collect all existing knowledge about the replay files here https://github.com/Kungitoo/LolReplayParser2020
If you know of any other projects, efforts or chat groups please post them there as an issue so that I can add it to the list. Right now I'm trying to replicate previous efforts in case they are still using gzip and blowfish because I didn't find anywhere anyone mentioning a different encryption or zip tool. Maybe they just changed the data formats
Great idea @Kungitoo. I was able to get the same box score data and parse all the headers based on the ROFL-Player project into Python. I couldn't get the decryption part right with Blowfish etc. What's also unclear to me is what keyframes and chunks are in the rofl context and how to get the chunks. Maybe it would be obvious if it got decrypted properly. I'll let you know in the repo if I have a breakthrough
I managed to decrypt and decompress using the https://github.com/ryancole/LeagueReplayReader and I'm trying to find the patterns as described in https://github.com/loldevs/leaguespec/wiki/Keyframe-Player-Segment . What seems to be obvious at first is that right at the beginning the keyframe header does not match
"019A167044020000001F0100000000578BB1000103010057B100194D020001DE1BCBCBC ....
so I'm not sure what to expect of the rest of the spec. I did manage to find item codes in the bitstream but I don't think it's correct
I've managed to get in contact with @ryancole who created LeagueReplayReader here ryancole/LeagueReplayReader#1
I was able to get the initial header and even able to parse the keyframe and chunk headers. You can see that some things have changed, but the number of bytes each header takes is the same still (17). The part i'm stuck on is actually decrypting the chunks and keyframes. It might still be blowfish, but following the instructions from years ago hasn't gotten me anywhere. Would be interested if you have a break through. I'll post the code I have shortly from another machine.
@Kungitoo, this is the scratch code I've been playing with so far. Haven't tried to clean it up, but should be still be clear enough if you're trying
import json
import blowfish
import gzip
import base64
from Crypto.Util.py3compat import *
def unpad(padded_data, block_size, style='pkcs7'):
"""Remove standard padding.
:Parameters:
padded_data : byte string
A piece of data with padding that needs to be stripped.
block_size : integer
The block boundary to use for padding. The input length
must be a multiple of ``block_size``.
style : string
Padding algorithm. It can be *'pkcs7'* (default), *'iso7816'* or *'x923'*.
:Return:
Data without padding.
:Raises ValueError:
if the padding is incorrect.
"""
pdata_len = len(padded_data)
if pdata_len % block_size:
raise ValueError("Input data is not padded")
if style in ('pkcs7', 'x923'):
padding_len = bord(padded_data[-1])
if padding_len<1 or padding_len>min(block_size, pdata_len):
raise ValueError("Padding is incorrect.")
if style == 'pkcs7':
if padded_data[-padding_len:]!=bchr(padding_len)*padding_len:
raise ValueError("PKCS#7 padding is incorrect.")
else:
if padded_data[-padding_len:-1]!=bchr(0)*(padding_len-1):
raise ValueError("ANSI X.923 padding is incorrect.")
elif style == 'iso7816':
padding_len = pdata_len - padded_data.rfind(bchr(128))
if padding_len<1 or padding_len>min(block_size, pdata_len):
raise ValueError("Padding is incorrect.")
if padding_len>1 and padded_data[1-padding_len:]!=bchr(0)*(padding_len-1):
raise ValueError("ISO 7816-4 padding is incorrect.")
else:
raise ValueError("Unknown padding style")
return padded_data[:-padding_len]
f = open('/path/to/file.rofl', 'rb')
f.read(262)
header_data = f.read(26)
header_length = int.from_bytes(header_data[:2], 'little')
file_length = int.from_bytes(header_data[2:6], 'little')
metadata_offset = int.from_bytes(header_data[6:10], 'little')
metadata_length = int.from_bytes(header_data[10:14], 'little')
payload_header_offset = int.from_bytes(header_data[14:18], 'little')
payload_header_length = int.from_bytes(header_data[18:22], 'little')
payload_offset = int.from_bytes(header_data[22:26], 'little')
metadata_bin = f.read(metadata_length)
metadata = json.loads(metadata_bin.decode())
game_length = metadata['gameLength']
game_version = metadata['gameVersion']
last_game_chunk_id = metadata['lastGameChunkId']
last_key_frame_id = metadata['lastKeyFrameId']
player_stats = json.loads(metadata['statsJson'])
payload_bin = f.read(payload_header_length)
match_id = int.from_bytes(payload_bin[:8], 'little')
match_length = int.from_bytes(payload_bin[8:12], 'little')
keyframe_amount = int.from_bytes(payload_bin[12:16], 'little')
chunk_amount = int.from_bytes(payload_bin[16:20], 'little')
end_chunk_id = int.from_bytes(payload_bin[20:24], 'little')
start_chunk_id = int.from_bytes(payload_bin[24:28], 'little')
keyframe_interval = int.from_bytes(payload_bin[28:32], 'little')
encryption_key_length = int.from_bytes(payload_bin[32:34], 'little')
encryption_key = payload_bin[34: 34+encryption_key_length]
b64_encode_key = base64.b64encode(encryption_key)
cipher = blowfish.Cipher(match_id.to_bytes(8, byteorder='little'))
f = open('/path/to/file.rofl', 'rb')
f.read(payload_offset)
payload_bin = f.read()
# reading chunk headers
chunk_headers = []
for i in range(chunk_amount+keyframe_amount):
base = 17 * i
chunk_id = int.from_bytes(payload_bin[base: base + 4], 'little')
is_chunk = int.from_bytes(payload_bin[base + 4:base + 5], 'little') # 1 is chunk, 2 is keyframe
chunk_size = int.from_bytes(payload_bin[base + 5:base + 9], 'little')
next_chunk_id = int.from_bytes(payload_bin[base + 9:base + 13], 'little')
chunk_offset = int.from_bytes(payload_bin[base + 13:base + 17], 'little')
chunk_headers.append([chunk_id, is_chunk, chunk_size, next_chunk_id, chunk_offset])
#print(chunk_headers[-1])
I'll be able to get the chunk/keyframe offsets from that, but I'm just looking at a bunch of encrypted hex when I do.
I'm using the https://github.com/ryancole/LeagueReplayReader to decrypt and unpack keyframes and chunks and I think that it's still working on the latest replays. I'm currently trying to split a keyframe into blocks as specified in https://github.com/loldevs/leaguespec/wiki/General-Binary-Format
Is the rofl format still the same?