Skip to content

Instantly share code, notes, and snippets.

@book000
Last active February 16, 2025 22:41
Show Gist options
  • Save book000/3866b35a0e73d32e78ab7558bf237957 to your computer and use it in GitHub Desktop.
Save book000/3866b35a0e73d32e78ab7558bf237957 to your computer and use it in GitHub Desktop.
import datetime
import json
import os
import time
import zipfile
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed
class FileSystemTree:
    """In-memory snapshot of a directory tree.

    A node is a directory when its ``relative_path`` is empty (the scan
    root) or ends with a slash; otherwise it is a file node carrying the
    file's mtime in ``timestamp``.
    """

    def __init__(self, relative_path, absolute_path, children=None, timestamp=None):
        self.relative_path = relative_path  # path relative to the scan root ("" for the root)
        self.absolute_path = absolute_path  # full on-disk path
        self.children = children or []      # child nodes (always empty for files)
        self.timestamp = timestamp          # st_mtime for files, None for directories

    @classmethod
    def new_file(cls, relative_path, absolute_path, timestamp):
        """Alternate constructor for a file node (path must not end with '/')."""
        assert relative_path and not relative_path.endswith('/')
        return cls(relative_path, absolute_path, None, timestamp)

    @classmethod
    def new_dir(cls, relative_path, absolute_path, children):
        """Alternate constructor for a directory node ("" root or slash-terminated)."""
        assert not relative_path or relative_path.endswith('/')
        return cls(relative_path, absolute_path, children)

    def is_dir(self):
        """True iff this node is a directory (empty or slash-terminated path)."""
        path = self.relative_path
        return (not path) or path.endswith('/')

    def recursive(self):
        """Yield every descendant node (not the root itself) in depth-first preorder."""
        return self._recursive([(self, 0)])

    def _recursive(self, stack):
        # Explicit-stack DFS; each frame is (node, index of the next child to visit).
        while stack:
            node, next_child = stack.pop()
            if next_child >= len(node.children):
                continue  # frame exhausted
            # Re-push the parent aimed at its following child, then descend.
            stack.append((node, next_child + 1))
            child = node.children[next_child]
            stack.append((child, 0))
            yield child

    def count_all(self):
        """Total number of descendant nodes (files and directories)."""
        return sum(1 for _ in self.recursive())

    def max_timestamp(self):
        """Newest mtime among descendant files; raises ValueError if there are none."""
        return max(e.timestamp for e in self.recursive() if e.timestamp is not None)
class BackupConfig:
    """Manages the JSON config mapping project name -> backup-enabled flag."""

    def __init__(self, project_base_path, backup_base_path, config_file):
        self.project_base_path = project_base_path  # directory containing the projects
        self.backup_base_path = backup_base_path    # directory receiving the .zip archives
        self.config_file = config_file              # JSON file: {project_name: bool}
        if not os.path.exists(self.config_file):
            self.init()
        else:
            self.fill_new_projects()

    def init(self):
        """Create the config file listing every project, all disabled by default."""
        if os.path.exists(self.config_file):
            print(f"Config file already exists: {self.config_file}")
            return False
        projects = self.__get_projects()
        project_configs = {project: False for project in projects}
        with open(self.config_file, "w", encoding="utf-8") as f:
            f.write(json.dumps(project_configs, indent=2))

    def fill_new_projects(self):
        """Add newly discovered projects to the config (disabled by default)."""
        with open(self.config_file, "r", encoding="utf-8") as f:
            project_configs = json.loads(f.read())
        for project in self.__get_projects():
            project_configs.setdefault(project, False)
        with open(self.config_file, "w", encoding="utf-8") as f:
            f.write(json.dumps(project_configs, indent=2))

    def get_target_projects(self):
        """Return the names of projects whose config flag is truthy."""
        with open(self.config_file, "r", encoding="utf-8") as f:
            project_configs = json.loads(f.read())
        return [project for project, target in project_configs.items() if target]

    def get_configs(self, backup_datetime, compression):
        """Build one backup-job dict per enabled project.

        Each dict has "project_path", a timestamped "output_path", and
        "compression". BUG FIX: the original built the list but never
        returned it (implicitly returned None).
        """
        backup_datetime_str = backup_datetime.strftime("%Y-%m-%dT%H-%M-%S")
        return [
            {
                "project_path": os.path.join(self.project_base_path, project),
                "output_path": os.path.join(
                    self.backup_base_path, f"{project}-{backup_datetime_str}.zip"
                ),
                "compression": compression,
            }
            for project in self.get_target_projects()
        ]

    def __get_projects(self):
        """List the immediate subdirectory names of the project base path."""
        return [entry.name for entry in os.scandir(self.project_base_path) if entry.is_dir()]
class BackupLastTimestamp:
    """Persists the newest-mtime high-water mark per project in a JSON file."""

    def __init__(self, last_timestamp_file):
        self.last_timestamp_file = last_timestamp_file  # JSON: {project_name: mtime}
        if not os.path.exists(self.last_timestamp_file):
            self.init()

    def init(self):
        """Create an empty timestamp store."""
        # FIX: opens now specify encoding="utf-8" like the rest of the file's
        # JSON I/O, so non-ASCII project names round-trip on any platform.
        with open(self.last_timestamp_file, "w", encoding="utf-8") as f:
            json.dump({}, f, indent=2)

    def get_last_timestamp(self, project_name):
        """Return the stored mtime for *project_name*, or None if never backed up."""
        with open(self.last_timestamp_file, "r", encoding="utf-8") as f:
            last_timestamps = json.load(f)
        return last_timestamps.get(project_name, None)

    def set_last_timestamp(self, project_name, timestamp):
        """Record *timestamp* for *project_name* (read-modify-write of the file)."""
        with open(self.last_timestamp_file, "r", encoding="utf-8") as f:
            last_timestamps = json.load(f)
        last_timestamps[project_name] = timestamp
        with open(self.last_timestamp_file, "w", encoding="utf-8") as f:
            json.dump(last_timestamps, f, indent=2)
def read_dir_to_tree(relative, absolute):
    """Recursively scan *absolute* into a FileSystemTree.

    *relative* is the slash-terminated path of this directory relative to the
    scan root ("" for the root itself). Symlinks are skipped everywhere;
    Unity cache folders (library*, logs, obj, temp) are skipped only at the
    top level, and `.git` directories are skipped at any depth.
    """
    collected = []
    for entry in os.scandir(absolute):
        # Drop names that become empty once non-UTF-8-encodable chars are stripped.
        if not entry.name.encode('utf-8', 'ignore').decode('utf-8'):
            continue
        if entry.is_symlink():
            continue
        entry_is_dir = entry.is_dir()
        child_relative = relative + entry.name + ('/' if entry_is_dir else '')
        if entry_is_dir:
            folded = entry.name.lower()
            # Unity-generated folders are excluded only at the project root.
            if not relative:
                if folded in ["library", "logs", "obj", "temp"] or folded.startswith("library"):
                    continue
            if folded == ".git":
                continue
        collected.append((child_relative, entry, entry_is_dir, entry.stat().st_mtime))
    children = []
    for child_relative, entry, entry_is_dir, mtime in collected:
        if entry_is_dir:
            children.append(read_dir_to_tree(child_relative, entry.path))
        else:
            children.append(FileSystemTree.new_file(child_relative, entry.path, mtime))
    return FileSystemTree.new_dir(relative, absolute, children)
def collect_notable_project_files_tree(path_buf):
    """Scan the project at *path_buf* and return its FileSystemTree (root path is "")."""
    return read_dir_to_tree(relative="", absolute=path_buf)
def create_backup_zip(backup_timestamp, project_name, project_path, backup_path, compression):
    """Zip *project_path* into *backup_path* unless nothing changed since last time.

    Returns 1 when an archive was written, 0 when the tree's newest mtime is
    not newer than the recorded last backup (no archive written), and -1 when
    interrupted by Ctrl-C (a partially written archive is removed).

    FIX: removed the redundant pbar.close()/zipf.close() calls — both objects
    are already closed by their `with` blocks — and collapsed the duplicated
    zipf.write branches into one call with a computed compress_type.
    """
    try:
        last_timestamp = backup_timestamp.get_last_timestamp(project_name)
        file_tree = collect_notable_project_files_tree(project_path)
        timestamp = file_tree.max_timestamp()
        if last_timestamp is not None and timestamp <= last_timestamp:
            return 0  # nothing modified since the previous backup
        total_files = file_tree.count_all()
        with zipfile.ZipFile(backup_path, 'w', compression) as zipf:
            with tqdm(total=total_files, desc=f"Creating backup for {project_name}", unit="file") as pbar:
                for entry in file_tree.recursive():
                    # Directory entries carry no data, so store them uncompressed.
                    compress_type = zipfile.ZIP_STORED if entry.is_dir() else compression
                    zipf.write(entry.absolute_path, entry.relative_path, compress_type=compress_type)
                    pbar.update(1)
        # Record the high-water mark only after the archive is fully written.
        backup_timestamp.set_last_timestamp(project_name, timestamp)
        return 1
    except KeyboardInterrupt:
        # Ctrl-C mid-write: don't leave a truncated archive behind.
        if os.path.exists(backup_path):
            os.remove(backup_path)
        return -1
def backup_project(backup_timestamp, project_name, project_path, output_path, compression):
    """Back up one project, first waiting out Unity's lockfile if present.

    Returns a (project_name, result_code) pair; see create_backup_zip for the codes.
    """
    lockfile_path = os.path.join(project_path, "Temp", "UnityLockfile")
    if os.path.exists(lockfile_path):
        # A live UnityLockfile means the editor has the project open.
        tqdm.write(f"[{project_name}] UnityLockfile exists. Waiting for it to be removed.")
        wait_for_lockfile(lockfile_path)
    return project_name, create_backup_zip(
        backup_timestamp, project_name, project_path, output_path, compression
    )
def wait_for_lockfile(lockfile_path):
    """Block, polling once per second, until *lockfile_path* no longer exists."""
    while True:
        if not os.path.exists(lockfile_path):
            return
        time.sleep(1)
def main():
    """Back up every enabled VRChat project in parallel.

    Reads the enabled-project list from the JSON config, fans one backup job
    per project out to a process pool, and reports progress with a tqdm bar
    as jobs complete.
    """
    # NOTE(review): paths are hard-coded for this machine's N: drive layout.
    project_base_path = "N:\\Game\\VRChatProjects"
    backup_base_path = "N:\\Game\\VRChatProjectBackups"
    config_file = "N:\\Game\\VRChatProjectBackupTool\\backup_config.json"
    compression = zipfile.ZIP_BZIP2
    backup_config = BackupConfig(project_base_path, backup_base_path, config_file)
    backup_projects = backup_config.get_target_projects()
    # Timestamp store passed to every worker; each worker re-reads/re-writes
    # the JSON file itself. NOTE(review): concurrent set_last_timestamp calls
    # from different processes can race (read-modify-write) — confirm acceptable.
    backup_timestamp = BackupLastTimestamp("backup_last_timestamp.json")
    if not backup_projects:
        print("No projects to backup. Please set target projects in the config file.")
        return
    total_projects = len(backup_projects)
    with tqdm(total=total_projects, desc="Backing up projects", unit="project") as pbar:
        with ProcessPoolExecutor() as executor:
            futures = []
            for project_name in backup_projects:
                # Timestamp is taken per project at submission time, so archive
                # names submitted in the same run can differ by a few seconds.
                backup_datetime_str = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
                project_path = os.path.join(project_base_path, project_name)
                output_path = os.path.join(backup_base_path, f"{project_name}-{backup_datetime_str}.zip")
                futures.append(executor.submit(backup_project, backup_timestamp, project_name, project_path, output_path, compression))
            for future in as_completed(futures):
                # Result codes from create_backup_zip:
                # 1 = archive written, 0 = unchanged, anything else = interrupted.
                project_name, result = future.result()
                if result == 1:
                    tqdm.write(f"[{project_name}] Backup completed")
                elif result == 0:
                    tqdm.write(f"[{project_name}] No changes detected. Skipping backup")
                else:
                    tqdm.write(f"[{project_name}] Backup cancelled")
                pbar.update(1)
    print("Backup process completed.")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment