Last active
January 5, 2026 11:19
-
-
Save lypwig/166d6c861282d1452a4dc2edcff91038 to your computer and use it in GitHub Desktop.
FreeCAD File Flattener - flattens FreeCAD FCStd files into a versionable text format (.FCStdf), and back.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import zipfile | |
| import os | |
| import argparse | |
| import hashlib | |
| import base64 | |
| import sys | |
| import tempfile | |
| import fnmatch | |
| import time | |
| from datetime import datetime | |
# Default glob patterns (matched with fnmatch against archive-relative paths)
# for members that are left out of the flattened output.
DEFAULT_EXCLUDES = ["*.brp", "thumbnails/*"]
# Name of the index file written into the restore folder; it lists the
# restored members, one per line, in the order they appear in the stream.
ORDER_FILENAME = "_index"
| # ---------------------------- | |
| # Utils | |
| # ---------------------------- | |
def calculate_checksum(file_path):
    """Return the hexadecimal SHA-256 digest of the file at *file_path*.

    The file is consumed in 8 KiB blocks, so it is never held in memory
    as a whole.
    """
    digest = hashlib.sha256()
    with open(file_path, 'rb') as stream:
        # iter() with a sentinel stops at the first empty read (end of file).
        for block in iter(lambda: stream.read(8192), b""):
            digest.update(block)
    return digest.hexdigest()
def encode_binary(file_path):
    """Read the file at *file_path* and return its content as Base64 bytes."""
    with open(file_path, 'rb') as stream:
        return base64.b64encode(stream.read())
def decode_binary(data_bytes):
    """Decode Base64 *data_bytes* back into the raw bytes.

    Decoding is strict: characters outside the Base64 alphabet raise
    ``binascii.Error`` instead of being silently discarded.
    """
    raw = base64.b64decode(data_bytes, validate=True)
    return raw
def is_text_file(file_path):
    """Tell whether the file at *file_path* can be read entirely as UTF-8.

    Returns ``True`` when the whole file decodes, and ``False`` on any
    failure (undecodable bytes, but also a missing or unreadable file).
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            handle.read()
    except Exception:
        return False
    return True
def should_exclude(path, patterns):
    """Return ``True`` when *path* matches at least one fnmatch pattern."""
    for pattern in patterns:
        if fnmatch.fnmatch(path, pattern):
            return True
    return False
| # ---------------------------- | |
| # Stream save/restore | |
| # ---------------------------- | |
def save_folder_to_stream(source_folder, out_stream, excludes):
    """Serialize every non-excluded file under *source_folder* to *out_stream*.

    One record is written per file: a metadata line
    ``<relative path>|<Text or Binary>|<payload size>|<sha256 of the file>``
    terminated by a newline, then the payload, then two newlines.

    Files whose bytes are valid UTF-8 are stored verbatim ("Text"); all
    other files are stored Base64-encoded ("Binary").

    Args:
        source_folder: Directory that is walked recursively.
        out_stream: Binary writable stream receiving the records.
        excludes: fnmatch patterns; files whose relative path matches any
            of them are left out.
    """
    for root, _, files_in_dir in os.walk(source_folder):
        for file in files_in_dir:
            file_full_path = os.path.join(root, file)
            # Always record forward slashes so the flattened file does not
            # depend on the platform it was produced on.
            rel_path = os.path.relpath(file_full_path, source_folder).replace(os.sep, "/")
            if any(fnmatch.fnmatch(rel_path, pat) for pat in excludes):
                continue
            # Read the raw bytes exactly once. A text-mode read would rewrite
            # CR/LF line endings, so the stored payload would no longer match
            # the checksum of the file it came from.
            with open(file_full_path, 'rb') as f:
                raw = f.read()
            checksum = hashlib.sha256(raw).hexdigest()
            try:
                raw.decode('utf-8')
            except UnicodeDecodeError:
                ftype, data_bytes = "Binary", base64.b64encode(raw)
            else:
                ftype, data_bytes = "Text", raw
            size = len(data_bytes)
            meta_line = f"{rel_path}|{ftype}|{size}|{checksum}\n".encode("utf-8")
            out_stream.write(meta_line)
            out_stream.write(data_bytes)
            out_stream.write(b"\n\n")
def restore_folder_from_stream(in_stream, output_folder):
    """Recreate the files described by a flattened stream in *output_folder*.

    The stream must start with a ``#FCStdChecksum=<hex>`` line, followed by
    records made of a ``path|type|size|checksum`` metadata line, ``size``
    payload bytes and two newlines.

    Args:
        in_stream: Binary readable stream positioned at the header line.
        output_folder: Existing directory that receives the restored files.

    Returns:
        A ``(fcstd_checksum, order_file)`` tuple: the checksum taken from the
        header, and the path of the index file listing the restored relative
        paths (one per line, in stream order).

    Raises:
        ValueError: If the header line is missing.
    """
    first_line = in_stream.readline().decode("utf-8").strip()
    if not first_line.startswith("#FCStdChecksum="):
        raise ValueError("Invalid header: missing #FCStdChecksum")
    fcstd_checksum = first_line.split("=", 1)[1]

    order_file = os.path.join(output_folder, ORDER_FILENAME)
    base_dir = os.path.realpath(output_folder)
    index_path = os.path.realpath(order_file)
    restored_names = []
    restored_files = 0
    total_files = 0

    while True:
        meta_line = in_stream.readline()
        if not meta_line:
            break
        meta_line = meta_line.strip()
        if not meta_line:
            continue
        try:
            rel_path, ftype, size_str, expected_checksum = meta_line.decode("utf-8").split("|")
            size = int(size_str)
            if size < 0:
                # read(-1) would swallow the remainder of the stream.
                raise ValueError("negative size")
        except ValueError:
            print(f"Invalid metadata line: {meta_line}", file=sys.stderr)
            continue
        total_files += 1
        data_bytes = in_stream.read(size)
        in_stream.read(2)  # skip "\n\n"

        # The path comes from the file being restored, so make sure it stays
        # inside the output folder ("../" or absolute paths must not escape
        # it) and does not clobber the index file.
        output_path = os.path.join(output_folder, rel_path)
        target = os.path.realpath(output_path)
        try:
            inside = os.path.commonpath([base_dir, target]) == base_dir
        except ValueError:  # e.g. paths on different drives
            inside = False
        if not inside or target in (base_dir, index_path):
            print(f"Unsafe path skipped: {rel_path}", file=sys.stderr)
            continue

        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        # Text payloads are stored verbatim; write them as bytes so no
        # platform newline translation alters them.
        payload = data_bytes if ftype == "Text" else decode_binary(data_bytes)
        with open(output_path, 'wb') as out:
            out.write(payload)
        restored_names.append(rel_path)

        if calculate_checksum(output_path) == expected_checksum:
            restored_files += 1
        else:
            print(f"Checksum mismatch for {rel_path}", file=sys.stderr)

    # Write the index once instead of reopening it for every record.
    with open(order_file, "w", encoding="utf-8") as of:
        of.writelines(name + "\n" for name in restored_names)

    print(f"Restored {restored_files}/{total_files} files successfully.", file=sys.stderr)
    return fcstd_checksum, order_file
| # ---------------------------- | |
| # Core ops | |
| # ---------------------------- | |
def read_fcstdf_checksum(path):
    """Return the checksum recorded in the header of a .FCStdf file.

    Args:
        path: Path of the flattened file.

    Returns:
        The text after ``#FCStdChecksum=`` on the first line, or ``None``
        when the file cannot be read, its first line is not valid UTF-8, or
        the header is absent.
    """
    try:
        with open(path, "rb") as f:
            first_line = f.readline().decode("utf-8").strip()
    except (OSError, ValueError):
        # Best effort: unreadable or undecodable files simply have no
        # checksum (UnicodeDecodeError is a ValueError). Anything else is a
        # programming error and is allowed to surface.
        return None
    if first_line.startswith("#FCStdChecksum="):
        return first_line.split("=", 1)[1]
    return None
def flat_fcstd(input_fcstd, output_fcstdf, excludes, force=False, dry_run=False):
    """Flatten a .FCStd archive into a .FCStdf text file.

    Args:
        input_fcstd: Path of the .FCStd (zip) file to flatten.
        output_fcstdf: Destination path; when ``None`` the input path with a
            ``.FCStdf`` extension is used.
        excludes: fnmatch patterns of archive members to leave out.
        force: Overwrite an existing, out-of-date output without asking.
        dry_run: Only report what would be done; nothing is written.

    The output is left untouched when its header already records the
    checksum of *input_fcstd*.
    """
    checksum = calculate_checksum(input_fcstd)
    if output_fcstdf is None:
        output_fcstdf = os.path.splitext(input_fcstd)[0] + ".FCStdf"

    if os.path.exists(output_fcstdf):
        if read_fcstdf_checksum(output_fcstdf) == checksum:
            print(f"[skip] {output_fcstdf} already up-to-date")
            return
        if not force:
            try:
                resp = input(f"{output_fcstdf} already exists. Overwrite? [y/N] ")
            except EOFError:
                # No usable stdin (closed or non-interactive): treat it as
                # the default answer "no" rather than dying with a traceback.
                resp = ""
            if resp.strip().lower() != "y":
                print("[skip] Not overwritten")
                return

    if dry_run:
        print(f"[dry-run] Would flatten {input_fcstd} -> {output_fcstdf}")
        return

    with tempfile.TemporaryDirectory() as tmpdir:
        with zipfile.ZipFile(input_fcstd, 'r') as zip_ref:
            zip_ref.extractall(tmpdir)
        with open(output_fcstdf, "wb") as f:
            f.write(f"#FCStdChecksum={checksum}\n".encode("utf-8"))
            save_folder_to_stream(tmpdir, f, excludes)
    print(f"Flattened {input_fcstd} -> {output_fcstdf}")
def zip_fcstdf(input_fcstdf, output_fcstd, store=False, force=False, dry_run=False):
    """Rebuild a .FCStd archive from a flattened .FCStdf file.

    Args:
        input_fcstdf: Path of the flattened file.
        output_fcstd: Destination archive; when ``None`` the input path with
            a ``.FCStd`` extension is used.
        store: Write members uncompressed instead of deflated.
        force: Overwrite an existing, different output without asking.
        dry_run: Only report what would be done; nothing is written.

    Raises:
        ValueError: If *input_fcstdf* has no checksum header.

    The output is left untouched when its SHA-256 already equals the
    checksum recorded in the header of *input_fcstdf*.
    """
    expected_checksum = read_fcstdf_checksum(input_fcstdf)
    if expected_checksum is None:
        raise ValueError("Invalid .FCStdf: missing checksum header")
    if output_fcstd is None:
        output_fcstd = os.path.splitext(input_fcstdf)[0] + ".FCStd"

    if os.path.exists(output_fcstd):
        if calculate_checksum(output_fcstd) == expected_checksum:
            print(f"[skip] {output_fcstd} already up-to-date")
            return
        if not force:
            src_mtime = datetime.fromtimestamp(os.path.getmtime(input_fcstdf))
            dst_mtime = datetime.fromtimestamp(os.path.getmtime(output_fcstd))
            try:
                resp = input(f"Conflict: {output_fcstd} exists (modified {dst_mtime}), "
                             f"new version modified {src_mtime}. Overwrite? [y/N] ")
            except EOFError:
                # No usable stdin (closed or non-interactive): treat it as
                # the default answer "no" rather than dying with a traceback.
                resp = ""
            if resp.strip().lower() != "y":
                print("[skip] Not overwritten")
                return

    if dry_run:
        print(f"[dry-run] Would zip {input_fcstdf} -> {output_fcstd}")
        return

    with tempfile.TemporaryDirectory() as tmpdir:
        with open(input_fcstdf, "rb") as f:
            # The header checksum was already read above; only the index
            # file path is needed here.
            _, order_file = restore_folder_from_stream(f, tmpdir)
        compression = zipfile.ZIP_STORED if store else zipfile.ZIP_DEFLATED
        with open(order_file, "r", encoding="utf-8") as f:
            file_order = [line.strip() for line in f if line.strip()]
        # Members are added in the order recorded by the index file.
        with zipfile.ZipFile(output_fcstd, 'w', compression=compression) as zip_ref:
            for file_name in file_order:
                file_path = os.path.join(tmpdir, file_name)
                if os.path.isfile(file_path):
                    zip_ref.write(file_path, arcname=file_name)
    print(f"Zipped {input_fcstdf} -> {output_fcstd}")
| # ---------------------------- | |
| # Git Hooks | |
| # ---------------------------- | |
def install_hooks(force=False, dry_run=False):
    """Install the pre-commit and post-merge Git hooks in ``.git/hooks``.

    The hooks directory is resolved relative to the current working
    directory. Each hook is a small shell script that runs this script
    (located through ``sys.argv[0]``) with ``flat -r`` or ``zip -r``.

    Args:
        force: Overwrite existing hook files without asking.
        dry_run: Only report what would be installed; nothing is created.
    """
    import shlex  # needed only here, to quote the script path for /bin/sh

    hooks = {
        "pre-commit": "flat -r",
        "post-merge": "zip -r",
    }
    script_path = os.path.abspath(sys.argv[0])
    hooks_dir = os.path.join(".git", "hooks")
    if not dry_run:
        # A dry run must not create anything, not even the directory.
        os.makedirs(hooks_dir, exist_ok=True)

    for hook_name, cmd in hooks.items():
        hook_file = os.path.join(hooks_dir, hook_name)
        if os.path.exists(hook_file) and not force:
            try:
                resp = input(f"{hook_file} already exists. Overwrite? [y/N] ")
            except EOFError:
                # No usable stdin: fall back to the default answer "no".
                resp = ""
            if resp.strip().lower() != "y":
                print(f"[skip] Did not overwrite {hook_file}")
                continue
        if dry_run:
            print(f"[dry-run] Would install {hook_name} hook at {hook_file}")
            continue
        # Hook arguments are deliberately NOT forwarded: "-r" takes an
        # optional directory, so a forwarded argument (Git passes a status
        # flag to post-merge) would be parsed as the directory to process.
        # newline="\n" keeps the shebang line valid on every platform.
        with open(hook_file, "w", encoding="utf-8", newline="\n") as f:
            f.write(f"#!/bin/sh\nexec {shlex.quote(script_path)} {cmd}\n")
        os.chmod(hook_file, 0o755)
        print(f"[install-hooks] Installed {hook_name} hook -> {hook_file}")
| # ---------------------------- | |
| # Watcher | |
| # ---------------------------- | |
def watch_directory(path, interval, excludes, force=False, dry_run=False):
    """Poll *path* forever and flatten every new or modified .FCStd file.

    Args:
        path: Directory that is walked recursively on every poll.
        interval: Seconds to sleep between two polls.
        excludes: fnmatch patterns passed on to ``flat_fcstd``.
        force: Passed on to ``flat_fcstd``.
        dry_run: Passed on to ``flat_fcstd``.

    A file is processed the first time it is seen and again whenever its
    modification time increases. This function never returns.
    """
    seen = {}
    print(f"Watching {path} every {interval}s...")
    while True:
        for root, _, files in os.walk(path):
            for f in files:
                if not f.endswith(".FCStd"):
                    continue
                inp = os.path.join(root, f)
                outp = os.path.splitext(inp)[0] + ".FCStdf"
                try:
                    mtime = os.path.getmtime(inp)
                except OSError:
                    # The file vanished between the listing and the stat.
                    continue
                if inp in seen and seen[inp] >= mtime:
                    continue
                try:
                    flat_fcstd(inp, outp, excludes, force=force, dry_run=dry_run)
                except (OSError, zipfile.BadZipFile) as exc:
                    # A file that is unreadable or still being written must
                    # not stop the watcher; it is retried once its
                    # modification time changes again.
                    print(f"[watch] Could not flatten {inp}: {exc}", file=sys.stderr)
                seen[inp] = mtime
        time.sleep(interval)
| # ---------------------------- | |
| # Main CLI | |
| # ---------------------------- | |
def main():
    """Parse the command line and run the selected sub-command.

    Sub-commands: ``flat``, ``zip``, ``install-hooks`` and ``watch``. Without
    a sub-command, or without an input file in non-recursive ``flat``/``zip``
    mode, the relevant help is printed and the process exits with status 1.
    """

    def add_exclude_option(sub):
        # Shared by the sub-commands that flatten archives.
        sub.add_argument("-e", "--exclude", default=",".join(DEFAULT_EXCLUDES),
                         help="Exclude patterns (comma-separated)")

    def add_force_and_dry_run(sub):
        # Every sub-command accepts these two flags, in this order.
        sub.add_argument("-f", "--force", action="store_true", help="Force overwrite without asking")
        sub.add_argument("--dry-run", action="store_true", help="Show actions without writing")

    def files_with_suffix(top, suffix):
        # Lazily yield the paths below *top* whose file name ends with *suffix*.
        for root, _, names in os.walk(top):
            for name in names:
                if name.endswith(suffix):
                    yield os.path.join(root, name)

    parser = argparse.ArgumentParser(
        description="Flatten/zip FreeCAD FCStd files into versionable text format (.FCStdf)."
    )
    subparsers = parser.add_subparsers(dest="command")

    flat_parser = subparsers.add_parser("flat", help="Convert .FCStd into .FCStdf")
    flat_parser.add_argument("input_fcstd", nargs="?", help="Input .FCStd file")
    flat_parser.add_argument("output_fcstdf", nargs="?", help="Output .FCStdf file")
    flat_parser.add_argument("-r", "--recursive", nargs="?", const=".",
                             help="Process all .FCStd files in directory")
    add_exclude_option(flat_parser)
    add_force_and_dry_run(flat_parser)

    zip_parser = subparsers.add_parser("zip", help="Convert .FCStdf back into .FCStd")
    zip_parser.add_argument("input_fcstdf", nargs="?", help="Input .FCStdf file")
    zip_parser.add_argument("output_fcstd", nargs="?", help="Output .FCStd file")
    zip_parser.add_argument("-r", "--recursive", nargs="?", const=".",
                            help="Process all .FCStdf files in directory")
    add_force_and_dry_run(zip_parser)
    zip_parser.add_argument("-s", "--store", action="store_true", help="Use store mode instead of deflate")

    install_parser = subparsers.add_parser("install-hooks", help="Install Git hooks in .git/hooks/")
    add_force_and_dry_run(install_parser)

    watch_parser = subparsers.add_parser("watch", help="Watch directory and auto-flat .FCStd files")
    watch_parser.add_argument("path", nargs="?", default=".", help="Directory to watch")
    watch_parser.add_argument("-i", "--interval", type=float, default=1.0, help="Polling interval in seconds")
    add_exclude_option(watch_parser)
    add_force_and_dry_run(watch_parser)

    args = parser.parse_args()
    command = args.command
    if not command:
        parser.print_help()
        sys.exit(1)

    # Only some sub-commands define --exclude; the others get an empty list.
    raw_excludes = getattr(args, "exclude", "")
    excludes = [pat for pat in (chunk.strip() for chunk in raw_excludes.split(",")) if pat]

    if command == "flat":
        if args.recursive:
            for inp in files_with_suffix(args.recursive, ".FCStd"):
                outp = os.path.splitext(inp)[0] + ".FCStdf"
                flat_fcstd(inp, outp, excludes, force=args.force, dry_run=args.dry_run)
            return
        if not args.input_fcstd:
            flat_parser.print_help()
            sys.exit(1)
        flat_fcstd(args.input_fcstd, args.output_fcstdf, excludes,
                   force=args.force, dry_run=args.dry_run)
    elif command == "zip":
        if args.recursive:
            for inp in files_with_suffix(args.recursive, ".FCStdf"):
                outp = os.path.splitext(inp)[0] + ".FCStd"
                zip_fcstdf(inp, outp, store=args.store, force=args.force, dry_run=args.dry_run)
            return
        if not args.input_fcstdf:
            zip_parser.print_help()
            sys.exit(1)
        zip_fcstdf(args.input_fcstdf, args.output_fcstd, store=args.store,
                   force=args.force, dry_run=args.dry_run)
    elif command == "install-hooks":
        install_hooks(force=args.force, dry_run=args.dry_run)
    elif command == "watch":
        watch_directory(args.path, args.interval, excludes, force=args.force, dry_run=args.dry_run)


if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import zipfile | |
| import tempfile | |
| import hashlib | |
| import subprocess | |
| import sys | |
| import time | |
| import shutil | |
| import unittest | |
# Absolute path of the CLI script under test. The relative name is resolved
# against the working directory in effect when this module is imported.
SCRIPT = os.path.abspath("fcstd_flatzip.py")  # Adjust if script is named differently
def sha256sum(path: str) -> str:
    """Compute the SHA-256 hex digest of the file at *path*."""
    hasher = hashlib.sha256()
    with open(path, "rb") as stream:
        while True:
            block = stream.read(8192)
            if not block:
                break
            hasher.update(block)
    return hasher.hexdigest()
def make_fake_fcstd(path: str):
    """Write a stand-in .FCStd file: a deflated zip with two XML members and a thumbnail."""
    members = (
        ("Document.xml", "<FreeCADFile>demo</FreeCADFile>"),
        ("GuiDocument.xml", "<GuiDocument>demo</GuiDocument>"),
        ("thumbnails/Thumbnail.png", b"\x89PNG\r\n\x1a\n\x00\x00"),
    )
    with zipfile.ZipFile(path, "w", zipfile.ZIP_DEFLATED) as archive:
        for member_name, payload in members:
            archive.writestr(member_name, payload)
def run_cmd(args, cwd=None, input_data=None, timeout=10):
    """Invoke the CLI script with *args* and return the ``CompletedProcess``.

    stdout and stderr are captured as text; *input_data*, when given, is fed
    to the process on stdin. The call is aborted after *timeout* seconds.
    """
    command = [sys.executable, SCRIPT] + args
    return subprocess.run(
        command,
        cwd=cwd,
        input=input_data,
        capture_output=True,  # same as stdout=PIPE, stderr=PIPE
        text=True,
        timeout=timeout,
    )
class TestFCStdFlatZip(unittest.TestCase):
    """End-to-end tests that drive the CLI script through subprocesses."""

    def setUp(self):
        """Create a temporary working directory holding one fake .FCStd file."""
        self.tmpdir = tempfile.mkdtemp()
        self.fcstd = os.path.join(self.tmpdir, "test.FCStd")
        # Swap only the extension; str.replace would also rewrite any
        # ".FCStd" occurring elsewhere in the path.
        self.fcstdf = os.path.splitext(self.fcstd)[0] + ".FCStdf"
        make_fake_fcstd(self.fcstd)

    def tearDown(self):
        """Remove the temporary working directory."""
        shutil.rmtree(self.tmpdir)

    def _flatten_then_rebuild(self, flat_args=(), zip_args=()):
        """Flatten the fixture, delete it, then rebuild it from the .FCStdf.

        The original archive is removed in between because ``zip`` skips the
        rebuild when an output with the recorded checksum already exists;
        the assertions would otherwise inspect the untouched fixture.
        """
        flat = run_cmd(["flat", self.fcstd, *flat_args])
        self.assertEqual(flat.returncode, 0, flat.stderr)
        self.assertTrue(os.path.exists(self.fcstdf))
        os.remove(self.fcstd)
        rebuilt = run_cmd(["zip", self.fcstdf, *zip_args])
        self.assertEqual(rebuilt.returncode, 0, rebuilt.stderr)
        self.assertTrue(zipfile.is_zipfile(self.fcstd))

    def test_flat_and_zip_roundtrip(self):
        # Thumbnails are dropped by the CLI's default exclude patterns, so
        # they are not expected back. Member contents are compared, not the
        # archive hash, since a rebuilt zip is not byte-identical.
        with zipfile.ZipFile(self.fcstd) as z:
            expected = {
                name: z.read(name)
                for name in z.namelist()
                if not name.startswith("thumbnails/")
            }
        self._flatten_then_rebuild()
        with zipfile.ZipFile(self.fcstd) as z:
            actual = {name: z.read(name) for name in z.namelist()}
        self.assertEqual(actual, expected)

    def test_force_overwrite(self):
        first = run_cmd(["flat", self.fcstd])
        self.assertEqual(first.returncode, 0, first.stderr)
        self.assertTrue(os.path.exists(self.fcstdf))
        # Change the archive so the recorded checksum is stale; otherwise
        # the second run stops at "already up-to-date" and --force is never
        # exercised.
        with zipfile.ZipFile(self.fcstd, "a") as z:
            z.writestr("Extra.xml", "<Extra/>")
        r = run_cmd(["flat", self.fcstd, "--force"])
        self.assertEqual(r.returncode, 0, r.stderr)
        with open(self.fcstdf, "rb") as f:
            header = f.readline().decode("utf-8").strip()
        self.assertEqual(header, "#FCStdChecksum=" + sha256sum(self.fcstd))

    def test_dry_run(self):
        r = run_cmd(["flat", self.fcstd, "--dry-run"])
        self.assertEqual(r.returncode, 0, r.stderr)
        self.assertFalse(os.path.exists(self.fcstdf))

    def test_store_compression(self):
        self._flatten_then_rebuild(zip_args=["--store"])
        with zipfile.ZipFile(self.fcstd) as z:
            infos = z.infolist()
        self.assertTrue(infos)
        for info in infos:
            self.assertEqual(info.compress_type, zipfile.ZIP_STORED)

    def test_exclude(self):
        self._flatten_then_rebuild(flat_args=["--exclude", "thumbnails/*"])
        with zipfile.ZipFile(self.fcstd, "r") as z:
            names = z.namelist()
        self.assertIn("Document.xml", names)
        self.assertFalse(any("thumbnails/" in n for n in names))

    def test_watch(self):
        watch_fcstd = os.path.join(self.tmpdir, "watch.FCStd")
        watch_fcstdf = os.path.splitext(watch_fcstd)[0] + ".FCStdf"
        # The watcher's output is not inspected, so it is discarded rather
        # than piped (unread pipes would have to be drained and closed).
        proc = subprocess.Popen(
            [sys.executable, SCRIPT, "watch", self.tmpdir, "--interval", "1"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        try:
            time.sleep(1.5)  # give the watcher time to start polling
            make_fake_fcstd(watch_fcstd)
            # Poll with a deadline instead of one fixed sleep.
            deadline = time.monotonic() + 10
            while not os.path.exists(watch_fcstdf) and time.monotonic() < deadline:
                time.sleep(0.2)
        finally:
            proc.terminate()
            try:
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()
        self.assertTrue(os.path.exists(watch_fcstdf))

    def test_install_hooks(self):
        git_dir = os.path.join(self.tmpdir, ".git")
        hooks_dir = os.path.join(git_dir, "hooks")
        os.makedirs(hooks_dir)
        r = run_cmd(["install-hooks"], cwd=self.tmpdir)
        self.assertEqual(r.returncode, 0, r.stderr)
        pre = os.path.join(hooks_dir, "pre-commit")
        post = os.path.join(hooks_dir, "post-merge")
        self.assertTrue(os.path.exists(pre))
        self.assertTrue(os.path.exists(post))
        # Check for the script that was actually run, not a hard-coded name.
        with open(pre) as f:
            pre_content = f.read()
        self.assertIn(SCRIPT, pre_content)
        self.assertIn(" flat -r", pre_content)
        with open(post) as f:
            post_content = f.read()
        self.assertIn(SCRIPT, post_content)
        self.assertIn(" zip -r", post_content)


if __name__ == "__main__":
    unittest.main(verbosity=2)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment