Last active
July 18, 2020 21:31
-
-
Save icedraco/2c7622618a6874f00559aa79352b35ee to your computer and use it in GitHub Desktop.
File Split & Merge Script
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
IceDragon's Split & Merge (IDSM) Script | |
""" | |
from __future__ import annotations | |
import os | |
import sys | |
import struct | |
import hashlib | |
from typing import Iterator, BinaryIO, TextIO | |
# Size units (bytes).
KB = 1024
MB = 1024 * KB
GB = 1024 * MB

# Copy-buffer size used by the split/merge/verify read loops.
BUFFER_SIZE = 8 * KB
# Default size of each emitted chunk file (header included).
DEFAULT_CHUNK_SIZE = 20 * MB

# MD5 digest length in bytes; ZERO_HASH is the placeholder written into a
# chunk header before the real digest is known.
HASH_SIZE = hashlib.md5().digest_size  # bytes
ZERO_HASH = b'\x00' * HASH_SIZE
# --- Classes --------------------------------------------------------------- # | |
class ChunkHeader:
    """
    Fixed-size header prepended to every chunk file.

    Wire layout: 2-byte magic (b'ID'), 2-byte format version (b'01'),
    little-endian unsigned-short chunk index, then the chunk's MD5 digest.
    """
    FMT_INDEX = '<H'
    MAGIC_NUMBER = b'ID'
    FORMAT_VERSION = b'01'
    INDEX_SIZE = struct.calcsize(FMT_INDEX)  # unsigned short
    HEADER_SIZE = len(MAGIC_NUMBER) + len(FORMAT_VERSION) + HASH_SIZE + INDEX_SIZE

    @classmethod
    def read_from(cls, fd: BinaryIO) -> ChunkHeader:
        """
        Parse a header from *fd*.

        :raises ValueError: on bad magic, unsupported version, or a
            truncated index/hash field.  (ValueError subclasses the bare
            Exception the original raised, so callers catching Exception
            are unaffected.)
        """
        magic = fd.read(len(cls.MAGIC_NUMBER))
        if magic != cls.MAGIC_NUMBER:
            raise ValueError(f'invalid magic number: {repr(magic)}')

        version = fd.read(len(cls.FORMAT_VERSION))
        if version != cls.FORMAT_VERSION:
            raise ValueError(f'unsupported format version: {repr(version)}')

        # BUG FIX: validate the index read; the original fed a possibly
        # short read straight into struct.unpack (struct.error on EOF).
        index_bytes = fd.read(cls.INDEX_SIZE)
        if len(index_bytes) != cls.INDEX_SIZE:
            raise ValueError(f'truncated chunk index ({len(index_bytes)} != {cls.INDEX_SIZE} bytes)')
        index: int = struct.unpack(cls.FMT_INDEX, index_bytes)[0]

        chunk_hash = fd.read(HASH_SIZE)
        if len(chunk_hash) != HASH_SIZE:
            raise ValueError(f'invalid hash size ({len(chunk_hash)} != {HASH_SIZE})')

        return cls(index, chunk_hash)

    def __init__(self, index: int, chunk_hash: bytes = ZERO_HASH):
        # index must fit the unsigned-short wire field
        assert 0 <= index <= 0xFFFF, repr(index)
        self.chunk_index = index
        self.chunk_hash = chunk_hash

    def __bytes__(self) -> bytes:
        return self.serialize()

    def write(self, fd: BinaryIO):
        """Write the serialized header to *fd*."""
        fd.write(self.serialize())

    def serialize(self) -> bytes:
        """Return this header as exactly HEADER_SIZE bytes (see class docstring)."""
        index_bytes = struct.pack(self.FMT_INDEX, self.chunk_index)
        header = self.MAGIC_NUMBER + self.FORMAT_VERSION + index_bytes + self.chunk_hash
        assert len(header) == self.HEADER_SIZE
        return header
class Manifest:
    """
    Metadata describing a split file: original name, total size, whole-file
    MD5, chunk count, and the cumulative MD5 recorded after each chunk.
    """
    REQUIRED_KEYS = {'src_filename', 'file_size', 'file_hash', 'num_chunks'}

    @classmethod
    def load(cls, filename: str) -> Manifest:
        """
        Parse a manifest file previously produced by write().

        :raises AssertionError: when a required key is missing or a
            key/value line is malformed
        """
        items = {}
        cum_hashes = []
        with open(filename, 'r') as f:
            flag_read_hashes = False  # True while inside the '->' ... '<-' hash section
            for line in (line.strip() for line in f):
                if not line or line.startswith('#'):
                    continue  # blank line or comment
                elif line.startswith('->'):
                    flag_read_hashes = True
                    continue
                elif line.startswith('<-'):
                    flag_read_hashes = False
                    continue

                if flag_read_hashes:
                    cum_hashes.append(line)
                else:
                    assert '=' in line, repr(line)
                    key, value = line.split('=', 1)
                    items[key] = value

        missing_keys = cls.REQUIRED_KEYS - items.keys()
        if missing_keys:
            keys_str = ', '.join(missing_keys)
            raise AssertionError(f'missing manifest keys: {keys_str}')

        manifest = Manifest(
            items['src_filename'],
            int(items['file_size']),
            int(items['num_chunks']),
            items['file_hash'])
        for c_hash in cum_hashes:
            manifest.add_cumulative_hash(c_hash)
        return manifest

    def __init__(self, filename: str, file_size: int, num_chunks: int = None, file_hash: str = None):
        self.filename = filename        # original (base) filename
        self.file_size = file_size      # total payload size in bytes
        self.file_hash = file_hash      # hex MD5 of the whole file
        self.num_chunks = num_chunks    # number of chunk files
        self.cumulative_hashes = []     # hex MD5 after each chunk, in order

    def __len__(self) -> int:
        return self.num_chunks

    def has_all_cumulative_hashes(self) -> bool:
        """True when a cumulative hash is recorded for every chunk."""
        return len(self.cumulative_hashes) >= self.num_chunks

    def add_cumulative_hash(self, h: str):
        """
        Append a hex cumulative hash.

        :raises ValueError: when *h* is not HASH_SIZE*2 hex characters long.
            BUG FIX: the original constructed this ValueError without
            raising it, silently accepting malformed hashes.
        """
        if len(h) != HASH_SIZE * 2:
            raise ValueError(f"invalid hash: {h} (hash index: {len(self.cumulative_hashes)})")
        self.cumulative_hashes.append(h)

    def write(self, f: TextIO):
        """Serialize this manifest to *f* in the format load() understands."""
        buffer = [
            '# IDSM Splitter Manifest',
            f'src_filename={self.filename}',
            f'file_size={self.file_size}',
            f'file_hash={self.file_hash}',
            f'num_chunks={self.num_chunks}',
            '',
            '-> cumulative hashes',
            *self.cumulative_hashes,
            '<- cumulative hashes',
        ]
        f.write('\n'.join(buffer + ['']))
# --- MAIN ------------------------------------------------------------------ # | |
def main() -> int:
    """CLI entry point: dispatch to split / merge / verify based on sys.argv."""
    argv = sys.argv
    print('--- IDSM 1.0 ------------------------------------')
    if len(argv) < 2:
        return print_usage()

    mode = argv[1]
    if mode.startswith('s'):  # split
        if len(argv) >= 4:
            src_file = os.path.abspath(argv[2])
            _, src_filename = os.path.split(src_file)
            target_dir = os.path.abspath(argv[3])
            manifest_file = os.path.join(target_dir, src_filename + '.manifest')
            return main_split(src_file, target_dir, manifest_file)
    elif mode.startswith('m'):  # merge
        if len(argv) >= 3:
            manifest_file = os.path.abspath(argv[2])
            # determine target_dir
            if len(argv) >= 4:
                target_dir = os.path.abspath(argv[3])
            else:
                # BUG FIX: os.path.split returns (head, tail); the original
                # bound the manifest *filename* as target_dir
                target_dir, _ = os.path.split(manifest_file)
            # BUG FIX: pass the computed absolute path, not raw argv[2]
            return main_merge(manifest_file=manifest_file, target_dir=target_dir)
    elif mode.startswith('v'):  # verify
        if len(argv) >= 3:
            manifest_file = os.path.abspath(argv[2])
            # BUG FIX: pass the computed absolute path, not raw argv[2]
            return main_verify(manifest_file=manifest_file)

    # unknown mode or wrong argument count
    return print_usage()
def main_split(src_file, target_dir, manifest_file, chunk_size=DEFAULT_CHUNK_SIZE) -> int:
    """
    Validate split arguments, ensure the target directory exists, then run split().

    Returns split()'s result on success; 2 for a missing source file, 3 when
    the target directory cannot be created, 4 when split() rejects the request.
    """
    for line in (f'source: {src_file}',
                 f'target dir: {target_dir}',
                 f'manifest: {manifest_file}',
                 f'chunk size: {chunk_size} bytes',
                 ''):
        print(line)

    if not os.path.isfile(src_file):
        sys.stderr.write('Error: source file does not exist or is not a file\n')
        return 2

    try:
        os.mkdir(target_dir)
    except FileExistsError:
        pass  # already present -- fine
    except OSError as ex:
        sys.stderr.write(f'Cannot create target dir: {ex.strerror}\n')
        return 3

    try:
        return split(src_file, target_dir, manifest_file, chunk_size=chunk_size)
    except ValueError as ex:
        sys.stderr.write(f'Error: {ex}\n')
        return 4
def main_merge(manifest_file: str, target_dir: str = None) -> int:
    """Check the merge arguments, then hand off to merge()."""
    # default the target to the directory the manifest lives in
    resolved_dir = target_dir or os.path.split(os.path.abspath(manifest_file))[0]

    for line in (f'manifest: {manifest_file}', f'target dir: {resolved_dir}', ''):
        print(line)

    if not os.path.isfile(manifest_file):
        sys.stderr.write('Error: manifest file not found\n')
        return 1
    if not os.path.isdir(resolved_dir):
        sys.stderr.write('Error: target directory is missing\n')
        return 1

    return merge(manifest_file, resolved_dir)
def main_verify(manifest_file: str, target_dir: str = None) -> int:
    """Check the verify arguments, then hand off to verify()."""
    # default the target to the directory the manifest lives in
    resolved_dir = target_dir or os.path.split(os.path.abspath(manifest_file))[0]

    for line in (f'manifest: {manifest_file}', f'target dir: {resolved_dir}', ''):
        print(line)

    if not os.path.isfile(manifest_file):
        sys.stderr.write('Error: manifest file not found\n')
        return 1
    if not os.path.isdir(resolved_dir):
        sys.stderr.write('Error: target directory is missing\n')
        return 1

    return verify(manifest_file)
# --- FUNCTIONS ------------------------------------------------------------- # | |
def print_usage() -> int:
    """Write command-line usage to stderr; returns 1 for use as an exit code."""
    prog = sys.argv[0]
    sys.stderr.writelines([
        f'Syntax: {prog} split <src-file> <target-dir>\n',
        f' {prog} merge <manifest-file> [target-dir]\n',
        f' {prog} verify <manifest-file>\n',
        '\n',
    ])
    return 1
def split(src_file: str, target_dir: str, manifest_file: str, *, chunk_size: int = DEFAULT_CHUNK_SIZE) -> int:
    """
    Split *src_file* into numbered chunk files under *target_dir* and write
    a manifest describing them.

    Each chunk file is ChunkHeader.HEADER_SIZE header bytes followed by up
    to (chunk_size - HEADER_SIZE) payload bytes.

    :raises ValueError: if src_file is not a file, or chunk_size leaves no
        room for the header
    :raises RuntimeError: on an internal consistency failure while writing
    """
    src_filename = os.path.basename(src_file)
    manifest_filename = os.path.basename(manifest_file)
    print(f'--- BEGIN SPLIT: {src_filename} -> {target_dir} [manifest: {manifest_filename}]')

    print(' * checking request sanity')
    if not os.path.isfile(src_file):
        raise ValueError(f'not a file: {src_file}')

    effective_chunk_size = chunk_size - ChunkHeader.HEADER_SIZE
    if effective_chunk_size <= 0:
        raise ValueError(f'chunk size ({chunk_size}) is too low (not enough space for header)')

    print(' * preparing...')
    file_size = os.path.getsize(src_file)
    print(f' > file size: {file_size} bytes')

    # BUG FIX: calculate_num_chunks subtracts the header size itself, so it
    # must receive the FULL chunk size; the original passed
    # effective_chunk_size, subtracting the header twice and recording a
    # wildly inflated num_chunks in the manifest.
    num_chunks = calculate_num_chunks(file_size, chunk_size)
    num_chunk_digits = len(str(num_chunks))
    print(f' > # chunks: {num_chunks}')

    def chunk_path(n: int) -> str:
        # 1-based, zero-padded chunk filename: <name>.x01, <name>.x02, ...
        x_value = str(n).zfill(num_chunk_digits)
        return os.path.join(target_dir, f'{src_filename}.x{x_value}')

    def bucket_sizes(total_size: int, bucket_size: int) -> Iterator[int]:
        # Payload size of each successive chunk.
        num_full, remaining_size = divmod(total_size, bucket_size)
        yield from (bucket_size for _ in range(num_full))
        # BUG FIX: only yield the remainder when non-zero; the original
        # always yielded it, producing a spurious empty trailing chunk
        # whenever total_size was an exact multiple of bucket_size.
        if remaining_size:
            yield remaining_size

    manifest = Manifest(src_filename, file_size, num_chunks)

    print(' * opening source file...')
    with open(src_file, 'rb') as f_input:
        h_manifest = hashlib.md5()  # cumulative hash over the whole file
        print(' = writing chunks:')
        for i, current_effective_size in enumerate(bucket_sizes(file_size, effective_chunk_size)):
            print(f' > chunk {i + 1} / {num_chunks}')
            h_chunk = hashlib.md5()
            with open(chunk_path(i + 1), 'wb') as f_output:
                # Write a placeholder header (zero hash) first, then the
                # payload, then seek back and rewrite with the real digest.
                header = ChunkHeader(i)
                header.write(f_output)
                h_chunk.update(bytes(header))  # include header in chunk hash

                pos = 0
                while pos < current_effective_size:
                    buffer = f_input.read(min(BUFFER_SIZE, current_effective_size - pos))
                    if len(buffer) == 0:
                        raise RuntimeError(f'buffer is empty (pos={pos}, eff_sz={current_effective_size})')
                    f_output.write(buffer)
                    h_chunk.update(buffer)
                    h_manifest.update(buffer)
                    pos += len(buffer)

                # verify chunk written in its entirety
                # (message fix: the expected position includes the header)
                expected_pos = current_effective_size + ChunkHeader.HEADER_SIZE
                if f_output.tell() != expected_pos:
                    p = f_output.tell()
                    raise RuntimeError(f'unexpected output file position: {p} != {expected_pos}')

                # rewrite the header with the real digest
                header.chunk_hash = h_chunk.digest()
                f_output.seek(0)
                header.write(f_output)

            # done writing chunk
            manifest.add_cumulative_hash(h_manifest.hexdigest())

        print(' ! all chunks written')
        print()

        print(' * checking input file position')
        if f_input.tell() != file_size:
            p = f_input.tell()
            raise RuntimeError(f'unexpected input file position: {p} != {file_size}')

    print(' * writing manifest...')
    manifest.file_hash = h_manifest.hexdigest()
    with open(manifest_file, 'w') as f_manifest:
        manifest.write(f_manifest)
    print(' ! manifest written')
    print()

    print('ALL DONE')
    return 0
def verify(manifest_file: str) -> int:
    """
    Verify chunk files against *manifest_file* without writing anything.

    Per chunk: header magic/version, chunk index, per-chunk MD5, and the
    cumulative MD5 recorded in the manifest; finally the whole-file MD5.

    Returns 1 when the final file hash matches the manifest, 0 otherwise.
    NOTE(review): 1-on-success is inverted relative to process-exit
    conventions, but main() forwards this value as the exit status, so it
    is preserved as-is.

    :raises ValueError: if manifest_file is not a file
    """
    target_dir, manifest_filename = os.path.split(os.path.abspath(manifest_file))
    print(f'--- VERIFY: {manifest_file}')
    print(f' * target dir: {target_dir}')
    if not os.path.isfile(manifest_file):
        raise ValueError(f'not a file: {manifest_file}')

    manifest = Manifest.load(manifest_file)
    num_chunks = manifest.num_chunks
    num_cumulative_hashes = len(manifest.cumulative_hashes)
    chunk_count_len = len(str(num_chunks))

    def get_filename(chunk_num: int) -> str:
        # chunk files live next to the manifest: <name>.x<zero-padded n>
        chunk_str = str(chunk_num).zfill(chunk_count_len)
        return os.path.join(target_dir, f'{manifest.filename}.x{chunk_str}')

    print(' ~ validating chunks:')
    h_full = hashlib.md5()  # cumulative hash across all chunk payloads
    for i in range(manifest.num_chunks):
        issues = []
        cum_hash = 'n/a'
        chunk_file = get_filename(i + 1)
        chunk_filename = os.path.basename(chunk_file)
        chunk_file_size = os.path.getsize(chunk_file)
        chunk_effective_size = chunk_file_size - ChunkHeader.HEADER_SIZE
        try:
            if chunk_effective_size <= 0:
                issues.append(f'FILE TOO SMALL: sz={chunk_file_size} eff_sz={chunk_effective_size}')
                continue  # the finally block still reports this chunk

            with open(chunk_file, 'rb') as f:
                h_chunk = hashlib.md5()
                h_chunk_expected_bytes = None
                try:
                    header = ChunkHeader.read_from(f)
                except Exception as ex:
                    issues.append(f'HEADER FAIL: {ex}')
                else:
                    if header.chunk_index != i:
                        issues.append(f'CHUNK INDEX MISMATCH: idx={header.chunk_index} i={i}')
                    # the recorded digest covers header-with-zero-hash + payload
                    h_chunk_expected_bytes = header.chunk_hash
                    header.chunk_hash = ZERO_HASH
                    h_chunk.update(header.serialize())

                # read data itself
                f.seek(ChunkHeader.HEADER_SIZE)
                pos = f.tell()
                while pos < chunk_file_size:
                    buffer = f.read(min(BUFFER_SIZE, chunk_file_size - pos))
                    if len(buffer) == 0:
                        issues.append(f'EMPTY BUFFER: pos={pos} / {chunk_file_size}')
                        # BUG FIX: abort the read loop; the original only
                        # recorded the issue and spun forever on truncation
                        break
                    h_full.update(buffer)
                    h_chunk.update(buffer)
                    pos += len(buffer)

            # check integrity hash for this chunk
            if h_chunk_expected_bytes != h_chunk.digest():
                exp = repr(h_chunk_expected_bytes)
                act = repr(h_chunk.digest())
                issues.append(f'FILE INTEGRITY FAILED: expected={exp} actual={act}')

            # check cumulative hash for this chunk
            cum_hash = manifest.cumulative_hashes[i] if i < num_cumulative_hashes else None
            if cum_hash:
                if cum_hash != h_full.hexdigest():
                    issues.append(f'C-HASH MISMATCH: expected={cum_hash} actual={h_full.hexdigest()}')
            else:
                issues.append(f'NO CUMULATIVE HASH ({num_cumulative_hashes} c-hashes)')
        finally:
            # per-chunk status report, printed even when a check bailed early
            status = 'FAIL' if issues else ' OK '
            print(f' > [{status}] chunk {i + 1} / {num_chunks} ({chunk_filename}) [{cum_hash}]')
            for issue in issues:
                print(f' * {issue}')

    print(' ~ checking file md5:')
    print(f' > expected: {manifest.file_hash}')
    print(f' > final: {h_full.hexdigest()}')
    verdict = 'PASS' if manifest.file_hash == h_full.hexdigest() else 'FAIL'
    print(f' > verdict: {verdict}')
    return int(verdict == 'PASS')
def merge(manifest_file: str, target_dir: str = None) -> int:
    """
    Reassemble the original file from its chunks, per *manifest_file*.

    Chunks are read from the manifest's directory; the merged file is
    written to *target_dir* (default: the manifest's directory). Each
    chunk's header, index, per-chunk MD5 and cumulative MD5 are checked
    while merging; the first failing chunk aborts with return code 2.

    Returns 1 when the final file hash matches the manifest, 0 when it
    does not, 2 on a chunk-level failure.
    NOTE(review): 1-on-success is inverted relative to process-exit
    conventions, but main() forwards this value as the exit status, so it
    is preserved as-is.

    :raises ValueError: if manifest_file is not a file
    """
    source_dir, manifest_filename = os.path.split(os.path.abspath(manifest_file))
    if not target_dir:
        target_dir = source_dir

    print(f'--- MERGE: {manifest_file}')
    print(f' * source dir: {source_dir}')
    print(f' * target dir: {target_dir}')
    if not os.path.isfile(manifest_file):
        raise ValueError(f'not a file: {manifest_file}')

    manifest = Manifest.load(manifest_file)
    target_file = os.path.join(target_dir, manifest.filename)
    print(f' * target file: {target_file}')
    print()

    num_chunks = manifest.num_chunks
    num_cumulative_hashes = len(manifest.cumulative_hashes)
    chunk_count_len = len(str(num_chunks))

    def get_filename(chunk_num: int) -> str:
        # chunk files live next to the manifest: <name>.x<zero-padded n>
        chunk_str = str(chunk_num).zfill(chunk_count_len)
        return os.path.join(source_dir, f'{manifest.filename}.x{chunk_str}')

    print(' ~ merging chunks:')
    h_full = hashlib.md5()  # cumulative hash across all chunk payloads
    with open(target_file, 'wb') as f_master:
        for i in range(manifest.num_chunks):
            issues = []
            cum_hash = 'n/a'
            chunk_file = get_filename(i + 1)
            chunk_filename = os.path.basename(chunk_file)
            chunk_file_size = os.path.getsize(chunk_file)
            chunk_effective_size = chunk_file_size - ChunkHeader.HEADER_SIZE
            try:
                if chunk_effective_size <= 0:
                    issues.append(f'FILE TOO SMALL: sz={chunk_file_size} eff_sz={chunk_effective_size}')
                    continue  # finally-block reports and aborts (return 2)

                with open(chunk_file, 'rb') as f:
                    h_chunk = hashlib.md5()
                    try:
                        header = ChunkHeader.read_from(f)
                    except Exception as ex:
                        issues.append(f'HEADER FAIL: {ex}')
                        continue
                    else:
                        if header.chunk_index != i:
                            issues.append(f'CHUNK INDEX MISMATCH: idx={header.chunk_index} i={i}')
                            continue
                        # the recorded digest covers header-with-zero-hash + payload
                        h_chunk_expected_bytes = header.chunk_hash
                        header.chunk_hash = ZERO_HASH
                        h_chunk.update(header.serialize())

                    # transfer data
                    f.seek(ChunkHeader.HEADER_SIZE)
                    pos = f.tell()
                    while pos < chunk_file_size:
                        buffer = f.read(min(BUFFER_SIZE, chunk_file_size - pos))
                        if len(buffer) == 0:
                            issues.append(f'EMPTY BUFFER: pos={pos} / {chunk_file_size}')
                            # BUG FIX: abort the copy loop; the original only
                            # recorded the issue and spun forever on truncation
                            break
                        try:
                            f_master.write(buffer)
                        except IOError as ex:
                            issues.append(f'I/O ERROR: {ex.strerror}')
                            raise
                        h_full.update(buffer)
                        h_chunk.update(buffer)
                        pos += len(buffer)

                # check integrity hash for this chunk
                if h_chunk_expected_bytes != h_chunk.digest():
                    exp = repr(h_chunk_expected_bytes)
                    act = repr(h_chunk.digest())
                    issues.append(f'FILE INTEGRITY FAILED: expected={exp} actual={act}')

                # check cumulative hash for this chunk
                cum_hash = manifest.cumulative_hashes[i] if i < num_cumulative_hashes else None
                if cum_hash:
                    if cum_hash != h_full.hexdigest():
                        issues.append(f'C-HASH MISMATCH: expected={cum_hash} actual={h_full.hexdigest()}')
                else:
                    issues.append(f'NO CUMULATIVE HASH ({num_cumulative_hashes} c-hashes)')
            finally:
                status = 'FAIL' if issues else ' OK '
                print(f' > [{status}] chunk {i + 1} / {num_chunks} ({chunk_filename}) [{cum_hash}]')
                for issue in issues:
                    print(f' * {issue}')
                if issues:
                    # NOTE: returning from a finally block swallows any
                    # in-flight exception (e.g. the re-raised IOError);
                    # preserved from the original.
                    return 2

    print(' ~ checking file md5:')
    print(f' > expected: {manifest.file_hash}')
    print(f' > final: {h_full.hexdigest()}')
    verdict = 'PASS' if manifest.file_hash == h_full.hexdigest() else 'FAIL'
    print(f' > verdict: {verdict}')
    return int(verdict == 'PASS')
def calculate_num_chunks(file_size: int, chunk_size: int) -> int:
    """
    Number of chunk files needed to hold *file_size* payload bytes when
    each chunk file is *chunk_size* bytes INCLUDING its ChunkHeader.
    """
    payload_per_chunk = chunk_size - ChunkHeader.HEADER_SIZE
    # ceiling division: any leftover bytes need one extra chunk
    return -(-file_size // payload_per_chunk)
# Script entry point: propagate main()'s return value as the process exit status.
if __name__ == '__main__':
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment