Created
May 10, 2017 14:06
-
-
Save javiercantero/e1042ca2cbb072599c98028c207689fe to your computer and use it in GitHub Desktop.
Tool to unpack Ascaron games (such as Patrician III) .CPR files
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: UTF-8 -*- | |
# | |
# CPRreader - reads and unpacks .CPR files from Patrician Ascaron games | |
# | |
# Copyright (C) 2014, 2017 Javier Cantero | |
# | |
# This program is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# | |
# This program is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# | |
# You should have received a copy of the GNU General Public License | |
# along with this program. If not, see <http://www.gnu.org/licenses/>. | |
# | |
# ---------------------------------------------------------------------------- | |
# Available documentation about .CPR file format | |
# ---------------------------------------------------------------------------- | |
# info from: http://wiki.xentax.com/index.php?title=Patrician | |
# | |
# CPR format (Patrician Ascaron games) | |
# Type: archive | |
# byte order: little endian | |
# | |
# Format Specifications | |
# | |
# char {16} - Header (ASCARON_ARCHIVE ) | |
# char {4} - Version (V0.9) | |
# byte {12} - null | |
# uint32 {4} - Directory Length [-16] (including all these archive | |
# header fields and the 9 null padding) | |
# uint32 {4} - Directory Length [-15] (not including all these archive | |
# header fields or the padding) | |
# uint32 {4} - Number Of Files | |
# uint32 {4} - Next Directory (relative offset from here) | |
# | |
# // for each file | |
# | |
# uint32 {4} - Data Offset | |
# uint32 {4} - Raw File Length | |
# uint32 {4} - Unknown (1) | |
# char {X} - Filename | |
# byte {1} - null Filename Terminator | |
# | |
# | |
# byte {9} - null | |
# byte {X} - File Data | |
# | |
# ---------------------------------------------------------------------------- | |
# CPRID: 32 bytes "ASCARON_ARCHIVE V0.9" Right padded with 0x00 | |
# CPRHeader: | |
# SubID: int | |
# Size: int | |
# NumE: int | |
# NextRelOffset: int | |
# CPREntry: | |
# Offset: int | |
# Size: int | |
# ???: int | |
# Filename: null terminated string | |
# ---------------------------------------------------------------------------- | |
import sys | |
import getopt | |
import os | |
import os.path | |
import struct | |
# Trace switches: DEBUG prints the index headers and general progress,
# DEBUG2 additionally dumps every directory entry while it is parsed.
DEBUG = False
DEBUG2 = False

# Identification string found at the start of an archive; it lives in a
# 32-byte field, right padded with 0x00.
ASCARON_ARCHIVE_ID = b"ASCARON_ARCHIVE V0.9"
ASCARON_ARCHIVE_ID_SIZE = 32

# An index header consists of four little-endian unsigned 32-bit integers.
INDEX_HEADER_SIZE = struct.calcsize( "<4I" )
class AppException( Exception ):
    """ Error raised by this tool; its first argument is the message to show """
def get_index_entries( index_data ):
    """ Get the info of the stored files from the index data.

    index_data is the raw directory block as bytes: a sequence of entries,
    each one made of three little-endian uint32 values (data offset, file
    length and a field of unknown meaning) followed by a null-terminated
    filename.

    Returns a list of (filename, data_offset, length) tuples, in the same
    order as they appear in the block.

    Raises AppException if a filename has no null terminator.
    """
    entries = []    # entries found
    index_data_len = len( index_data )
    if DEBUG2: print( "### DIRECTORY (index_data_len={})".format(index_data_len) )
    offset = 0
    # an entry needs the 12 bytes of fixed fields plus a filename and its
    # terminator; a shorter remainder is left unparsed
    while offset + 13 < index_data_len:
        if DEBUG2: print( " {0:04}".format(offset), end='' )
        # read entry info
        entry_offset, entry_len, _unknown = struct.unpack_from( '<III', index_data, offset )
        offset += 12
        offset_end_filename = index_data.find( b'\0', offset )
        if offset_end_filename == -1:
            raise AppException( "unexpected end of index: filename truncated" )
        # ISO-8859-1 maps every possible byte value to a character, so this
        # decoding cannot fail and needs no error handling
        entry_filename = index_data[offset:offset_end_filename].decode( "iso-8859-1" )
        entries.append( (entry_filename, entry_offset, entry_len) )
        offset = offset_end_filename + 1    # next entry
        if DEBUG2: print( ":{0:04} {1}".format(offset - 1, entries[-1]) )
    return entries
def unpack_files( fi, file_index, unpack_dir, verbose, override ):
    """ Copy file data from the index into separate files in the filesystem.

    fi          -- archive opened in binary mode (seek() and read() are used)
    file_index  -- iterable of (filename, offset, size) tuples; filenames
                   use the backslash as directory separator
    unpack_dir  -- destination directory ('' means the current directory)
    verbose     -- print a progress line for every file
    override    -- overwrite files that already exist

    Raises AppException if an entry name would be written outside
    unpack_dir or if the archive ends before the whole file data is read.
    """
    base_dir = os.path.abspath( unpack_dir )
    base_prefix = os.path.join( base_dir, '' )    # base dir + trailing separator
    for filename, offset, size in file_index:
        # 1.- create new output file with the appropriate name and path
        # use native directory separators
        filename = os.path.join( *filename.split( '\\' ) )
        unpack_filename = os.path.join( unpack_dir, filename )
        # names come from the archive itself: refuse absolute paths and
        # '..' components that would escape from the destination directory
        if not os.path.abspath( unpack_filename ).startswith( base_prefix ):
            raise AppException( "unsafe file name in archive: {0}".format(filename) )
        # create parent directories; the path is empty when the file goes
        # straight into the current directory and makedirs('') would fail
        unpack_dirpath = os.path.dirname( unpack_filename )
        if unpack_dirpath:
            os.makedirs( unpack_dirpath, exist_ok=True )
        # 2.- copy the content to the new file
        if verbose: print( "Unpacking {0}... ".format(filename), end='' )
        if override or not os.path.exists( unpack_filename ):
            # read first, so a truncated archive does not leave a partial file
            fi.seek( offset, 0 )
            buff = fi.read( size )
            if len( buff ) < size:
                raise AppException( "unexpected EOF reading data of {0}".format(filename) )
            with open( unpack_filename, "wb" ) as fo:
                fo.write( buff )
            if verbose: print( "done!" )
        else:
            if verbose: print( "error: file already exists" )
def parse_cpr_file( f ):
    """ Read and process the .CPR file.

    f is the archive opened in binary mode and positioned at the first
    index header (just after the archive ID).

    Returns the list of (filename, data_offset, length) tuples gathered
    from all the index blocks of the file.

    Raises AppException, with the position of the offending header or
    index block in the message, when the file is truncated or inconsistent.
    """
    # 1. read all the index headers in one pass
    # ------------------------------------------
    index_headers = []    # headers found
    index_header_pos = f.tell()
    if DEBUG:
        print( "Index headers found:\n"
               "  position idx. size dir. size n. files next index\n"
               "---------- --------- --------- -------- ----------" )
    while True:
        try:
            index_header = f.read( INDEX_HEADER_SIZE )
            if len(index_header) == 0:    # end of file - no more headers
                break
            if len(index_header) < INDEX_HEADER_SIZE:
                raise AppException( "unexpected EOF" )
            # read header info
            index_size, dir_size, num_files, next_index_header_relpos = struct.unpack_from( '<4I', index_header )
            if DEBUG:
                print( "{0:10d} {1:9d} {2:9d} {3:8d} {4:10d}".format(
                    f.tell(), index_size, dir_size, num_files, next_index_header_relpos) )
            # index header checks
            if dir_size > index_size:
                raise AppException( "inconsistent index and directory size" )
            if num_files * ( 12 + 1 ) > dir_size:
                raise AppException( "not enough size to store {0} files".format(num_files) )
            # a header that does not move the position forward would be
            # read over and over again, never reaching the end of file
            if index_size + next_index_header_relpos == 0:
                raise AppException( "next index header position does not advance" )
            index_headers.append( (index_header_pos + INDEX_HEADER_SIZE, index_size, dir_size, num_files) )
            index_header_pos += index_size + next_index_header_relpos
            f.seek( index_header_pos, 0 )    # move to next header pos.
        # catch-all AppException() to add index header pos. info
        except AppException as err:
            raise AppException( "Index header @{0}: {1}".format(index_header_pos, err.args[0]) ) from err
    # 2. build the file index from all the index headers
    # ---------------------------------------------------
    index = []
    for index_pos, _, dir_size, num_files in index_headers:
        try:
            f.seek( index_pos, 0 )
            # the header bytes already consumed are discounted from the
            # directory size; clamp to zero because a negative size would
            # make read() return everything up to the end of the file
            dir_data_size = max( dir_size - INDEX_HEADER_SIZE, 0 )
            index_data = f.read( dir_data_size )
            if len( index_data ) < dir_data_size:
                raise AppException( "unexpected EOF" )
            try:
                index_entries = get_index_entries( index_data )
            except AppException as err:
                raise AppException( "corrupted index block ({0})".format(err.args[0]) ) from err
            # check number of index entries
            if len(index_entries) != num_files:
                raise AppException( "file count mismatch: {0} entries instead of {1}".format(
                    len(index_entries), num_files) )
            index += index_entries
        # catch-all AppException() to add index pos. info
        except AppException as err:
            raise AppException( "Index block @{0}: {1}".format(index_pos, err.args[0]) ) from err
    return index
def usage():
    """ show the command line synopsis on the standard output """
    synopsis = "usage: {0} [--verbose] [--no-unpack] [--override] [--directory=<dir>] <file>"
    program = sys.argv[0]
    print( synopsis.format(program), file=sys.stdout )
def main():
    """ Main function: parse the command line, read the archive index and
    unpack the stored files unless --no-unpack is given.

    Exits with status 0 after --help and with status 2 on usage errors,
    archive errors (AppException) and I/O errors (OSError).
    """
    # parsing command line options
    try:
        opts, args = getopt.getopt( sys.argv[1:], "hvnod:", ["help",
            "verbose", "no-unpack", "override", "directory="] )
    except getopt.GetoptError as err:
        print( str(err), file=sys.stderr )
        usage()
        sys.exit(2)
    # default option values
    verbose = False
    override = False
    unpack = True
    unpack_dir = ''
    for o, a in opts:
        if o in ("-h", "--help"):
            usage()
            sys.exit()
        elif o in ("-v", "--verbose"):
            verbose = True
        elif o in ("-n", "--no-unpack"):
            unpack = False
        elif o in ("-o", "--override"):
            override = True
        elif o in ("-d", "--directory"):
            unpack_dir = a
        else:
            print( "Unknown option: {0}{1}".format(o, a), file=sys.stderr )
            sys.exit(2)
    # checked after the options, so that a lone --help is answered with
    # status 0 instead of being reported as a usage error
    if len(args) != 1:
        usage()
        sys.exit(2)
    try:
        cpr_filename = args[0]
        if DEBUG: print( "Opening {0} file".format(cpr_filename) )
        with open( cpr_filename, "rb" ) as f:
            # verify archive ID header
            file_id = f.read( ASCARON_ARCHIVE_ID_SIZE )
            if len(file_id) < ASCARON_ARCHIVE_ID_SIZE or \
               file_id[0:20] != ASCARON_ARCHIVE_ID:
                raise AppException( "not an Ascaron file" )
            # read the info of the files stored in the .CPR file
            file_index = parse_cpr_file( f )
            if DEBUG: print( "{0} files found".format(len(file_index)) )
            # extract the available files if commanded
            if unpack:
                unpack_files( f, file_index, unpack_dir, verbose, override )
    except AppException as err:
        print( "ERROR: {0}".format(err.args[0]), file=sys.stderr )
        sys.exit(2)
    except OSError as err:
        # missing or unreadable archive, unwritable destination, etc.:
        # report it like any other error instead of showing a traceback
        print( "ERROR: {0}".format(err), file=sys.stderr )
        sys.exit(2)
# run the tool only when this file is executed as a script
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment