Created
September 1, 2025 08:46
-
-
Save hclivess/d15faee93143ed380be65b347dcef4a9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
CheckMK Custom Content Backup & Restore Script | |
============================================== | |
Backs up and restores CheckMK custom configurations, plugins, and scripts. | |
Excludes official CheckMK files to focus on custom content only. | |
""" | |
import os | |
import sys | |
import shutil | |
import tarfile | |
import json | |
import tempfile | |
from datetime import datetime | |
from pathlib import Path | |
from typing import List, Dict, Optional, Tuple | |
import argparse | |
import logging | |
class CMKBackup: | |
def __init__(self, backup_dir: str = "/tmp/cmk_backups", compress: bool = True, verbose: bool = False): | |
""" | |
Initialize the CheckMK backup and restore utility. | |
Args: | |
backup_dir: Directory to store backups | |
compress: Whether to compress the backup archive | |
verbose: Enable verbose logging | |
""" | |
self.backup_dir = Path(backup_dir) | |
self.compress = compress | |
self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
self.backup_name = f"cmk_custom_backup_{self.timestamp}" | |
# Create backup directory if it doesn't exist | |
self.backup_dir.mkdir(parents=True, exist_ok=True) | |
# Setup logging | |
log_level = logging.DEBUG if verbose else logging.INFO | |
logging.basicConfig( | |
level=log_level, | |
format='%(asctime)s - %(levelname)s - %(message)s', | |
handlers=[ | |
logging.StreamHandler(sys.stdout) | |
] | |
) | |
self.logger = logging.getLogger(__name__) | |
def get_cmk_sites(self) -> List[str]: | |
"""Discover CheckMK sites on the system.""" | |
sites = [] | |
omd_root = Path("/omd/sites") | |
if omd_root.exists(): | |
for site_dir in omd_root.iterdir(): | |
if site_dir.is_dir() and (site_dir / "etc/check_mk").exists(): | |
sites.append(site_dir.name) | |
self.logger.info(f"Found CheckMK sites: {sites}") | |
return sites | |
def get_backup_paths(self, sites: List[str]) -> Dict[str, List[str]]: | |
""" | |
Generate the paths to backup for each site. | |
Args: | |
sites: List of CheckMK site names | |
Returns: | |
Dictionary mapping categories to lists of paths | |
""" | |
backup_paths = { | |
"agent_based_plugins": [], | |
"bakery_plugins": [], | |
"web_metrics": [], | |
"web_wato": [], | |
"checkman": [], | |
"legacy_checks": [], | |
"agent_plugins_linux": [], | |
"agent_plugins_windows": [], | |
"custom_agents": [], | |
"snmpwalks": [], | |
"simulations": [], | |
"wato_config": [], | |
"unit_definitions": [], | |
"mibs": [], | |
"compiled_mibs": [] | |
} | |
for site in sites: | |
site_root = f"/omd/sites/{site}" | |
# Agent-based plugins (modern CheckMK) | |
backup_paths["agent_based_plugins"].extend([ | |
f"{site_root}/local/lib/python3/cmk/base/plugins/agent_based/", | |
]) | |
# Bakery plugins (for agent packaging) | |
backup_paths["bakery_plugins"].extend([ | |
f"{site_root}/local/lib/python3/cmk/base/cee/plugins/bakery/", | |
]) | |
# Web interface plugins | |
backup_paths["web_metrics"].extend([ | |
f"{site_root}/local/share/check_mk/web/plugins/metrics/", | |
]) | |
backup_paths["web_wato"].extend([ | |
f"{site_root}/local/share/check_mk/web/plugins/wato/", | |
]) | |
# Check documentation | |
backup_paths["checkman"].extend([ | |
f"{site_root}/local/share/check_mk/checkman/", | |
]) | |
# Legacy check plugins | |
backup_paths["legacy_checks"].extend([ | |
f"{site_root}/local/share/check_mk/checks/", | |
]) | |
# Agent plugins | |
backup_paths["agent_plugins_linux"].extend([ | |
f"{site_root}/local/share/check_mk/agents/", | |
f"{site_root}/local/share/check_mk/agents/plugins/", | |
]) | |
backup_paths["agent_plugins_windows"].extend([ | |
f"{site_root}/local/share/check_mk/agents/windows/plugins/", | |
]) | |
# Custom agent implementations | |
backup_paths["custom_agents"].extend([ | |
f"{site_root}/local/share/check_mk/agents/custom/", | |
]) | |
# SNMP walks and simulations | |
backup_paths["snmpwalks"].extend([ | |
f"{site_root}/var/check_mk/snmpwalks/", | |
]) | |
backup_paths["simulations"].extend([ | |
f"{site_root}/var/check_mk/simulation/", | |
]) | |
# WATO configuration | |
backup_paths["wato_config"].extend([ | |
f"{site_root}/etc/check_mk/conf.d/wato/", | |
]) | |
# Unit definitions (if they exist as custom modifications) | |
unit_file = f"{site_root}/lib/check_mk/gui/plugins/metrics/unit.py" | |
if os.path.exists(unit_file): | |
backup_paths["unit_definitions"].append(unit_file) | |
# MIB files (SNMP Management Information Bases) | |
# Custom MIB files for SNMP device monitoring | |
backup_paths["mibs"].extend([ | |
f"{site_root}/local/share/check_mk/mibs/", # CheckMK MIB directory | |
f"{site_root}/local/share/snmp/mibs/", # Alternative MIB location | |
f"{site_root}/share/check_mk/mibs/", # Sometimes custom MIBs go here | |
]) | |
# Compiled MIBs and SNMP cache (contains processed MIB data) | |
backup_paths["compiled_mibs"].extend([ | |
f"{site_root}/tmp/check_mk/snmp_scan_cache/", # SNMP scan cache | |
f"{site_root}/var/check_mk/snmp_cache/", # SNMP result cache | |
f"{site_root}/var/check_mk/counters/", # SNMP counter cache | |
]) | |
return backup_paths | |
def create_manifest(self, backup_paths: Dict[str, List[str]], | |
backup_root: Path) -> None: | |
"""Create a manifest file describing the backup contents.""" | |
manifest = { | |
"backup_timestamp": self.timestamp, | |
"backup_date": datetime.now().isoformat(), | |
"hostname": os.uname().nodename, | |
"backup_categories": {}, | |
"total_files": 0, | |
"total_size_bytes": 0 | |
} | |
for category, paths in backup_paths.items(): | |
manifest["backup_categories"][category] = { | |
"paths": paths, | |
"files_backed_up": [], | |
"size_bytes": 0 | |
} | |
category_dir = backup_root / category | |
if category_dir.exists(): | |
for root, dirs, files in os.walk(category_dir): | |
for file in files: | |
file_path = Path(root) / file | |
relative_path = file_path.relative_to(category_dir) | |
file_size = file_path.stat().st_size | |
manifest["backup_categories"][category]["files_backed_up"].append({ | |
"path": str(relative_path), | |
"size_bytes": file_size, | |
"modified": datetime.fromtimestamp(file_path.stat().st_mtime).isoformat() | |
}) | |
manifest["backup_categories"][category]["size_bytes"] += file_size | |
manifest["total_files"] += 1 | |
manifest["total_size_bytes"] += file_size | |
# Write manifest | |
manifest_path = backup_root / "backup_manifest.json" | |
with open(manifest_path, 'w') as f: | |
json.dump(manifest, f, indent=2) | |
self.logger.info(f"Backup manifest created: {manifest_path}") | |
def backup_paths(self, paths: List[str], destination: Path) -> int: | |
""" | |
Backup a list of paths to a destination directory. | |
Args: | |
paths: List of source paths to backup | |
destination: Destination directory | |
Returns: | |
Number of files backed up | |
""" | |
files_count = 0 | |
destination.mkdir(parents=True, exist_ok=True) | |
for path in paths: | |
source_path = Path(path) | |
if not source_path.exists(): | |
self.logger.warning(f"Path does not exist: {path}") | |
continue | |
if source_path.is_file(): | |
# Single file | |
dest_file = destination / source_path.name | |
try: | |
shutil.copy2(source_path, dest_file) | |
files_count += 1 | |
self.logger.debug(f"Backed up file: {path}") | |
except Exception as e: | |
self.logger.error(f"Failed to backup file {path}: {e}") | |
elif source_path.is_dir(): | |
# Directory - preserve structure | |
try: | |
# Create relative path structure | |
for root, dirs, files in os.walk(source_path): | |
root_path = Path(root) | |
rel_path = root_path.relative_to(source_path.parent) | |
dest_dir = destination / rel_path | |
dest_dir.mkdir(parents=True, exist_ok=True) | |
for file in files: | |
src_file = root_path / file | |
dst_file = dest_dir / file | |
shutil.copy2(src_file, dst_file) | |
files_count += 1 | |
self.logger.debug(f"Backed up directory: {path} ({files_count} files)") | |
except Exception as e: | |
self.logger.error(f"Failed to backup directory {path}: {e}") | |
return files_count | |
def create_archive(self, source_dir: Path, archive_path: Path) -> bool: | |
"""Create a compressed archive of the backup.""" | |
try: | |
mode = "w:gz" if self.compress else "w" | |
with tarfile.open(archive_path, mode) as tar: | |
tar.add(source_dir, arcname=self.backup_name, recursive=True) | |
self.logger.info(f"Archive created: {archive_path}") | |
return True | |
except Exception as e: | |
self.logger.error(f"Failed to create archive: {e}") | |
return False | |
def list_available_backups(self) -> List[Dict]: | |
"""List all available backups in the backup directory.""" | |
backups = [] | |
for item in self.backup_dir.iterdir(): | |
backup_info = {"path": item, "type": None, "timestamp": None, "size": 0} | |
if item.is_file() and (item.suffix == '.gz' or item.suffix == '.tar'): | |
# Archive backup | |
backup_info["type"] = "archive" | |
backup_info["size"] = item.stat().st_size | |
# Extract timestamp from filename | |
name_parts = item.stem.replace('.tar', '').split('_') | |
if len(name_parts) >= 4: | |
try: | |
timestamp_str = f"{name_parts[-2]}_{name_parts[-1]}" | |
backup_info["timestamp"] = datetime.strptime(timestamp_str, "%Y%m%d_%H%M%S") | |
except ValueError: | |
pass | |
elif item.is_dir() and item.name.startswith("cmk_custom_backup_"): | |
# Directory backup | |
backup_info["type"] = "directory" | |
# Calculate directory size | |
total_size = sum(f.stat().st_size for f in item.rglob('*') if f.is_file()) | |
backup_info["size"] = total_size | |
# Extract timestamp from dirname | |
name_parts = item.name.split('_') | |
if len(name_parts) >= 4: | |
try: | |
timestamp_str = f"{name_parts[-2]}_{name_parts[-1]}" | |
backup_info["timestamp"] = datetime.strptime(timestamp_str, "%Y%m%d_%H%M%S") | |
except ValueError: | |
pass | |
if backup_info["type"]: | |
backups.append(backup_info) | |
# Sort by timestamp, newest first | |
backups.sort(key=lambda x: x["timestamp"] or datetime.min, reverse=True) | |
return backups | |
def extract_backup(self, backup_path: Path, extract_to: Optional[Path] = None) -> Optional[Path]: | |
""" | |
Extract a backup archive to a temporary or specified directory. | |
Args: | |
backup_path: Path to backup archive | |
extract_to: Directory to extract to (None for temporary) | |
Returns: | |
Path to extracted content or None if failed | |
""" | |
if extract_to is None: | |
extract_to = Path(tempfile.mkdtemp(prefix="cmk_restore_")) | |
try: | |
if backup_path.suffix in ['.gz', '.tar']: | |
with tarfile.open(backup_path, 'r:*') as tar: | |
tar.extractall(extract_to) | |
# Find the backup directory (should be the only directory) | |
extracted_dirs = [d for d in extract_to.iterdir() if d.is_dir()] | |
if extracted_dirs: | |
return extracted_dirs[0] | |
else: | |
return extract_to | |
elif backup_path.is_dir(): | |
# It's already a directory, just return it | |
return backup_path | |
except Exception as e: | |
self.logger.error(f"Failed to extract backup: {e}") | |
return None | |
def load_backup_manifest(self, backup_dir: Path) -> Optional[Dict]: | |
"""Load the backup manifest file.""" | |
manifest_path = backup_dir / "backup_manifest.json" | |
if not manifest_path.exists(): | |
self.logger.warning("No backup manifest found") | |
return None | |
try: | |
with open(manifest_path, 'r') as f: | |
return json.load(f) | |
except Exception as e: | |
self.logger.error(f"Failed to load manifest: {e}") | |
return None | |
def validate_restore_environment(self, manifest: Dict, target_sites: Optional[List[str]] = None) -> Tuple[bool, List[str]]: | |
""" | |
Validate that the restore environment is compatible. | |
Args: | |
manifest: Backup manifest | |
target_sites: Sites to restore to (None for auto-detect) | |
Returns: | |
Tuple of (is_valid, list_of_issues) | |
""" | |
issues = [] | |
current_sites = self.get_cmk_sites() | |
if not current_sites: | |
issues.append("No CheckMK sites found on this system") | |
return False, issues | |
if target_sites: | |
missing_sites = set(target_sites) - set(current_sites) | |
if missing_sites: | |
issues.append(f"Target sites not found: {missing_sites}") | |
# Check if we have write permissions to target directories | |
test_sites = target_sites if target_sites else current_sites[:1] # Test at least one site | |
for site in test_sites: | |
site_root = Path(f"/omd/sites/{site}") | |
test_paths = [ | |
site_root / "local/lib/python3/cmk/base/plugins/agent_based", | |
site_root / "local/share/check_mk/web/plugins", | |
site_root / "local/share/check_mk/checks" | |
] | |
for test_path in test_paths: | |
if test_path.exists() and not os.access(test_path, os.W_OK): | |
issues.append(f"No write permission to {test_path}") | |
return len(issues) == 0, issues | |
def create_restore_backup(self, target_paths: List[str]) -> Optional[Path]: | |
"""Create a safety backup before restore.""" | |
safety_backup_name = f"cmk_pre_restore_backup_{self.timestamp}" | |
safety_backup_dir = self.backup_dir / safety_backup_name | |
try: | |
safety_backup_dir.mkdir(parents=True, exist_ok=True) | |
files_backed_up = 0 | |
for path_str in target_paths: | |
path = Path(path_str) | |
if path.exists(): | |
if path.is_file(): | |
dest = safety_backup_dir / path.name | |
shutil.copy2(path, dest) | |
files_backed_up += 1 | |
elif path.is_dir(): | |
dest = safety_backup_dir / path.name | |
shutil.copytree(path, dest, dirs_exist_ok=True) | |
files_backed_up += sum(1 for _ in dest.rglob('*') if _.is_file()) | |
if files_backed_up > 0: | |
self.logger.info(f"Safety backup created: {safety_backup_dir} ({files_backed_up} files)") | |
return safety_backup_dir | |
else: | |
# Remove empty backup directory | |
safety_backup_dir.rmdir() | |
return None | |
except Exception as e: | |
self.logger.error(f"Failed to create safety backup: {e}") | |
return None | |
def restore_category(self, backup_dir: Path, category: str, target_paths: List[str], | |
dry_run: bool = False, overwrite: bool = False) -> Tuple[int, int]: | |
""" | |
Restore files from a specific backup category. | |
Args: | |
backup_dir: Directory containing backup | |
category: Category to restore | |
target_paths: List of target paths for this category | |
dry_run: Only show what would be restored | |
overwrite: Overwrite existing files | |
Returns: | |
Tuple of (files_restored, files_skipped) | |
""" | |
category_dir = backup_dir / category | |
if not category_dir.exists(): | |
return 0, 0 | |
files_restored = 0 | |
files_skipped = 0 | |
for target_path_str in target_paths: | |
target_root = Path(target_path_str) | |
if not target_root.exists() and not dry_run: | |
target_root.mkdir(parents=True, exist_ok=True) | |
self.logger.info(f"Created directory: {target_root}") | |
# Walk through backup files | |
for backup_file in category_dir.rglob('*'): | |
if not backup_file.is_file(): | |
continue | |
# Calculate relative path from category dir | |
rel_path = backup_file.relative_to(category_dir) | |
target_file = target_root / rel_path | |
# Check if we should skip this file | |
if target_file.exists() and not overwrite: | |
self.logger.debug(f"Skipping existing file: {target_file}") | |
files_skipped += 1 | |
continue | |
if dry_run: | |
action = "RESTORE" if not target_file.exists() or overwrite else "SKIP" | |
self.logger.info(f"[DRY RUN] {action}: {backup_file} -> {target_file}") | |
if action == "RESTORE": | |
files_restored += 1 | |
else: | |
files_skipped += 1 | |
else: | |
try: | |
# Create parent directories if needed | |
target_file.parent.mkdir(parents=True, exist_ok=True) | |
# Copy file | |
shutil.copy2(backup_file, target_file) | |
files_restored += 1 | |
self.logger.debug(f"Restored: {backup_file} -> {target_file}") | |
except Exception as e: | |
self.logger.error(f"Failed to restore {backup_file}: {e}") | |
files_skipped += 1 | |
return files_restored, files_skipped | |
def run_restore(self, backup_path: str, target_sites: Optional[List[str]] = None, | |
categories: Optional[List[str]] = None, dry_run: bool = False, | |
overwrite: bool = False, create_safety_backup: bool = True) -> bool: | |
""" | |
Run the complete restore process. | |
Args: | |
backup_path: Path to backup archive or directory | |
target_sites: Sites to restore to (None for all available sites) | |
categories: Categories to restore (None for all) | |
dry_run: Only show what would be restored | |
overwrite: Overwrite existing files | |
create_safety_backup: Create safety backup before restore | |
Returns: | |
True if restore was successful | |
""" | |
try: | |
backup_path_obj = Path(backup_path) | |
if not backup_path_obj.exists(): | |
self.logger.error(f"Backup not found: {backup_path}") | |
return False | |
self.logger.info(f"Starting restore from: {backup_path}") | |
# Extract backup if it's an archive | |
if backup_path_obj.is_file(): | |
self.logger.info("Extracting backup archive...") | |
extracted_dir = self.extract_backup(backup_path_obj) | |
if not extracted_dir: | |
self.logger.error("Failed to extract backup") | |
return False | |
else: | |
extracted_dir = backup_path_obj | |
# Load manifest | |
manifest = self.load_backup_manifest(extracted_dir) | |
if not manifest: | |
self.logger.warning("Proceeding without manifest") | |
manifest = {"backup_categories": {}} | |
# Determine target sites | |
if target_sites is None: | |
target_sites = self.get_cmk_sites() | |
if not target_sites: | |
self.logger.error("No target sites available") | |
return False | |
# Validate environment | |
is_valid, issues = self.validate_restore_environment(manifest, target_sites) | |
if not is_valid: | |
self.logger.error("Restore environment validation failed:") | |
for issue in issues: | |
self.logger.error(f" - {issue}") | |
return False | |
# Get restore paths | |
restore_paths = self.get_backup_paths(target_sites) | |
# Filter categories if specified | |
if categories: | |
restore_paths = {k: v for k, v in restore_paths.items() if k in categories} | |
# Create safety backup if requested | |
if create_safety_backup and not dry_run: | |
all_target_paths = [] | |
for paths in restore_paths.values(): | |
all_target_paths.extend(paths) | |
self.create_restore_backup(all_target_paths) | |
# Perform restore | |
total_restored = 0 | |
total_skipped = 0 | |
for category, target_paths in restore_paths.items(): | |
if not target_paths: | |
continue | |
self.logger.info(f"Restoring category: {category}") | |
restored, skipped = self.restore_category( | |
extracted_dir, category, target_paths, dry_run, overwrite | |
) | |
total_restored += restored | |
total_skipped += skipped | |
if restored > 0 or skipped > 0: | |
self.logger.info(f" → {restored} files restored, {skipped} files skipped") | |
# Cleanup temporary extraction if needed | |
if backup_path_obj.is_file() and extracted_dir != backup_path_obj: | |
try: | |
shutil.rmtree(extracted_dir) | |
except: | |
pass # Ignore cleanup errors | |
if dry_run: | |
self.logger.info(f"[DRY RUN] Would restore {total_restored} files, skip {total_skipped} files") | |
else: | |
self.logger.info(f"Restore completed: {total_restored} files restored, {total_skipped} files skipped") | |
if total_restored > 0: | |
self.logger.info("Remember to restart CheckMK services: omd restart") | |
return True | |
except Exception as e: | |
self.logger.error(f"Restore failed: {e}") | |
return False | |
def run_backup(self, sites: Optional[List[str]] = None, | |
create_archive: bool = True) -> bool: | |
""" | |
Run the complete backup process. | |
Args: | |
sites: Specific sites to backup (None for all sites) | |
create_archive: Whether to create a compressed archive | |
Returns: | |
True if backup was successful | |
""" | |
try: | |
self.logger.info(f"Starting CheckMK custom content backup - {self.timestamp}") | |
# Discover sites if not specified | |
if sites is None: | |
sites = self.get_cmk_sites() | |
if not sites: | |
self.logger.error("No CheckMK sites found!") | |
return False | |
# Get backup paths | |
backup_paths = self.get_backup_paths(sites) | |
# Create working directory | |
working_dir = self.backup_dir / self.backup_name | |
working_dir.mkdir(parents=True, exist_ok=True) | |
total_files = 0 | |
# Backup each category | |
for category, paths in backup_paths.items(): | |
if not paths: | |
continue | |
self.logger.info(f"Backing up {category}...") | |
category_dir = working_dir / category | |
files_count = self.backup_paths(paths, category_dir) | |
total_files += files_count | |
if files_count > 0: | |
self.logger.info(f" → {files_count} files backed up") | |
else: | |
self.logger.info(f" → No files found") | |
# Create manifest | |
self.create_manifest(backup_paths, working_dir) | |
# Create archive if requested | |
if create_archive: | |
archive_name = f"{self.backup_name}.tar.gz" if self.compress else f"{self.backup_name}.tar" | |
archive_path = self.backup_dir / archive_name | |
if self.create_archive(working_dir, archive_path): | |
# Remove working directory after successful archive creation | |
shutil.rmtree(working_dir) | |
self.logger.info(f"Backup completed successfully: {archive_path}") | |
self.logger.info(f"Total files backed up: {total_files}") | |
else: | |
self.logger.error("Archive creation failed, keeping working directory") | |
return False | |
else: | |
self.logger.info(f"Backup completed successfully: {working_dir}") | |
self.logger.info(f"Total files backed up: {total_files}") | |
return True | |
except Exception as e: | |
self.logger.error(f"Backup failed: {e}") | |
return False | |
def main(): | |
"""Main function with command line interface.""" | |
parser = argparse.ArgumentParser( | |
description="Backup and restore CheckMK custom content (plugins, configurations, etc.)", | |
formatter_class=argparse.RawDescriptionHelpFormatter, | |
epilog=""" | |
Examples: | |
# Create backup | |
%(prog)s backup | |
# Create backup of specific sites | |
%(prog)s backup --sites master DRP | |
# List available backups | |
%(prog)s list | |
# Restore from latest backup (dry run) | |
%(prog)s restore --dry-run | |
# Restore specific backup | |
%(prog)s restore /tmp/cmk_backups/cmk_custom_backup_20241201_143022.tar.gz | |
# Restore only specific categories | |
%(prog)s restore --categories agent_based_plugins web_metrics | |
# Restore to specific sites with overwrite | |
%(prog)s restore --sites master --overwrite /path/to/backup | |
""" | |
) | |
subparsers = parser.add_subparsers(dest='command', help='Available commands') | |
# Backup command | |
backup_parser = subparsers.add_parser('backup', help='Create a backup') | |
backup_parser.add_argument( | |
"--backup-dir", "-d", | |
default="/tmp/cmk_backups", | |
help="Directory to store backups (default: /tmp/cmk_backups)" | |
) | |
backup_parser.add_argument( | |
"--sites", "-s", | |
nargs="+", | |
help="Specific sites to backup (default: all sites)" | |
) | |
backup_parser.add_argument( | |
"--no-compress", "-n", | |
action="store_true", | |
help="Don't compress the backup archive" | |
) | |
backup_parser.add_argument( | |
"--no-archive", "-a", | |
action="store_true", | |
help="Don't create archive, keep as directory structure" | |
) | |
# List command | |
list_parser = subparsers.add_parser('list', help='List available backups') | |
list_parser.add_argument( | |
"--backup-dir", "-d", | |
default="/tmp/cmk_backups", | |
help="Directory containing backups (default: /tmp/cmk_backups)" | |
) | |
# Restore command | |
restore_parser = subparsers.add_parser('restore', help='Restore from backup') | |
restore_parser.add_argument( | |
"backup_path", | |
nargs="?", | |
help="Path to backup file/directory (default: latest backup)" | |
) | |
restore_parser.add_argument( | |
"--backup-dir", "-d", | |
default="/tmp/cmk_backups", | |
help="Directory containing backups (default: /tmp/cmk_backups)" | |
) | |
restore_parser.add_argument( | |
"--sites", "-s", | |
nargs="+", | |
help="Sites to restore to (default: all available sites)" | |
) | |
restore_parser.add_argument( | |
"--categories", "-c", | |
nargs="+", | |
choices=["agent_based_plugins", "bakery_plugins", "web_metrics", "web_wato", | |
"checkman", "legacy_checks", "agent_plugins_linux", "agent_plugins_windows", | |
"custom_agents", "snmpwalks", "simulations", "wato_config", "unit_definitions", | |
"mibs", "compiled_mibs"], | |
help="Categories to restore (default: all categories)" | |
) | |
restore_parser.add_argument( | |
"--dry-run", "-n", | |
action="store_true", | |
help="Show what would be restored without actually doing it" | |
) | |
restore_parser.add_argument( | |
"--overwrite", "-f", | |
action="store_true", | |
help="Overwrite existing files" | |
) | |
restore_parser.add_argument( | |
"--no-safety-backup", | |
action="store_true", | |
help="Don't create safety backup before restore" | |
) | |
# Global options | |
parser.add_argument( | |
"--verbose", "-v", | |
action="store_true", | |
help="Enable verbose logging" | |
) | |
args = parser.parse_args() | |
if not args.command: | |
parser.print_help() | |
return 1 | |
# Create backup/restore instance | |
cmk = CMKBackup( | |
backup_dir=args.backup_dir, | |
compress=not getattr(args, 'no_compress', False), | |
verbose=args.verbose | |
) | |
if args.command == 'backup': | |
# Run backup | |
success = cmk.run_backup( | |
sites=args.sites, | |
create_archive=not args.no_archive | |
) | |
return 0 if success else 1 | |
elif args.command == 'list': | |
# List available backups | |
backups = cmk.list_available_backups() | |
if not backups: | |
print("No backups found in", args.backup_dir) | |
return 0 | |
print(f"\nAvailable backups in {args.backup_dir}:") | |
print("-" * 80) | |
print(f"{'Backup Name':<35} {'Type':<10} {'Date':<20} {'Size':<10}") | |
print("-" * 80) | |
for backup in backups: | |
name = backup["path"].name | |
backup_type = backup["type"] | |
date_str = backup["timestamp"].strftime("%Y-%m-%d %H:%M:%S") if backup["timestamp"] else "Unknown" | |
# Format size | |
size = backup["size"] | |
if size > 1024*1024*1024: | |
size_str = f"{size/(1024*1024*1024):.1f}GB" | |
elif size > 1024*1024: | |
size_str = f"{size/(1024*1024):.1f}MB" | |
elif size > 1024: | |
size_str = f"{size/1024:.1f}KB" | |
else: | |
size_str = f"{size}B" | |
print(f"{name:<35} {backup_type:<10} {date_str:<20} {size_str:<10}") | |
return 0 | |
elif args.command == 'restore': | |
# Determine backup path | |
backup_path = args.backup_path | |
if not backup_path: | |
# Use latest backup | |
backups = cmk.list_available_backups() | |
if not backups: | |
print("No backups found. Please specify a backup path.") | |
return 1 | |
backup_path = str(backups[0]["path"]) | |
print(f"Using latest backup: {backup_path}") | |
# Run restore | |
success = cmk.run_restore( | |
backup_path=backup_path, | |
target_sites=args.sites, | |
categories=args.categories, | |
dry_run=args.dry_run, | |
overwrite=args.overwrite, | |
create_safety_backup=not args.no_safety_backup | |
) | |
return 0 if success else 1 | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment