-
-
Save lukegb/d2997a5fc7970ce6e1e1 to your computer and use it in GitHub Desktop.
Test code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import struct | |
import json | |
import copy | |
import gzip | |
try: | |
from cStringIO import StringIO | |
except: | |
from StringIO import StringIO | |
# Expected value of the header's 6-byte magic field: "RIOT" plus two NUL bytes.
ROFL_MAGIC = 'RIOT\x00\x00'

# Sample replay file names.
FILENAME = '20130523_0935_HA_PBE1_26967171.rofl'
FILENAME2 = '20130523_0702_HA_PBE1_26961185.rofl'
# With thanks to https://github.com/robertabcd/lol-ob | |
class Struct(object):
    """Base class for binary records packed/unpacked with the ``struct`` module.

    Subclasses provide ``fields`` (attribute names, in the same order as the
    values in the format string) and either a class-level ``format`` string
    or a ``get_format`` override that computes one.  A subclass can customise
    how a single field is converted by defining ``unpack_<field>`` and/or
    ``pack_<field>`` methods.
    """

    format = None
    extradata = None

    @classmethod
    def get_extradata(cls, fileobj):
        """Return one ``None`` placeholder per element of the format."""
        return [None] * len(cls.get_format(fileobj, None))

    @classmethod
    def get_format(cls, fileobj, extradata):
        """Return the format for this record; the class-level one by default."""
        return cls.format

    @classmethod
    def read(cls, fh, fileobj, extradata=None):
        """Read one record from ``fh`` and return a populated instance.

        ``fileobj`` and ``extradata`` are handed through to ``get_format``
        and to the per-field unpack hooks.  ``struct.unpack`` raises
        ``struct.error`` if ``fh`` yields fewer bytes than the format needs.
        """
        fmt = cls.get_format(fileobj, extradata=extradata)
        raw = fh.read(struct.calcsize(fmt))
        values = struct.unpack(fmt, raw)
        me = cls()
        me.unpack_tuple(values, fileobj, extradata)
        return me

    def unpack_tuple(self, res, fileobj, extradata):
        """Store each unpacked value on ``self``, via its hook when one exists."""
        for field_name, field_value in zip(self.fields, res):
            custom_func = getattr(self, "unpack_{}".format(field_name), None)
            if custom_func is not None:
                custom_func(field_name, field_value, fileobj, extradata)
            else:
                setattr(self, field_name, field_value)

    def pack_tuple(self, fileobj, extradata):
        """Return the field values, in ``fields`` order, ready for packing."""
        out = []
        for field_name in self.fields:
            custom_func = getattr(self, "pack_{}".format(field_name), None)
            if custom_func is not None:
                val = custom_func(field_name, fileobj, extradata)
            else:
                val = getattr(self, field_name)
            # Lists have no ``push`` method; ``append`` is the correct call.
            out.append(val)
        return out

    def write(self, fh, fileobj, extradata=None):
        """Pack this record with its format and write the bytes to ``fh``."""
        out_tuple = self.pack_tuple(fileobj, extradata)
        fh.write(struct.pack(self.get_format(fileobj, extradata), *out_tuple))
class CompositeStruct(Struct):
    """Record made of several sub-records that are read one after another.

    Here ``get_format`` is expected to return a sequence of record classes,
    paired positionally with ``fields``.
    """

    @classmethod
    def read(cls, fh, fileobj, extradata=None):
        """Read each sub-record from ``fh`` and store it under its field name."""
        self = cls()
        # ``get_format`` takes (fileobj, extradata); calling it with only
        # ``fileobj`` raised TypeError before anything could be read.
        formats = cls.get_format(fileobj, extradata)
        for clazz, field in zip(formats, cls.fields):
            setattr(self, field, clazz.read(fh, self, extradata=extradata))
        return self
class CompositeStructList(Struct):
    """Ordered list of sub-records read back to back into ``self.data``.

    ``get_format`` supplies the record class for every element and
    ``get_extradata`` the matching per-element ``extradata`` value.
    """

    @classmethod
    def read(cls, fh, fileobj, extradata=None):
        """Read all elements from ``fh``; the parent is kept as ``outer``."""
        container = cls()
        container.outer = fileobj
        container.data = []
        element_types = cls.get_format(fileobj, extradata=extradata)
        element_extras = cls.get_extradata(fileobj)
        # Append one element at a time: every element's ``read`` is handed
        # the container itself, already holding the elements read so far.
        for element_type, extra in zip(element_types, element_extras):
            element = element_type.read(fh, container, extradata=extra)
            container.data.append(element)
        return container

    def __str__(self):
        rendered = ', '.join(str(item) for item in self.data)
        return "[{}]".format(rendered)
class RoflHeader(Struct):
    """Leading record of the file: magic, signature and a length/offset table.

    Layout per the format string: a 6-byte magic, a 256-byte signature, one
    16-bit value and six 32-bit values (288 bytes in total).
    """

    # Explicit little-endian, standard sizes, no alignment padding - the same
    # convention RoflChunkHeader uses.  Without the prefix ``struct`` falls
    # back to the host's native byte order and alignment rules.
    format = '<6s256sHIIIIII'
    fields = [
        'magic', 'signature', 'header_len', 'file_len',
        'metadata_offset', 'metadata_len', 'payload_header_offset',
        'payload_header_len', 'payload_offset'
    ]

    def __str__(self):
        return "<RoflHeader - magic: {}>".format(self.magic)
class RoflMetadata(Struct):
    """JSON metadata blob whose byte length is given by the file header.

    The wire form carries the stats as a JSON string under ``statsJSON``;
    in memory that string is decoded and kept under ``stats`` instead.
    """

    fields = ['json']

    @classmethod
    def get_format(cls, fileobj, extradata):
        """Return a single byte-string format of ``header.metadata_len`` bytes."""
        size = fileobj.header.metadata_len
        return "{}s".format(size)

    def unpack_json(self, field_name, field_value, fileobj, extradata):
        """Decode the blob, replacing ``statsJSON`` with a parsed ``stats``."""
        doc = self.json = json.loads(field_value)
        doc['stats'] = json.loads(doc['statsJSON'])
        del doc['statsJSON']

    def pack_json(self, field_name, fileobj, extradata):
        """Return the JSON text with ``stats`` folded back into ``statsJSON``."""
        # Work on a deep copy so the in-memory representation is untouched.
        wire_doc = copy.deepcopy(self.json)
        wire_doc['statsJSON'] = json.dumps(wire_doc['stats'])
        del wire_doc['stats']
        return json.dumps(wire_doc)

    def as_json(self):
        """Return the in-memory metadata as JSON indented by four spaces."""
        return json.dumps(self.json, indent=4)

    def __str__(self):
        return "<RoflMetadata>"
class RoflPayloadHeader(Struct):
    """Fixed-size payload header: game id, counts and encryption key length.

    Layout per the format string: one 64-bit value, six 32-bit values and
    one 16-bit value (34 bytes in total).
    """

    # Explicit little-endian / standard sizes, matching RoflChunkHeader,
    # instead of relying on the host's native byte order and alignment.
    format = '<QIIIIIIH'
    fields = [
        'game_id', 'game_length', 'keyframe_count', 'chunk_count',
        'end_startup_chunk_id', 'start_game_chunk_id', 'keyframe_interval',
        'encryption_key_length'
    ]

    def __str__(self):
        # Build the whole template before formatting.  ``a + b.format(...)``
        # formats only ``b``: the first half kept its raw ``{}`` and the
        # game id/length were shown as the keyframe/chunk counts.
        template = (
            "<RoflPayloadHeader - game ID: {} - game length: {} - "
            "keyframe count: {} - chunk count: {}>"
        )
        return template.format(
            self.game_id, self.game_length,
            self.keyframe_count, self.chunk_count
        )
class RoflEncryptionKey(Struct):
    """Encryption key record; stored base64-encoded, kept decoded in memory.

    Its byte length comes from ``payload_header.encryption_key_length``.
    The ``'base64'`` and ``'hex'`` codecs are used through the Python 2
    ``str.encode`` / ``str.decode`` methods.
    """

    fields = ['encryption_key']

    @classmethod
    def get_format(cls, fileobj, extradata):
        """Return a single byte-string format sized to the stored key."""
        key_length = fileobj.payload_header.encryption_key_length
        return '{}s'.format(key_length)

    def unpack_encryption_key(self, field_name, field_value, *args, **kwargs):
        """Base64-decode the stored value into ``self.encryption_key``."""
        self.encryption_key = field_value.decode('base64')

    def pack_encryption_key(self, field_name, *args, **kwargs):
        """Return the key base64-encoded again for writing."""
        return self.encryption_key.encode('base64')

    def __str__(self):
        hex_key = self.encryption_key.encode('hex')
        return "<RoflEncryptionKey - key as hex: {}>".format(hex_key)
class RoflChunkHeader(Struct):
    """17-byte little-endian header describing one chunk or keyframe."""

    format = '<IBIII'
    fields = ['id', 'type', 'length', 'next_chunk_id', 'offset']

    def __str__(self):
        # A type of 1 is labelled CHUNK; every other value is labelled KEYFRAME.
        if self.type == 1:
            kind = 'CHUNK'
        else:
            kind = 'KEYFRAME'
        return "<RoflChunkHeader - id: {}, type: {}, length: {}>".format(
            self.id, kind, self.length
        )
class RoflChunkPayload(Struct):
    """Body of one chunk/keyframe: decrypted, then gunzipped, on read.

    ``extradata`` carries the payload's byte length.
    """

    fields = ['chunk']

    @classmethod
    def get_format(cls, outer, extradata):
        """Return a single byte-string format of ``extradata`` bytes."""
        return '{}s'.format(extradata)

    def unpack_chunk(self, field_name, field_value, outer, extradata):
        """Store the raw length and the decrypted, decompressed payload."""
        # ``length`` is the size as stored, before decryption/decompression.
        self.length = len(field_value)
        # ``outer`` is the enclosing list; its ``outer`` exposes ``crypto``.
        crypto = outer.outer.crypto
        decrypted = crypto.decrypt(crypto.keyframe_key, field_value)
        # The decrypted bytes are a gzip stream.
        self.chunk = gzip.GzipFile(fileobj=StringIO(decrypted)).read()

    def __str__(self):
        return "<RoflChunkPayload - length: {}>".format(self.length)
class RoflChunkHeaders(CompositeStructList):
    """List of ``payload_header.chunk_count`` chunk headers."""

    @classmethod
    def get_format(cls, fileobj, extradata):
        """Return one ``RoflChunkHeader`` class per expected chunk."""
        count = fileobj.payload_header.chunk_count
        return [RoflChunkHeader] * count
class RoflKeyframeHeaders(CompositeStructList):
    """List of ``payload_header.keyframe_count`` keyframe headers."""

    @classmethod
    def get_format(cls, fileobj, extradata):
        """Return one ``RoflChunkHeader`` class per expected keyframe."""
        count = fileobj.payload_header.keyframe_count
        return [RoflChunkHeader] * count
class RoflChunks(CompositeStructList):
    """Payloads for every chunk header followed by every keyframe header."""

    @classmethod
    def get_format(cls, fileobj, extradata):
        """Return one ``RoflChunkPayload`` class per header already read."""
        total = (
            len(fileobj.chunk_headers.data) +
            len(fileobj.keyframe_headers.data)
        )
        return [RoflChunkPayload] * total

    @classmethod
    def get_extradata(cls, fileobj):
        """Return each payload's length, chunk headers first, then keyframes."""
        headers = fileobj.chunk_headers.data + fileobj.keyframe_headers.data
        return [header.length for header in headers]
class RoflCrypto(object):
    """Blowfish (ECB) helper bound to one parsed replay file."""

    def __init__(self, roflfile):
        self.file = roflfile

    def make_crypto(self, key):
        """Return a Blowfish cipher in ECB mode for ``key``."""
        # Imported lazily so the module loads without the Crypto package.
        from Crypto.Cipher import Blowfish
        return Blowfish.new(key, Blowfish.MODE_ECB)

    def decrypt(self, key, data):
        """Decrypt ``data`` with ``key`` and strip the trailing padding.

        The last decrypted byte gives the number of padding bytes to drop.

        Raises:
            ValueError: if the decrypted data is empty, or the padding
                length is zero or larger than the data.  Previously these
                cases gave an IndexError or a silently empty result.
        """
        padded = self.make_crypto(key).decrypt(data)
        # Slice rather than index: ``ord`` then receives a length-1 string
        # for both a Python 2 ``str`` and a Python 3 ``bytes`` value.
        last = padded[-1:]
        if not last:
            raise ValueError("Decrypted data is empty - no padding to strip")
        padding_len = ord(last)
        if padding_len < 1 or padding_len > len(padded):
            raise ValueError(
                "Decrypted data has invalid padding length {}".format(
                    padding_len
                )
            )
        return padded[:-padding_len]

    @property
    def keyframe_key(self):
        """Key for chunk/keyframe payloads.

        Obtained by decrypting the file's stored encryption key, using the
        string form of the metadata ``gameId`` as the Blowfish key.
        """
        key = self.decrypt(
            str(self.file.metadata.json['gameId']),
            self.file.encryption_key.encryption_key
        )
        return key
class RoflFile(object):
    """In-memory representation of a parsed .rofl replay file."""

    @classmethod
    def read(cls, fh):
        """Parse a replay from the binary file object ``fh``.

        Sections are read sequentially: header, metadata, payload header,
        encryption key, chunk headers, keyframe headers, then all payloads.

        Raises:
            Exception: if the header magic does not equal ``ROFL_MAGIC``.
        """
        self = cls()
        self.header = RoflHeader.read(fh, self)
        if self.header.magic != ROFL_MAGIC:
            raise Exception("Decoding error - magic invalid")
        self.metadata = RoflMetadata.read(fh, self)
        self.payload_header = RoflPayloadHeader.read(fh, self)
        self.encryption_key = RoflEncryptionKey.read(fh, self)
        self.chunk_headers = RoflChunkHeaders.read(fh, self)
        self.keyframe_headers = RoflKeyframeHeaders.read(fh, self)
        self.chunks = RoflChunks.read(fh, self)
        # Materialise the (header, payload) pairs so they can be iterated
        # more than once even where ``zip`` returns a one-shot iterator.
        self.chunk_pairs = list(zip(
            self.chunk_headers.data + self.keyframe_headers.data,
            self.chunks.data
        ))
        return self

    @property
    def crypto(self):
        """Return a new ``RoflCrypto`` helper bound to this file."""
        return RoflCrypto(self)

    def __str__(self):
        # Build the whole template before formatting.  ``a + b.format(...)``
        # formats only ``b``: the first half kept its raw ``{}`` and the
        # header was printed in the encryption key slot.
        template = (
            "<RoflFile - header: {} - metadata: {} - payload header: {}"
            " - encryption key: {}>"
        )
        return template.format(
            self.header, self.metadata,
            self.payload_header, self.encryption_key
        )
def unpack_rofl_to_directory(rofl_file, directory):
    """Unpack the replay at ``rofl_file`` into a new ``directory`` tree.

    Creates ``directory/keyframe`` and ``directory/chunk``, writes each
    decoded payload to a file named after its header id, and writes the
    metadata (extended with ``gameKey`` and ``key``) to ``meta.json``.
    ``os.makedirs`` raises if either sub-directory already exists.
    """
    import os

    keyframe_dir = os.path.join(directory, 'keyframe')
    chunk_dir = os.path.join(directory, 'chunk')
    for new_dir in (keyframe_dir, chunk_dir):
        os.makedirs(new_dir)

    with open(rofl_file, 'rb') as f:
        roflfile = RoflFile.read(f)

    for header, chunk in roflfile.chunk_pairs:
        # One formatted argument: prints "<header> <chunk>" under both the
        # print statement and the print function.
        print("{} {}".format(header, chunk))
        # Header type 2 goes to the keyframe directory, the rest to chunk.
        if header.type == 2:
            base_dir = keyframe_dir
        else:
            base_dir = chunk_dir
        target = os.path.join(base_dir, str(header.id))
        with open(target, 'wb') as c:
            c.write(chunk.chunk)

    meta = roflfile.metadata.json
    with open(os.path.join(directory, 'meta.json'), 'wb') as f:
        meta['gameKey'] = {
            'gameId': meta['gameId']
        }
        raw_key = roflfile.encryption_key.encryption_key
        meta['key'] = raw_key.encode('base64')
        f.write(roflfile.metadata.as_json())
if __name__ == '__main__':
    # Command-line entry point: <script> <ROFL file> <directory>
    import sys
    import os.path

    if len(sys.argv) != 3:
        usage = "{} <ROFL file> <directory>".format(sys.argv[0])
        print(usage)
        sys.exit(0)

    my_name, rofl_file, output_dir = sys.argv

    if not os.path.exists(rofl_file):
        print("ROFL file specified does not exist")
        sys.exit(1)

    # Refuse to unpack into a directory that is already there.
    if os.path.exists(output_dir):
        print("Output directory exists - won't unpack")
        sys.exit(1)

    unpack_rofl_to_directory(rofl_file, output_dir)
@Kungitoo, this is the scratch code I've been playing with so far. I haven't tried to clean it up, but it should still be clear enough if you're trying
import json
import blowfish
import gzip
import base64
from Crypto.Util.py3compat import *
def unpad(padded_data, block_size, style='pkcs7'):
    """Remove standard padding.

    :Parameters:
      padded_data : byte string
        A piece of data with padding that needs to be stripped.
      block_size : integer
        The block boundary to use for padding. The input length
        must be a non-zero multiple of ``block_size``.
      style : string
        Padding algorithm. It can be *'pkcs7'* (default), *'iso7816'* or *'x923'*.
    :Return:
        Data without padding.
    :Raises ValueError:
        if the input is empty, is not a multiple of ``block_size``, the
        padding is incorrect, or ``style`` is unknown.
    """
    pdata_len = len(padded_data)
    # An empty input passes the modulo test below and would then fail with
    # IndexError on ``padded_data[-1]``; report it as the documented error.
    if pdata_len == 0:
        raise ValueError("Zero-length input cannot be unpadded")
    if pdata_len % block_size:
        raise ValueError("Input data is not padded")
    if style in ('pkcs7', 'x923'):
        # Both styles keep the padding length in the final byte.
        padding_len = bord(padded_data[-1])
        if padding_len < 1 or padding_len > min(block_size, pdata_len):
            raise ValueError("Padding is incorrect.")
        if style == 'pkcs7':
            # Every padding byte must equal the padding length.
            if padded_data[-padding_len:] != bchr(padding_len) * padding_len:
                raise ValueError("PKCS#7 padding is incorrect.")
        else:
            # All padding bytes before the final length byte must be zero.
            if padded_data[-padding_len:-1] != bchr(0) * (padding_len - 1):
                raise ValueError("ANSI X.923 padding is incorrect.")
    elif style == 'iso7816':
        # Padding starts at the last 0x80 byte; only zero bytes may follow.
        padding_len = pdata_len - padded_data.rfind(bchr(128))
        if padding_len < 1 or padding_len > min(block_size, pdata_len):
            raise ValueError("Padding is incorrect.")
        if padding_len > 1 and padded_data[1 - padding_len:] != bchr(0) * (padding_len - 1):
            raise ValueError("ISO 7816-4 padding is incorrect.")
    else:
        raise ValueError("Unknown padding style")
    return padded_data[:-padding_len]
# Scratch walk-through of a .rofl file: header table, metadata JSON, payload
# header, then the 17-byte chunk/keyframe headers at the start of the payload.
# One handle is used under ``with`` so it is always closed; the original
# opened the file twice and closed neither handle.
with open('/path/to/file.rofl', 'rb') as f:
    # Skip the first 262 bytes (RoflHeader earlier in this file reads a
    # 6-byte magic and a 256-byte signature there).
    f.seek(262)
    header_data = f.read(26)
    header_length = int.from_bytes(header_data[:2], 'little')
    file_length = int.from_bytes(header_data[2:6], 'little')
    metadata_offset = int.from_bytes(header_data[6:10], 'little')
    metadata_length = int.from_bytes(header_data[10:14], 'little')
    payload_header_offset = int.from_bytes(header_data[14:18], 'little')
    payload_header_length = int.from_bytes(header_data[18:22], 'little')
    payload_offset = int.from_bytes(header_data[22:26], 'little')

    # NOTE(review): the metadata is read from the current position; this
    # assumes metadata_offset equals 288, which is never checked - confirm.
    metadata_bin = f.read(metadata_length)
    metadata = json.loads(metadata_bin.decode())
    game_length = metadata['gameLength']
    game_version = metadata['gameVersion']
    last_game_chunk_id = metadata['lastGameChunkId']
    last_key_frame_id = metadata['lastKeyFrameId']
    player_stats = json.loads(metadata['statsJson'])

    # NOTE(review): likewise assumes the payload header directly follows
    # the metadata; payload_header_offset is not used - confirm.
    payload_bin = f.read(payload_header_length)
    match_id = int.from_bytes(payload_bin[:8], 'little')
    match_length = int.from_bytes(payload_bin[8:12], 'little')
    keyframe_amount = int.from_bytes(payload_bin[12:16], 'little')
    chunk_amount = int.from_bytes(payload_bin[16:20], 'little')
    end_chunk_id = int.from_bytes(payload_bin[20:24], 'little')
    start_chunk_id = int.from_bytes(payload_bin[24:28], 'little')
    keyframe_interval = int.from_bytes(payload_bin[28:32], 'little')
    encryption_key_length = int.from_bytes(payload_bin[32:34], 'little')
    encryption_key = payload_bin[34: 34+encryption_key_length]

    # NOTE(review): RoflCrypto.keyframe_key and RoflEncryptionKey earlier in
    # this file key Blowfish with str(gameId) and base64-DECODE the stored
    # key.  Here the key is the 8-byte little-endian id and the stored key
    # is base64-ENCODED.  Neither value is used below - confirm the intent.
    b64_encode_key = base64.b64encode(encryption_key)
    cipher = blowfish.Cipher(match_id.to_bytes(8, byteorder='little'))

    # Jump to the payload and read the rest of the file; equivalent to
    # reopening the file and discarding ``payload_offset`` bytes.
    f.seek(payload_offset)
    payload_bin = f.read()

# reading chunk headers
chunk_headers = []
for i in range(chunk_amount+keyframe_amount):
    base = 17 * i  # each header occupies 17 bytes
    chunk_id = int.from_bytes(payload_bin[base: base + 4], 'little')
    is_chunk = int.from_bytes(payload_bin[base + 4:base + 5], 'little')  # 1 is chunk, 2 is keyframe
    chunk_size = int.from_bytes(payload_bin[base + 5:base + 9], 'little')
    next_chunk_id = int.from_bytes(payload_bin[base + 9:base + 13], 'little')
    chunk_offset = int.from_bytes(payload_bin[base + 13:base + 17], 'little')
    chunk_headers.append([chunk_id, is_chunk, chunk_size, next_chunk_id, chunk_offset])
I'll be able to get the chunk/keyframe offsets from that, but I'm just looking at a bunch of encrypted hex when I do.
I'm using https://github.com/ryancole/LeagueReplayReader to decrypt and unpack keyframes and chunks, and I think that it's still working on the latest replays. I'm currently trying to split a keyframe into blocks as specified in https://github.com/loldevs/leaguespec/wiki/General-Binary-Format
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I was able to get the initial header and was even able to parse the keyframe and chunk headers. You can see that some things have changed, but the number of bytes each header takes is still the same (17). The part I'm stuck on is actually decrypting the chunks and keyframes. It might still be Blowfish, but following the instructions from years ago hasn't gotten me anywhere. I would be interested if you have a breakthrough. I'll post the code I have shortly from another machine.