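"""Back up VRChat Unity projects as timestamped zip archives.

Scans a projects directory, zips each project that is marked as a backup
target in a JSON config file, skips Unity-generated directories (Library,
Logs, obj, Temp) and .git, and skips projects whose files have not changed
since the last successful backup.
"""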
import datetime
import json
import os
import time
import zipfile
from concurrent.futures import ProcessPoolExecutor, as_completed

from tqdm import tqdm
class FileSystemTree:
    """In-memory snapshot of a directory tree.

    A trailing "/" on relative_path marks a directory; file nodes carry the
    st_mtime of the underlying file in timestamp.
    """

    def __init__(self, relative_path, absolute_path, children=None, timestamp=None):
        self.relative_path = relative_path
        self.absolute_path = absolute_path
        self.children = children or []
        self.timestamp = timestamp

    @classmethod
    def new_file(cls, relative_path, absolute_path, timestamp):
        assert relative_path and not relative_path.endswith('/')
        return cls(relative_path, absolute_path, None, timestamp)

    @classmethod
    def new_dir(cls, relative_path, absolute_path, children):
        assert not relative_path or relative_path.endswith('/')
        return cls(relative_path, absolute_path, children)

    def is_dir(self):
        return not self.relative_path or self.relative_path.endswith('/')

    def recursive(self):
        # Depth-first traversal of every entry below this node (the node
        # itself is not yielded). Uses an explicit stack so a deeply nested
        # tree cannot hit the interpreter recursion limit during traversal.
        return self._recursive([(self, 0)])

    def _recursive(self, stack):
        while stack:
            tree, idx = stack.pop()
            if idx < len(tree.children):
                stack.append((tree, idx + 1))
                new_ent = tree.children[idx]
                stack.append((new_ent, 0))
                yield new_ent

    def count_all(self):
        return sum(1 for _ in self.recursive())

    def max_timestamp(self):
        # Newest mtime anywhere in the tree; the default guards against a
        # project that contains no files at all.
        return max(
            (entry.timestamp for entry in self.recursive() if entry.timestamp is not None),
            default=0.0,
        )
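# A minimal, hypothetical example of building and walking a tree by hand
# (paths and timestamps are made up):
#
#   root = FileSystemTree.new_dir("", "/projects/Demo", [
#       FileSystemTree.new_file("Assets/Main.unity",
#                               "/projects/Demo/Assets/Main.unity",
#                               1700000000.0),
#   ])
#   assert root.count_all() == 1
#   assert root.max_timestamp() == 1700000000.0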
class BackupConfig:
    """Maps project directory names to a boolean backup-target flag,
    persisted as JSON in config_file."""

    def __init__(self, project_base_path, backup_base_path, config_file):
        self.project_base_path = project_base_path
        self.backup_base_path = backup_base_path
        self.config_file = config_file
        if not os.path.exists(self.config_file):
            self.init()
        else:
            self.fill_new_projects()

    def init(self):
        # Create the config file with every project disabled by default.
        if os.path.exists(self.config_file):
            print(f"Config file already exists: {self.config_file}")
            return
        projects = self.__get_projects()
        project_configs = {project: False for project in projects}
        with open(self.config_file, "w", encoding="utf-8") as f:
            json.dump(project_configs, f, indent=2)

    def fill_new_projects(self):
        # Add projects that appeared since the config file was last written.
        with open(self.config_file, "r", encoding="utf-8") as f:
            project_configs = json.load(f)
        for project in self.__get_projects():
            if project not in project_configs:
                project_configs[project] = False
        with open(self.config_file, "w", encoding="utf-8") as f:
            json.dump(project_configs, f, indent=2)

    def get_target_projects(self):
        with open(self.config_file, "r", encoding="utf-8") as f:
            project_configs = json.load(f)
        return [project for project, target in project_configs.items() if target]

    def get_configs(self, backup_datetime, compression):
        # Build one job description per target project.
        target_projects = self.get_target_projects()
        backup_datetime_str = backup_datetime.strftime("%Y-%m-%dT%H-%M-%S")
        results = []
        for project in target_projects:
            results.append({
                "project_path": os.path.join(self.project_base_path, project),
                "output_path": os.path.join(
                    self.backup_base_path, f"{project}-{backup_datetime_str}.zip"),
                "compression": compression,
            })
        return results

    def __get_projects(self):
        # Every directory directly under the project base path is a project.
        return [entry.name for entry in os.scandir(self.project_base_path) if entry.is_dir()]
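# backup_config.json ends up looking like this (project names here are
# examples, not from the actual tool):
#   {
#     "MyAvatarProject": true,
#     "ScratchWorld": false
#   }
# Only projects mapped to true are backed up.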
class BackupLastTimestamp:
    """Persists the newest file mtime seen per project, so unchanged
    projects can be skipped on the next run."""

    def __init__(self, last_timestamp_file):
        self.last_timestamp_file = last_timestamp_file
        if not os.path.exists(self.last_timestamp_file):
            self.init()

    def init(self):
        with open(self.last_timestamp_file, "w", encoding="utf-8") as f:
            json.dump({}, f, indent=2)

    def get_last_timestamp(self, project_name):
        with open(self.last_timestamp_file, "r", encoding="utf-8") as f:
            last_timestamps = json.load(f)
        return last_timestamps.get(project_name)

    def set_last_timestamp(self, project_name, timestamp):
        with open(self.last_timestamp_file, "r", encoding="utf-8") as f:
            last_timestamps = json.load(f)
        last_timestamps[project_name] = timestamp
        with open(self.last_timestamp_file, "w", encoding="utf-8") as f:
            json.dump(last_timestamps, f, indent=2)
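# The timestamp file maps project names to the newest st_mtime seen, e.g.
# (hypothetical value): { "MyAvatarProject": 1739700000.0 }
# set_last_timestamp is a plain read-modify-write with no locking, so when
# two workers in the process pool below finish at the same moment, one update
# can overwrite the other; the worst case is a redundant backup next run.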
def read_dir_to_tree(relative, absolute):
    # Recursively snapshot a directory, skipping entries the backup should
    # not contain.
    entries = []
    for entry in os.scandir(absolute):
        # Skip entries whose name is empty after dropping characters that
        # cannot be encoded as UTF-8 (e.g. lone surrogates in broken
        # filenames), since such names cannot be stored in the archive.
        if not entry.name.encode('utf-8', 'ignore').decode('utf-8'):
            continue
        if entry.is_symlink():
            continue
        is_dir = entry.is_dir()
        new_relative = f"{relative}{entry.name}/" if is_dir else f"{relative}{entry.name}"
        if is_dir:
            lower_name = entry.name.lower()
            # At the project root, skip Unity-generated directories that can
            # be regenerated and dominate the project size.
            if not relative:
                if lower_name in ["library", "logs", "obj", "temp"] or lower_name.startswith("library"):
                    continue
            # Skip .git at any depth.
            if lower_name == ".git":
                continue
        timestamp = entry.stat().st_mtime
        entries.append((new_relative, entry, is_dir, timestamp))
    children = [
        read_dir_to_tree(rel, ent.path) if d else FileSystemTree.new_file(rel, ent.path, ts)
        for rel, ent, d, ts in entries
    ]
    return FileSystemTree.new_dir(relative, absolute, children)


def collect_notable_project_files_tree(path_buf):
    # Entry point: start with an empty relative prefix at the project root.
    return read_dir_to_tree("", path_buf)
def create_backup_zip(backup_timestamp, project_name, project_path, backup_path, compression):
    # Returns 1 if a backup was written, 0 if the project was unchanged,
    # and -1 if the backup was interrupted.
    try:
        last_timestamp = backup_timestamp.get_last_timestamp(project_name)
        file_tree = collect_notable_project_files_tree(project_path)
        timestamp = file_tree.max_timestamp()
        # Skip the project if nothing changed since the last backup.
        if last_timestamp is not None and timestamp <= last_timestamp:
            return 0
        total_files = file_tree.count_all()
        with zipfile.ZipFile(backup_path, 'w', compression) as zipf:
            with tqdm(total=total_files, desc=f"Creating backup for {project_name}", unit="file") as pbar:
                for entry in file_tree.recursive():
                    if entry.is_dir():
                        # Directory entries carry no data; storing them
                        # uncompressed preserves empty directories.
                        zipf.write(entry.absolute_path, entry.relative_path,
                                   compress_type=zipfile.ZIP_STORED)
                    else:
                        zipf.write(entry.absolute_path, entry.relative_path,
                                   compress_type=compression)
                    pbar.update(1)
        backup_timestamp.set_last_timestamp(project_name, timestamp)
        return 1
    except KeyboardInterrupt:
        # Do not leave a truncated archive behind.
        if os.path.exists(backup_path):
            os.remove(backup_path)
        return -1


def backup_project(backup_timestamp, project_name, project_path, output_path, compression):
    # Unity holds Temp/UnityLockfile while the editor has the project open;
    # wait until the project is closed before reading it.
    lockfile_path = os.path.join(project_path, "Temp", "UnityLockfile")
    if os.path.exists(lockfile_path):
        tqdm.write(f"[{project_name}] UnityLockfile exists. Waiting for it to be removed.")
        wait_for_lockfile(lockfile_path)
    result = create_backup_zip(backup_timestamp, project_name, project_path, output_path, compression)
    return project_name, result


def wait_for_lockfile(lockfile_path):
    # Poll once per second until the lockfile disappears.
    while os.path.exists(lockfile_path):
        time.sleep(1)
def main():
    project_base_path = "N:\\Game\\VRChatProjects"
    backup_base_path = "N:\\Game\\VRChatProjectBackups"
    config_file = "N:\\Game\\VRChatProjectBackupTool\\backup_config.json"
    compression = zipfile.ZIP_BZIP2

    backup_config = BackupConfig(project_base_path, backup_base_path, config_file)
    backup_projects = backup_config.get_target_projects()
    backup_timestamp = BackupLastTimestamp("backup_last_timestamp.json")
    if not backup_projects:
        print("No projects to back up. Please set target projects in the config file.")
        return

    total_projects = len(backup_projects)
    with tqdm(total=total_projects, desc="Backing up projects", unit="project") as pbar:
        # One worker process per project; zipping is CPU-bound under bzip2.
        with ProcessPoolExecutor() as executor:
            futures = []
            for project_name in backup_projects:
                backup_datetime_str = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
                project_path = os.path.join(project_base_path, project_name)
                output_path = os.path.join(backup_base_path, f"{project_name}-{backup_datetime_str}.zip")
                futures.append(executor.submit(
                    backup_project, backup_timestamp, project_name,
                    project_path, output_path, compression))
            for future in as_completed(futures):
                project_name, result = future.result()
                if result == 1:
                    tqdm.write(f"[{project_name}] Backup completed")
                elif result == 0:
                    tqdm.write(f"[{project_name}] No changes detected. Skipping backup")
                else:
                    tqdm.write(f"[{project_name}] Backup cancelled")
                pbar.update(1)
    print("Backup process completed.")


if __name__ == "__main__":
    main()
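# Usage: the first run creates backup_config.json with every project under
# project_base_path mapped to false. Edit the file, set the projects you want
# backed up to true, and run the script again. Adjust the N:\ paths in main()
# to match your own layout.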