# SPDX-License-Identifier: MIT
import sys
import io
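
# Reader for raw PlayStation 2 memory card dumps (magic string
# "Sony PS2 Memory Card Format "). Decodes the superblock, follows the
# card's FAT through its indirect cluster list, and hands each file to a
# caller-supplied callback.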
def read_items(in_bytes=None, in_items=None, in_dic=None, in_byteorder="little"):
    # Each item is [name, byte_size] or [name, byte_size, element_size].
    # With in_bytes=None this only sums the sizes, which callers use to
    # compute a structure's total length.
    total_offset = 0
    for x in in_items:
        item_name = x[0]
        item_size = x[1]
        item_element_size = x[2] if len(x) > 2 else None
        if in_bytes is not None:
            item_bytes = in_bytes[total_offset:total_offset + item_size]
            if len(item_bytes) != item_size:
                break
            item_data = item_bytes
            if item_element_size is not None:
                # Decode as one integer, or a list of them when byte_size
                # spans multiple elements.
                item_arr = []
                for i in range(item_size // item_element_size):
                    item_arr.append(int.from_bytes(item_bytes[i * item_element_size:(i + 1) * item_element_size], byteorder=in_byteorder))
                if len(item_arr) == 1:
                    item_data = item_arr[0]
                else:
                    item_data = item_arr
            if in_dic is not None:
                in_dic[item_name] = item_data
        total_offset += item_size
    return total_offset
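# Card superblock, stored at the start of the dump. Each row is
# [field_name, byte_size, optional element_size]; the sizes sum to the
# 340 bytes read in TMcDumpReader.__init__. "ifc_list" holds the
# indirect-FAT cluster numbers walked by TFatReader.get_next_fat_cluster_entry.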
hdr_items = [
    ["magic", 28],
    ["version", 12],
    ["page_size", 2, 2],
    ["pages_per_cluster", 2, 2],
    ["pages_per_block", 2, 2],
    ["unk0", 2],
    ["clusters_per_card", 4, 4],
    ["alloc_offset", 4, 4],
    ["alloc_end", 4, 4],
    ["root_dir_cluster", 4, 4],
    ["backup_block_1", 4, 4],
    ["backup_block_2", 4, 4],
    ["unk1", 8],
    ["ifc_list", 128, 4],
    ["bad_block_list", 128, 4],
    ["card_type", 1, 1],
    ["card_flags", 1, 1],
    ["pad2", 2],
]
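# Directory entry; the rows sum to 512 bytes, so two entries fit per
# 1024-byte cluster. "ctime"/"mtime" are kept as 8 one-byte elements and
# decoded in read_directory below.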
dentry_items = [
    ["mode", 2, 2],
    ["unk0", 2],
    ["length", 4, 4],
    ["ctime", 8, 1],
    ["cluster", 4, 4],
    ["dentry", 4, 4],
    ["mtime", 8, 1],
    ["attributes", 4, 4],
    ["unk1", 28],
    ["name", 32],
    ["pad2", 416],
]
# Directory-entry mode flags and fixed card geometry.
DF_FILE = 0x10
DF_DIRECTORY = 0x20
DF_EXISTS = 0x8000
MC_PAGE_SIZE = 0x200
MC_PAGES_PER_CLUSTER = 2
MC_CLUSTER_SIZE = MC_PAGE_SIZE * MC_PAGES_PER_CLUSTER
INVALID_CLUSTER_IDX = 0xffffffff
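# TMcDumpReader wraps a seekable stream over a raw card dump. It validates
# the superblock, detects whether each page carries a spare (ECC) area, and
# exposes cluster-granularity reads (with a small cache) for TFatReader.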
class TMcDumpReader:
    def __init__(self, in_stream):
        self.m_cluster_cache = {}
        self.m_card_stream = in_stream
        hdr = {}
        self.m_header = hdr
        read_items(in_stream.read(340), hdr_items, hdr)
        if hdr["page_size"] != MC_PAGE_SIZE:
            raise Exception("unexpected page size")
        if hdr["pages_per_cluster"] != MC_PAGES_PER_CLUSTER:
            raise Exception("unexpected pages per cluster")
        total_page_count = hdr["pages_per_cluster"] * hdr["clusters_per_card"]
        expected_size = hdr["page_size"] * total_page_count
        in_stream.seek(0, io.SEEK_END)
        actual_size = in_stream.tell()
        if actual_size < expected_size:
            raise Exception("actual size lower than expected")
        # Any bytes beyond the bare pages are per-page spare (ECC) data.
        page_spare_size = (actual_size - expected_size) // total_page_count
        # Heuristic: if the dword at offset 524 (inside what would be the
        # first page's spare area) reads as zero, assume the common 16-byte
        # spare area per page.
        in_stream.seek(524, io.SEEK_SET)
        spare_area_check = int.from_bytes(in_stream.read(4), byteorder="little")
        if spare_area_check == 0:
            page_spare_size = 0x10
        self.m_raw_page_size = hdr["page_size"] + page_spare_size
    def read_directory(self, dir_cluster, entry_count=-1):
        entries = []
        reader = TFatReader(self, dir_cluster)
        # Passing in_bytes=None makes read_items return the summed size (512).
        dir_entry_size = read_items(None, dentry_items, None)
        tmp_dir_entry_bytes = bytearray(dir_entry_size)
        read_amount = reader.read(tmp_dir_entry_bytes, dir_entry_size)
        if read_amount != dir_entry_size:
            raise Exception("invalid")
        base_dir_entry = {}
        read_items(tmp_dir_entry_bytes, dentry_items, base_dir_entry)
        entries.append(base_dir_entry)
        if entry_count == -1:
            # The first entry (".") stores the directory's entry count in
            # its "length" field.
            entry_count = base_dir_entry["length"]
        if entry_count < 2:
            raise Exception("invalid entry count")
        for i in range(entry_count - 1):
            read_amount = reader.read(tmp_dir_entry_bytes, dir_entry_size)
            if read_amount != dir_entry_size:
                raise Exception("invalid")
            dir_entry = {}
            read_items(tmp_dir_entry_bytes, dentry_items, dir_entry)
            entries.append(dir_entry)
        return entries
    def read_root_directory(self):
        return self.read_directory(self.m_header["root_dir_cluster"])
    def read_file(self, file_cluster, file_size):
        reader = TFatReader(self, file_cluster)
        result = bytearray(file_size)
        read_amount = reader.read(result, file_size)
        if read_amount != file_size:
            raise Exception("invalid")
        return result
    def read_cluster(self, cluster_index, out_buffer):
        # Copy the data portion of each page into out_buffer, skipping each
        # page's spare area.
        page_index = cluster_index * self.m_header["pages_per_cluster"]
        if self.m_raw_page_size < self.m_header["page_size"]:
            raise Exception("invalid")
        spare_size = self.m_raw_page_size - self.m_header["page_size"]
        self.m_card_stream.seek(page_index * self.m_raw_page_size)
        for i in range(self.m_header["pages_per_cluster"]):
            out_buffer[i * self.m_header["page_size"]:(i + 1) * self.m_header["page_size"]] = self.m_card_stream.read(self.m_header["page_size"])
            self.m_card_stream.seek(spare_size, io.SEEK_CUR)
    def read_cluster_cached(self, cluster_index, out_buffer):
        cluster_size = self.m_header["pages_per_cluster"] * self.m_header["page_size"]
        if cluster_index not in self.m_cluster_cache:
            new_data = bytearray(cluster_size)
            self.read_cluster(cluster_index, new_data)
            self.m_cluster_cache[cluster_index] = new_data
        out_buffer[:] = self.m_cluster_cache[cluster_index][:]
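# TFatReader presents a file's cluster chain as a flat byte stream. It follows
# FAT entries (high bit set = next cluster valid) through the two-level
# indirection rooted at the superblock's "ifc_list".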
class TFatReader:
    def __init__(self, parent, cluster):
        self.m_cluster = 0
        self.m_buffer_index = 0
        self.m_buffer = bytearray(MC_CLUSTER_SIZE)
        self.m_parent = parent
        self.read_fat_cluster(cluster)
    def read(self, out_buffer, size):
        if self.m_cluster == INVALID_CLUSTER_IDX:
            return 0
        amount_read = 0
        buffer_offset = 0
        cur_size = size
        while cur_size != 0:
            if self.m_buffer_index == MC_CLUSTER_SIZE:
                # Current cluster exhausted; follow the FAT to the next one.
                entry = self.get_next_fat_cluster_entry(self.m_cluster)
                next_valid_bit = (1 << 31)
                if (entry & next_valid_bit) != 0:
                    self.read_fat_cluster(entry & (next_valid_bit ^ 0xFFFFFFFF))
                else:
                    # End of the cluster chain.
                    self.m_cluster = INVALID_CLUSTER_IDX
                    break
            if self.m_buffer_index >= MC_CLUSTER_SIZE:
                raise Exception("invalid")
            amount_avail = MC_CLUSTER_SIZE - self.m_buffer_index
            to_read = min(amount_avail, cur_size)
            out_buffer[buffer_offset:buffer_offset + to_read] = self.m_buffer[self.m_buffer_index:self.m_buffer_index + to_read]
            self.m_buffer_index += to_read
            buffer_offset += to_read
            amount_read += to_read
            cur_size -= to_read
        return amount_read
    def read_fat_cluster(self, cluster_index):
        self.m_cluster = cluster_index
        # Data clusters are addressed relative to the allocatable area.
        self.m_parent.read_cluster(cluster_index + self.m_parent.m_header["alloc_offset"], self.m_buffer)
        self.m_buffer_index = 0
    def get_next_fat_cluster_entry(self, cluster_index):
        # Each cluster holds 256 little-endian dwords, so a FAT lookup is a
        # two-level walk: ifc_list -> indirect FAT cluster -> FAT cluster.
        cluster_tmp = bytearray(MC_CLUSTER_SIZE)
        fat_offset = cluster_index & 0xFF
        indirect_index = cluster_index // 256
        indirect_offset = indirect_index & 0xFF
        dbl_indirect_index = indirect_index // 256
        indirect_cluster_num = self.m_parent.m_header["ifc_list"][dbl_indirect_index]
        self.m_parent.read_cluster_cached(indirect_cluster_num, cluster_tmp)
        fat_cluster_num = int.from_bytes(cluster_tmp[indirect_offset * 4:(indirect_offset + 1) * 4], byteorder="little")
        self.m_parent.read_cluster_cached(fat_cluster_num, cluster_tmp)
        return int.from_bytes(cluster_tmp[fat_offset * 4:(fat_offset + 1) * 4], byteorder="little")
def read_directory(mc_reader, mc_dentry, write_file_callback, write_file_basepath):
    # Recursively walk a decoded directory listing, invoking the callback for
    # every regular file that exists.
    for entry in mc_dentry:
        name_trimmed = entry["name"].rstrip(b"\x00")
        if name_trimmed in [b".", b".."]:
            continue
        if (entry["mode"] & DF_EXISTS) == 0:
            continue
        if (entry["mode"] & DF_DIRECTORY) != 0:
            sub_mc_dentry = mc_reader.read_directory(entry["cluster"], entry["length"])
            read_directory(mc_reader, sub_mc_dentry, write_file_callback, write_file_basepath + b"/" + name_trimmed)
        else:
            file_contents = mc_reader.read_file(entry["cluster"], entry["length"])
            if len(file_contents) != entry["length"]:
                raise Exception("invalid")
            # ctime is 8 raw bytes; day, month, and a 16-bit year live at
            # offsets 4, 5, and 6-7.
            ctime = entry["ctime"]
            year = ctime[6] | (ctime[7] << 8)
            month = ctime[5]
            if month == 0:
                month = 1
            day = ctime[4]
            cur_metadata = {}
            cur_metadata["date"] = "%04d/%02d/%02d" % (year, month, day)
            write_file_callback(bytes(name_trimmed), file_contents, cur_metadata)
def read_mcfs(in_bytes, write_file_callback):
    # The first callback invocation carries only container-level metadata.
    file_metadata = {}
    file_metadata["file_ext"] = "mcd"
    write_file_callback(None, None, file_metadata)
    dmp = TMcDumpReader(io.BytesIO(in_bytes))
    read_directory(dmp, dmp.read_root_directory(), write_file_callback, b".")
def identify_mcfs(in_bytes, in_bytes_len):
    return in_bytes[0:28] == b"Sony PS2 Memory Card Format "
if __name__ == "__main__":
    with open(sys.argv[1], "rb") as f:
        infile_bytes = f.read()
    def write_file_cb(name, file_bytes, file_metadata):
        # The first invocation passes None for name and file_bytes; guard
        # against it before taking len().
        if file_bytes is None:
            print(file_metadata)
            return
        print(name, len(file_bytes), file_metadata)
    read_mcfs(infile_bytes, write_file_cb)
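
# A minimal extraction sketch (an addition, not part of the original script):
# a callback that writes each file under an output directory instead of
# printing; "extract_file_cb" and "out_dir" are illustrative names. Note that
# read_directory passes only the entry's basename, so files from nested
# directories land flat in out_dir.
def extract_file_cb(name, file_bytes, file_metadata, out_dir=b"extracted"):
    import os
    if file_bytes is None:
        return  # first call carries container-level metadata only
    os.makedirs(out_dir, exist_ok=True)
    with open(os.path.join(out_dir, name), "wb") as out_f:
        out_f.write(file_bytes)
# Usage: read_mcfs(infile_bytes, extract_file_cb)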