Python backup analyzer. Scores file significance in home directories using statistics and heuristics (file type, age, keywords, entropy), and generates backup strategies plus Markdown reports to optimize storage and recovery. Requires: Python 3, rich, pandas, numpy, sudo.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Backup Significance Analyzer v2.1.0

Analyzes file and directory significance in user home directories using statistical
methods (entropy, access patterns, modification frequency, content hints) to
recommend optimized backup strategies.

Requires: rich, pandas, numpy

Usage:
    sudo python3 backup_significance_analyzer.py [/path/to/scan]

Example:
    sudo python3 backup_significance_analyzer.py /home

Author: Backup Intelligence Team (Enhanced by AI)
Version: 2.1.0
"""
import os
import sys
import time
import math
import hashlib
import json
import subprocess
from datetime import datetime
from pathlib import Path
from collections import Counter, defaultdict
from typing import Dict, List, Tuple, Set, Optional, Union, Any, NamedTuple

# --- Dependency Handling & Environment Setup ---

# Attempt to limit OpenBLAS threads before importing numpy/pandas.
# This helps prevent 'pthread_create failed' errors on some systems when run with sudo.
try:
    num_threads = "2"  # Start with a low number
    os.environ['OPENBLAS_NUM_THREADS'] = num_threads
    os.environ['MKL_NUM_THREADS'] = num_threads
    os.environ['OMP_NUM_THREADS'] = num_threads
    # More environment variables can be added if needed (e.g., for the Accelerate framework on macOS)
    print(f"[INFO] Setting max BLAS threads to {num_threads} to mitigate potential resource issues.")
except Exception as e:
    print(f"[Warning] Could not set BLAS thread limits: {e}")
# Check for rich and attempt to install it if missing
try:
    from rich.console import Console, Group
    from rich.panel import Panel
    from rich.progress import (Progress, BarColumn, TextColumn,
                               TimeElapsedColumn, SpinnerColumn, TaskID)
    from rich.table import Table
    from rich.text import Text
    from rich.tree import Tree
    from rich import box
    from rich.prompt import Confirm, Prompt
    from rich.layout import Layout
    from rich.markdown import Markdown
    from rich.syntax import Syntax
except ImportError:
    print("Required package 'rich' not found. Attempting to install...")
    try:
        subprocess.check_call([sys.executable, "-m", "pip", "install", "rich"])
        print("Installation complete. Please restart the script.")
        sys.exit(0)
    except Exception as e:
        print(f"Failed to install 'rich': {e}")
        print("Please install it manually: pip install rich")
        sys.exit(1)

# Check for pandas and numpy (often problematic with sudo)
try:
    import pandas as pd
except ImportError:
    print("Required package 'pandas' not found.")
    print("Attempting to install 'pandas'...")
    try:
        # Note: Using sudo pip can be risky. Consider the system package manager or virtual envs.
        print("[Warning] Trying 'pip install pandas'. If this fails or you used 'sudo python3', "
              "you might need 'sudo pip install pandas' or install via your system package manager.")
        subprocess.check_call([sys.executable, "-m", "pip", "install", "pandas"])
        print("Installation complete. Please restart the script.")
        sys.exit(0)
    except Exception as e:
        print(f"Failed to install 'pandas': {e}")
        print("Please install it manually (e.g., 'pip install pandas' or 'sudo pip install pandas').")
        sys.exit(1)

try:
    import numpy as np
except ImportError:
    print("Required package 'numpy' not found.")
    print("Attempting to install 'numpy'...")
    try:
        print("[Warning] Trying 'pip install numpy'. If this fails or you used 'sudo python3', "
              "you might need 'sudo pip install numpy' or install via your system package manager.")
        subprocess.check_call([sys.executable, "-m", "pip", "install", "numpy"])
        print("Installation complete. Please restart the script.")
        sys.exit(0)
    except Exception as e:
        print(f"Failed to install 'numpy': {e}")
        print("Please install it manually (e.g., 'pip install numpy' or 'sudo pip install numpy').")
        sys.exit(1)

# --- Configuration Constants ---

# Weights for file extensions (0.0 to 1.0) - higher means more important
EXTENSION_WEIGHTS: Dict[str, float] = {
    # Documents & Text
    'pdf': 0.85, 'doc': 0.83, 'docx': 0.85, 'txt': 0.70, 'odt': 0.82,
    'rtf': 0.75, 'tex': 0.80, 'md': 0.78, 'log': 0.40, 'csv': 0.80,
    'json': 0.78, 'xml': 0.76, 'yaml': 0.77, 'yml': 0.77, 'toml': 0.77,
    'ini': 0.75, 'conf': 0.78, 'config': 0.78, 'env': 0.80,
    # Spreadsheets & Presentations
    'xlsx': 0.85, 'xls': 0.84, 'ods': 0.83,
    'pptx': 0.83, 'ppt': 0.82, 'odp': 0.81,
    # Code & Development
    'py': 0.88, 'ipynb': 0.87, 'java': 0.85, 'c': 0.85, 'cpp': 0.85, 'h': 0.80,
    'js': 0.83, 'html': 0.75, 'css': 0.72, 'sh': 0.82, 'bash': 0.82, 'zsh': 0.80,
    'rb': 0.83, 'php': 0.83, 'go': 0.85, 'rs': 0.85, 'ts': 0.84, 'sql': 0.84,
    'swift': 0.85, 'kt': 0.85, 'dart': 0.84, 'lua': 0.82, 'r': 0.83,
    'pl': 0.82, 'scala': 0.83, 'vb': 0.80, 'ps1': 0.82, 'psm1': 0.82,
    'dockerfile': 0.75, 'makefile': 0.70,
    # Databases & Data
    'db': 0.90, 'sqlite': 0.90, 'sqlite3': 0.90, 'mdb': 0.88, 'accdb': 0.88,
    'sqlitedb': 0.90, 'bak': 0.80, 'dump': 0.80,
    # Media (lower importance generally; can be large)
    'jpg': 0.67, 'jpeg': 0.67, 'png': 0.68, 'gif': 0.55, 'svg': 0.70, 'ico': 0.40,
    'tiff': 0.72, 'bmp': 0.60, 'psd': 0.75, 'ai': 0.75, 'eps': 0.72,
    'raw': 0.78, 'cr2': 0.78, 'nef': 0.78, 'arw': 0.78, 'dng': 0.78,  # Camera raw
    'mp3': 0.60, 'wav': 0.65, 'flac': 0.72, 'aac': 0.62, 'ogg': 0.63, 'm4a': 0.61,
    'mp4': 0.75, 'mov': 0.75, 'avi': 0.73, 'mkv': 0.73, 'wmv': 0.70, 'flv': 0.68,
    'webm': 0.70, 'webp': 0.65,
    # Archives (importance depends on content, often medium)
    'zip': 0.70, 'tar': 0.70, 'gz': 0.70, 'bz2': 0.70, 'xz': 0.70,
    'rar': 0.70, '7z': 0.70, 'tgz': 0.70, 'iso': 0.50, 'img': 0.60,
    # Cryptographic & Security (high importance)
    'pem': 0.95, 'key': 0.95, 'crt': 0.95, 'csr': 0.90, 'p12': 0.95, 'pfx': 0.95,
    'gpg': 0.95, 'pgp': 0.95, 'kdbx': 0.95, 'keystore': 0.95, 'jks': 0.95,
    'ssh': 0.90,  # Often config files in .ssh/
    # System & Temporary (low importance)
    'tmp': 0.10, 'temp': 0.10, 'swp': 0.05, 'swo': 0.05, 'lock': 0.15,
    'pid': 0.10, 'cache': 0.05, 'o': 0.20, 'obj': 0.20, 'a': 0.25, 'lib': 0.30,
    'so': 0.30, 'dll': 0.30, 'dylib': 0.30, 'exe': 0.40, 'app': 0.45, 'msi': 0.35,
    'sys': 0.25, 'drv': 0.25,
    # Virtual Machines & Containers (can be large; importance varies)
    'vmdk': 0.65, 'vdi': 0.65, 'qcow2': 0.65, 'vhd': 0.65, 'vhdx': 0.65,
    'ova': 0.70, 'ovf': 0.70,
    # Misc Office/System
    'xkb': 0.30, 'desktop': 0.25, 'lnk': 0.20,
    'ics': 0.75, 'vcf': 0.80,
    'otf': 0.50, 'ttf': 0.50,  # Fonts
}

# Directory names hinting at important user data
IMPORTANT_DIR_NAMES: Set[str] = {
    'Documents', 'My Documents', 'Docs', 'Texte', 'Dokumente',  # Common variations
    'Pictures', 'Photos', 'Images', 'Bilder',
    'Music', 'Audio', 'Musik',
    'Videos', 'Movies', 'Filme',
    'Desktop', 'Schreibtisch',
    'Downloads',  # Often contains important recent items, despite its temp nature
    'Projects', 'src', 'code', 'dev', 'workspace', 'repos',
    'work', 'research', 'studies', 'uni', 'thesis', 'dissertation',
    'backup', 'archive', 'important', 'critical', 'private', 'personal',
    '.ssh', '.gnupg', '.aws', '.kube', '.docker', '.config',  # Config directories
    'tax', 'finance', 'banking', 'invoices', 'receipts',
    'cv', 'resume', 'applications', 'contracts', 'legal', 'official',
    'medical', 'health',
    'credentials', 'certificates', 'passwords', 'keys', 'secrets', 'identity',
    'notes', 'notebooks', 'journals',
    'vm', 'virtualbox', 'virtual machines',
    'git'  # The user's main git repos, not .git inside projects
}

# Directory names/patterns typically containing low-value/regeneratable data.
# Matched against path components.
LOW_VALUE_DIR_PATTERNS: Set[str] = {
    'node_modules', '__pycache__', '.git',  # Specific common ones
    'venv', 'env', '.venv', '.env',  # Virtual environments
    'tmp', 'temp', 'cache', '.cache', '.pytest_cache',  # Caches and temp
    'logs', 'log', '.logs',  # Logs
    'build', 'dist', 'out', 'target', 'bin', 'obj',  # Build artifacts
    '.Trash', '.local/share/Trash',  # Trash bins
    '.npm', '.yarn', '.pnpm-store', '.gradle', '.m2', '.nuget',  # Package manager caches
    '.cargo', '.rustup', '.pub-cache', '.composer',  # More package caches
    'bower_components', '.history', '.vscode', '.idea', '.settings',  # IDE/editor specific
    'vendor', 'Pods',  # Dependency dirs
    'site-packages', 'jspm_packages',  # Language package dirs
    'coverage', '.coverage'  # Test coverage reports
}

# Keywords in filenames hinting at important content
IMPORTANT_KEYWORDS: Set[str] = {
    'password', 'secret', 'key', 'credential', 'token', 'api_key', 'private_key',
    'id', 'identity', 'certificate', 'license', 'contract', 'agreement',
    'tax', 'ssn', 'passport', 'driver_license', 'personal_id',
    'confidential', 'private', 'sensitive', 'classified',
    'personal', 'financial', 'banking', 'loan', 'mortgage', 'investment',
    'medical', 'health', 'diagnosis', 'prescription', 'insurance',
    'legal', 'official', 'affidavit', 'subpoena', 'testament', 'will',
    'exam', 'thesis', 'dissertation', 'proposal', 'manuscript', 'publication',
    'report', 'analysis', 'strategy', 'plan', 'research',
    'invoice', 'receipt', 'bill', 'statement', 'payslip',
    'backup', 'archive', 'export',  # Could be important exports
    'resume', 'cv', 'cover_letter', 'application'
}

# --- Helper Classes and Functions ---

class FileMetadata(NamedTuple):
    """Stores metadata for a single file."""
    user: str
    path: Path  # Relative path within the user's home
    absolute_path: Path  # Full path
    size: int
    modified: datetime
    accessed: datetime
    extension: str
    age_days: float
    age_category: str
    entropy: float
    is_unique: bool  # Based on the sampled hash
    importance: float
    hash_sample: Optional[str]  # Hash of sampled content
    has_important_keyword: bool
    is_likely_text: bool


def format_size(size_bytes: Union[int, float]) -> str:
    """Formats a size in bytes as a human-readable string (KB, MB, GB, TB)."""
    if size_bytes < 1024:
        return f"{size_bytes} bytes"
    elif size_bytes < 1024**2:
        return f"{size_bytes / 1024:.1f} KB"
    elif size_bytes < 1024**3:
        return f"{size_bytes / 1024**2:.1f} MB"
    elif size_bytes < 1024**4:
        return f"{size_bytes / 1024**3:.2f} GB"
    else:
        return f"{size_bytes / 1024**4:.2f} TB"


def calculate_entropy(data: bytes) -> float:
    """Calculates the Shannon entropy of byte data, normalized to 0.0-1.0."""
    if not data:
        return 0.0
    entropy = 0.0
    data_len = len(data)
    byte_counts = Counter(data)
    for count in byte_counts.values():
        p_x = count / data_len
        if p_x > 0:
            entropy -= p_x * math.log2(p_x)
    # Normalize: the maximum entropy for bytes is log2(256) = 8
    return entropy / 8.0
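
# Illustrative extremes (hypothetical inputs): a run of one repeated byte has
# zero entropy, while a sample with all 256 byte values equally often normalizes to 1.0:
#   calculate_entropy(b"a" * 1024)           -> 0.0
#   calculate_entropy(bytes(range(256)) * 4) -> 1.0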


def is_likely_text_file(sample: bytes, ext: str) -> bool:
    """Simple heuristic to guess whether a file is text-based."""
    text_extensions = {
        'txt', 'md', 'py', 'js', 'json', 'html', 'css', 'xml', 'yaml', 'yml',
        'c', 'cpp', 'h', 'java', 'rb', 'php', 'pl', 'sh', 'sql', 'csv', 'log',
        'tex', 'rtf', 'ini', 'conf', 'config', 'toml', 'go', 'rs', 'ts', 'swift',
        'kt', 'dart', 'lua', 'r', 'ps1', 'psm1', 'bash', 'zsh', 'ipynb'
    }
    if ext in text_extensions:
        return True
    # Avoid content-checking files that are clearly binary by extension
    binary_extensions = {'exe', 'dll', 'so', 'a', 'o', 'lib', 'bin', 'img', 'iso'}
    if ext in binary_extensions:
        return False
    try:
        # Attempt to decode a sample as UTF-8; mostly printable content is likely text,
        # while a high proportion of non-printable characters suggests binary data.
        sample.decode('utf-8', errors='strict')
        non_printable = sum(1 for byte in sample if byte < 32 and byte not in (9, 10, 13))  # Allow tab, LF, CR
        if non_printable / len(sample) > 0.1:  # Threshold: >10% non-printable
            return False
        return True
    except UnicodeDecodeError:
        return False
    except Exception:  # Other potential issues
        return False
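
# Illustrative behaviour (hypothetical inputs):
#   is_likely_text_file(b"hello world\n", "xyz")    -> True   (decodes as UTF-8, printable)
#   is_likely_text_file(b"\x00\x01\x02\x03", "bin") -> False  (binary extension short-circuits)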


# --- Core Analyzer Class ---

class FileSignificanceAnalyzer:
    """Core engine for scanning files and calculating backup significance."""

    # Sampling config for hashing and entropy
    HASH_SAMPLE_SIZE_START = 65536  # 64 KB from the start
    HASH_SAMPLE_SIZE_END = 65536  # 64 KB from the end (if the file is large enough)
    MAX_SIZE_FOR_FULL_HASH = 1 * 1024 * 1024  # 1 MB
    MAX_SIZE_FOR_ENTROPY_CALC = 10 * 1024 * 1024  # 10 MB
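
    # Sampling behaviour (illustrative): a 10 MB file is never hashed in full;
    # its sample hash covers only the first and last 64 KB, so two large files
    # that differ only in the middle would share a hash_sample. This is an
    # accepted speed/accuracy trade-off when estimating uniqueness.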

    def __init__(self, base_path: str = "/home", scan_depth: int = 10, entropy_threshold: float = 0.65):
        """
        Initializes the analyzer.

        Args:
            base_path: The base directory to scan (e.g., "/home").
            scan_depth: Maximum directory depth to traverse relative to each user's home.
            entropy_threshold: Minimum normalized entropy score to be considered 'interesting'.
        """
        self.base_path = Path(base_path).resolve()  # Ensure an absolute path
        self.scan_depth = scan_depth
        self.entropy_threshold = entropy_threshold
        self.file_metadata_list: List[FileMetadata] = []
        self.user_stats: Dict[str, Dict[str, Any]] = {}
        self.scan_errors: Dict[str, List[str]] = defaultdict(list)
        self.total_size_scanned: int = 0
        self.total_files_scanned: int = 0
        self.content_hashes: Set[str] = set()  # Stores hashes of file content samples
        self.console = Console(stderr=True)  # Use stderr for progress/errors

    def _log_error(self, user: str, path: Union[Path, str], error: Exception):
        """Logs an error encountered during scanning."""
        error_msg = f"Error processing '{path}': {type(error).__name__}: {error}"
        self.scan_errors[user].append(error_msg)
        # Optionally print verbose errors immediately:
        # self.console.print(f"[dim red]Scan error ({user}): {error_msg}[/dim red]")

    def _calculate_importance(
        self,
        file_info: FileMetadata
    ) -> float:
        """
        Calculates a file's importance score (0.0 to 1.0) based on multiple factors.

        Factors considered:
        - File extension weight
        - Directory path importance (is it in IMPORTANT_DIR_NAMES?)
        - File age (recency)
        - File size (moderate boost for non-tiny files)
        - Content entropy (higher entropy can indicate unique data like archives, crypto)
        - Presence of keywords in the filename

        Args:
            file_info: The FileMetadata object for the file.

        Returns:
            A float score between 0.0 and 1.0.
        """
        ext = file_info.extension
        path = file_info.path  # Relative path
        stats_size = file_info.size
        age_days = file_info.age_days
        entropy = file_info.entropy
        has_keyword = file_info.has_important_keyword

        # 1. Base importance from the file extension
        # Default to 0.4 if the extension is unknown, 0.1 if it is clearly temp/cache
        ext_importance = EXTENSION_WEIGHTS.get(ext, 0.4)
        if ext in {'tmp', 'temp', 'cache', 'swp', 'swo', 'lock', 'pid', 'o', 'obj'}:
            ext_importance = 0.1

        # 2. Directory path importance
        dir_importance = 0.5  # Default baseline
        try:
            path_parts = set(p.lower() for p in path.parts)  # Lowercase for case-insensitive matching
            # Check against lowercase versions of the important/low-value names
            important_dir_names_lower = {d.lower() for d in IMPORTANT_DIR_NAMES}
            low_value_patterns_lower = {p.lower() for p in LOW_VALUE_DIR_PATTERNS}
            # Use lowered path components for matching low-value patterns
            is_in_low_value_dir = any(part in low_value_patterns_lower for part in path_parts)
            if is_in_low_value_dir:
                dir_importance = 0.1  # Significantly reduce importance
            elif any(part in important_dir_names_lower for part in path_parts):
                dir_importance = 0.9  # Significantly increase importance
            # Check the parent directory too for importance context
            elif len(path.parts) > 1 and path.parts[-2].lower() in important_dir_names_lower:
                dir_importance = 0.8  # Slightly less boost than a direct match
        except Exception:
            pass  # Ignore errors during path analysis

        # 3. Recency factor (higher score for more recent files)
        # Max score 1.0 for < 7 days, decaying over 2 years
        if age_days < 7:
            recency_factor = 1.0
        elif age_days < 730:  # Up to 2 years
            recency_factor = max(0.0, 1.0 - (age_days / 730.0))
        else:
            recency_factor = 0.05  # Very small boost for very old files

        # 4. Size factor (penalize zero-byte files, small boost for >1 KB, capped)
        if stats_size == 0:
            size_factor = 0.0
        elif stats_size < 1024:
            size_factor = 0.2  # Small files get a small base score
        else:
            # Logarithmic scale with a capped contribution; maximum boost around 10 MB.
            size_mb = stats_size / (1024 * 1024)
            size_factor = min(0.8, 0.3 + 0.5 * math.log10(1.0 + size_mb))

        # 5. Entropy factor (boost if entropy is high, suggesting non-standard/compressed data)
        if entropy > self.entropy_threshold:
            # Scale the boost by how much the entropy exceeds the threshold
            entropy_factor = 0.5 + 0.5 * ((entropy - self.entropy_threshold) / (1.0 - self.entropy_threshold))
        else:
            entropy_factor = 0.4  # Baseline if entropy isn't high

        # 6. Keyword factor (significant boost if the filename contains keywords)
        keyword_factor = 0.95 if has_keyword else 0.5

        # Combine the factors into a weighted score (adjust weights as needed).
        # The weights sum to 1.0 and represent each factor's relative contribution,
        # prioritizing: Directory > Extension > Keyword = Recency > Entropy > Size.
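        # Worked example (hypothetical factor values, not from the source):
        # dir=0.9, ext=0.88 ('py'), keyword=0.5, recency=1.0, entropy=0.4, size=0.5
        # -> 0.30*0.9 + 0.25*0.88 + 0.15*0.5 + 0.15*1.0 + 0.10*0.4 + 0.05*0.5 = 0.78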
        weighted_importance = (
            0.30 * dir_importance +
            0.25 * ext_importance +
            0.15 * keyword_factor +
            0.15 * recency_factor +
            0.10 * entropy_factor +
            0.05 * size_factor
        )

        # Ensure the score is within [0.0, 1.0]
        final_importance = max(0.0, min(1.0, weighted_importance))

        # Special override: if in a low-value dir, cap the importance unless a keyword was found
        if dir_importance <= 0.1 and not has_keyword:
            final_importance = min(final_importance, 0.15)

        return final_importance

    def _get_file_content_sample(self, file_path: Path, size: int, user_name: str) -> Tuple[Optional[bytes], Optional[str]]:
        """Reads samples from a file for hashing and entropy calculation."""
        content_sample = b""
        full_content = b""
        hash_hex = None
        try:
            with open(file_path, 'rb') as f:
                # Read the start chunk
                start_chunk = f.read(self.HASH_SAMPLE_SIZE_START)
                content_sample += start_chunk
                if size <= self.MAX_SIZE_FOR_FULL_HASH:
                    # Read the rest if the file is small enough for a full hash
                    remaining_chunk = f.read()
                    content_sample += remaining_chunk
                    full_content = content_sample  # Store for hashing
                elif size > self.HASH_SAMPLE_SIZE_START:
                    # If larger, read an end chunk as well for the sample hash
                    f.seek(max(self.HASH_SAMPLE_SIZE_START, size - self.HASH_SAMPLE_SIZE_END))
                    end_chunk = f.read(self.HASH_SAMPLE_SIZE_END)
                    content_sample += end_chunk
                    full_content = start_chunk + end_chunk  # Use start+end for the sample hash

            # Calculate the hash from the determined content (full or sampled)
            if full_content:
                hasher = hashlib.md5()
                hasher.update(full_content)
                hash_hex = hasher.hexdigest()

            # Return the sample for entropy (may differ from the hashed content for large files)
            entropy_sample = content_sample[:self.MAX_SIZE_FOR_ENTROPY_CALC]
            return entropy_sample, hash_hex
        except (IOError, OSError, PermissionError) as e:
            # The caller passes user_name explicitly, so errors are attributed to the
            # correct user (guessing from path components was unreliable for nested files)
            self._log_error(user_name, file_path.name, e)
            return None, None

    def _process_file(self, entry: os.DirEntry, user_dir: Path, user_name: str):
        """Extracts metadata and calculates initial metrics for a single file."""
        file_path = Path(entry.path)
        try:
            stats = entry.stat(follow_symlinks=False)  # Use stat from scandir directly
            # Skip if not a regular file or if the size is zero
            if not entry.is_file(follow_symlinks=False) or stats.st_size == 0:
                return

            # Basic metadata
            size = stats.st_size
            modified_time = datetime.fromtimestamp(stats.st_mtime)
            accessed_time = datetime.fromtimestamp(stats.st_atime)
            relative_path = file_path.relative_to(user_dir)
            ext = file_path.suffix.lower().lstrip('.')

            # Age calculation
            now_ts = time.time()
            age_days = (now_ts - stats.st_mtime) / (60 * 60 * 24)
            if age_days < 30:
                age_category = "Recent (<30d)"
            elif age_days < 180:
                age_category = "Medium (30-180d)"
            else:
                age_category = "Old (>180d)"

            # Keyword check in the filename
            filename_lower = file_path.name.lower()
            has_important_keyword = any(keyword in filename_lower for keyword in IMPORTANT_KEYWORDS)

            # Content analysis (entropy, hashing for uniqueness)
            entropy = 0.0
            hash_sample = None
            is_unique = False
            content_sample = None
            is_likely_text = False  # Default
            if size < self.MAX_SIZE_FOR_ENTROPY_CALC:  # Only content-analyze smaller files
                content_sample, hash_sample = self._get_file_content_sample(file_path, size, user_name)
                if content_sample:
                    entropy = calculate_entropy(content_sample)
                    is_likely_text = is_likely_text_file(content_sample, ext)  # Check if text
                if hash_sample:
                    # Check uniqueness based on the hash sample
                    if hash_sample not in self.content_hashes:
                        is_unique = True
                        self.content_hashes.add(hash_sample)

            # Create a preliminary metadata object (importance calculated next)
            file_info = FileMetadata(
                user=user_name,
                path=relative_path,
                absolute_path=file_path,
                size=size,
                modified=modified_time,
                accessed=accessed_time,
                extension=ext,
                age_days=age_days,
                age_category=age_category,
                entropy=entropy,
                is_unique=is_unique,
                importance=0.0,  # Placeholder
                hash_sample=hash_sample,
                has_important_keyword=has_important_keyword,
                is_likely_text=is_likely_text
            )

            # Calculate the final importance score
            importance = self._calculate_importance(file_info)
            final_file_info = file_info._replace(importance=importance)

            # Store the metadata
            self.file_metadata_list.append(final_file_info)

            # Update statistics
            self.user_stats[user_name]['total_files'] += 1
            self.user_stats[user_name]['total_size'] += size
            self.user_stats[user_name]['extensions'][ext] += 1
            self.user_stats[user_name]['age_distribution'][age_category] += 1
            if importance >= 0.8:  # Threshold for the 'critical' count
                self.user_stats[user_name]['critical_files'] += 1
            elif importance >= 0.6:
                self.user_stats[user_name]['high_importance_files'] += 1
            if age_category == "Recent (<30d)":
                self.user_stats[user_name]['modified_recently'] += 1
            if is_unique:
                self.user_stats[user_name]['unique_content_samples'] += 1
            if is_likely_text:
                self.user_stats[user_name]['likely_text_files'] += 1
            if has_important_keyword:
                self.user_stats[user_name]['keyword_files'] += 1

            # Update the global counters
            self.total_files_scanned += 1
            self.total_size_scanned += size
        except (PermissionError, FileNotFoundError, OSError) as e:
            self._log_error(user_name, file_path.name, e)
        except Exception as e:  # Catch unexpected errors during processing
            self._log_error(user_name, file_path.name, e)

    def _scan_user_directory(self, user_dir: Path, progress: Progress, task_id: TaskID) -> int:
        """
        Scans a single user's directory iteratively.

        Args:
            user_dir: Path object for the user's home directory.
            progress: Rich Progress object for updating the UI.
            task_id: TaskID for the Rich progress bar associated with this user.

        Returns:
            The total number of files processed for this user.
        """
        user_name = user_dir.name
        files_processed_count = 0
        dirs_to_scan: List[Tuple[Path, int]] = [(user_dir, 0)]  # Queue: (directory, depth)
        processed_dirs = 0

        while dirs_to_scan:
            current_dir, current_depth = dirs_to_scan.pop(0)
            processed_dirs += 1

            # Update the progress description periodically
            if processed_dirs % 20 == 0:
                progress.update(task_id, description=f"Scanning {user_name}: {current_dir.relative_to(user_dir)}", advance=0)

            # Check the depth limit
            if current_depth > self.scan_depth:
                continue

            # Skip low-value directory patterns entirely (performance)
            if any(part in LOW_VALUE_DIR_PATTERNS for part in current_dir.relative_to(user_dir).parts):
                continue

            try:
                with os.scandir(current_dir) as entries:
                    for entry in entries:
                        entry_path = Path(entry.path)
                        entry_name_lower = entry.name.lower()
                        relative_entry_path = entry_path.relative_to(user_dir)

                        # Skip hidden files/dirs unless explicitly important (e.g., .ssh).
                        # Allow specific hidden config dirs like .config, .aws, etc.
                        is_hidden = entry.name.startswith('.')
                        is_important_hidden = entry_name_lower in {'.ssh', '.gnupg', '.aws', '.kube', '.config'}
                        if is_hidden and not is_important_hidden:
                            continue

                        try:
                            if entry.is_dir(follow_symlinks=False):
                                # Skip if the directory itself or any path component matches a low-value pattern
                                if entry.name in LOW_VALUE_DIR_PATTERNS:
                                    continue
                                if any(part in LOW_VALUE_DIR_PATTERNS for part in relative_entry_path.parts):
                                    continue
                                # Add the directory to the queue if within depth
                                if current_depth + 1 <= self.scan_depth:
                                    dirs_to_scan.append((entry_path, current_depth + 1))
                                    self.user_stats[user_name]['directories_scanned'] += 1
                            elif entry.is_file(follow_symlinks=False):
                                self._process_file(entry, user_dir, user_name)
                                files_processed_count += 1
                                # Update the progress bar in batches for files
                                if files_processed_count % 100 == 0:
                                    progress.update(task_id, advance=100)
                        except (PermissionError, FileNotFoundError, OSError) as e:
                            # Log the error for this entry, but continue the scan
                            self._log_error(user_name, entry.name, e)
                        except Exception as e:
                            self._log_error(user_name, entry.name, e)
            except (PermissionError, FileNotFoundError, OSError) as e:
                # Log the error for the directory itself and stop scanning this branch
                self._log_error(user_name, current_dir.name, e)
            except Exception as e:
                self._log_error(user_name, current_dir.name, e)

        # Final update for the task when the user scan is complete
        progress.update(task_id, completed=files_processed_count, total=files_processed_count, description=f"Finished {user_name}")
        return files_processed_count

    def scan_home_directory(self) -> None:
        """Scans all user directories under the base path."""
        self.console.print(f"[bold]Starting scan under:[/bold] [cyan]{self.base_path}[/cyan]")
        if not self.base_path.is_dir():
            self.console.print(f"[bold red]Error:[/bold red] Path '{self.base_path}' is not a valid directory.")
            sys.exit(1)

        try:
            user_dirs = [d for d in self.base_path.iterdir() if d.is_dir()]
        except PermissionError:
            self.console.print("[bold red]Error:[/bold red] Insufficient permissions to list directories in "
                               f"'{self.base_path}'. Please run with 'sudo'.")
            sys.exit(1)
        except Exception as e:
            self.console.print(f"[bold red]Error:[/bold red] Failed to list directories in '{self.base_path}': {e}")
            sys.exit(1)

        if not user_dirs:
            self.console.print(f"[yellow]No user directories found under '{self.base_path}'.[/yellow]")
            return

        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(bar_width=None, complete_style="green"),
            TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
            TextColumn("({task.completed} files)"),
            TimeElapsedColumn(),
            console=self.console,  # Use the stderr console
            transient=False  # Keep the progress display visible after completion
        ) as progress:
            scan_overall_task = progress.add_task("[bold green]Scanning Users...", total=len(user_dirs))
            for user_dir in user_dirs:
                user_name = user_dir.name
                # Basic check to skip obviously non-user dirs
                if user_name.startswith('.') or user_name in ('lost+found', 'skel', 'root'):
                    progress.advance(scan_overall_task)
                    continue

                # Initialize stats for this user
                self.user_stats[user_name] = {
                    'total_files': 0, 'total_size': 0, 'extensions': Counter(),
                    'age_distribution': defaultdict(int), 'critical_files': 0,
                    'high_importance_files': 0, 'modified_recently': 0,
                    'unique_content_samples': 0, 'directories_scanned': 0,
                    'likely_text_files': 0, 'keyword_files': 0
                }

                # Add a task for this specific user's file scan.
                # Initialize with total=1 to avoid a NoneType error in rich;
                # the total is updated once the file count is known.
                user_file_task = progress.add_task(f"Scanning {user_name}...", total=1, start=False)
                try:
                    # Start the task now
                    progress.start_task(user_file_task)
                    # Scan the directory
                    files_count = self._scan_user_directory(user_dir, progress, user_file_task)
                    # Update the task total if we got a count; otherwise mark it finished
                    if files_count > 0:
                        progress.update(user_file_task, total=files_count, completed=files_count)
                    else:
                        progress.update(user_file_task, completed=1, total=1)  # Mark complete even with 0 files
                except Exception as e:
                    # Catch errors during the user scan setup/call
                    self.console.print(f"[yellow]Warning:[/yellow] Error initiating scan for {user_name}: {e}")
                    self._log_error(user_name, user_dir.name, e)
                    progress.update(user_file_task, description=f"[red]Error scanning {user_name}[/red]", completed=1, total=1)

                # Advance the overall user scanning task
                progress.advance(scan_overall_task)

        self.console.print("[bold green]Scan complete.[/bold green]")

    def analyze_data(self) -> pd.DataFrame:
        """
        Converts the collected file metadata into a Pandas DataFrame and sorts it.

        Returns:
            A Pandas DataFrame containing all file metadata, sorted by importance.
            Returns an empty DataFrame if no data was collected.
        """
        if not self.file_metadata_list:
            self.console.print("[yellow]Warning:[/yellow] No file metadata collected during scan.")
            return pd.DataFrame()

        self.console.print(f"Analyzing {len(self.file_metadata_list):,} collected file records...")
        try:
            # Convert the list of NamedTuples to a DataFrame
            df = pd.DataFrame(self.file_metadata_list)

            # Convert types for better analysis (optional but good practice)
            df['modified'] = pd.to_datetime(df['modified'])
            df['accessed'] = pd.to_datetime(df['accessed'])
            df['size'] = pd.to_numeric(df['size'])
            df['importance'] = pd.to_numeric(df['importance'])
            df['entropy'] = pd.to_numeric(df['entropy'])
            df['age_days'] = pd.to_numeric(df['age_days'])

            # Sort by importance (descending)
            df = df.sort_values(by='importance', ascending=False).reset_index(drop=True)
            self.console.print("Analysis complete.")
            return df
        except Exception as e:
            self.console.print(f"[bold red]Error:[/bold red] Failed to create or process DataFrame: {e}")
            import traceback
            self.console.print("[dim]" + traceback.format_exc() + "[/dim]")
            return pd.DataFrame()  # Return empty on failure

    def get_backup_recommendations(self, df: pd.DataFrame) -> Dict[str, Any]:
        """
        Generates backup strategy recommendations based on the analyzed data.

        Args:
            df: The DataFrame containing analyzed file data.

        Returns:
            A dictionary where keys are usernames and values are recommendation details.
        """
        if df.empty:
            return {}

        recommendations = {}
        # Importance thresholds (fixed here; could also be derived dynamically)
        critical_threshold = 0.8
        high_threshold = 0.6
        medium_threshold = 0.4

        for user, user_df in df.groupby('user'):
            user_stats = self.user_stats.get(user, {})  # Stats collected during the scan
            total_size = user_stats.get('total_size', 0)
            total_files = user_stats.get('total_files', 0)

            # --- Calculate Sizes by Importance ---
            critical_size = user_df[user_df['importance'] >= critical_threshold]['size'].sum()
            high_size = user_df[(user_df['importance'] >= high_threshold) & (user_df['importance'] < critical_threshold)]['size'].sum()
            medium_size = user_df[(user_df['importance'] >= medium_threshold) & (user_df['importance'] < high_threshold)]['size'].sum()
            low_size = user_df[user_df['importance'] < medium_threshold]['size'].sum()

            # --- Identify Important Directories ---
            # Consider directories containing multiple critical/high-importance files
            important_file_paths = user_df[user_df['importance'] >= high_threshold]['path']
            dir_importance_counter = Counter()
            for file_path in important_file_paths:
                if file_path.parent != Path('.'):  # Avoid counting files directly in home
                    dir_importance_counter[str(file_path.parent)] += 1  # Convert Path to str for the key

            # Select directories with a minimum number of important files (e.g., >= 3)
            min_important_files_in_dir = 3
            top_important_dirs = {
                path: count for path, count in dir_importance_counter.most_common(10)  # Top 10 dirs
                if count >= min_important_files_in_dir
            }

            # --- File Type Analysis ---
            size_by_ext = user_df.groupby('extension')['size'].sum().sort_values(ascending=False)
            # Filter for meaningful size contributions (e.g., > 1 MB)
            significant_size_by_ext = size_by_ext[size_by_ext > 1024 * 1024].head(10)

            # Calculate the average importance per extension (for extensions with enough files)
            min_files_for_avg_imp = 5
            avg_importance_by_ext = {}
            for ext, group in user_df.groupby('extension'):
                if len(group) >= min_files_for_avg_imp:
                    avg_importance_by_ext[ext] = group['importance'].mean()
            top_value_extensions = dict(sorted(avg_importance_by_ext.items(), key=lambda item: item[1], reverse=True)[:10])

            # --- Change Rate ---
            recent_changes_count = user_stats.get('modified_recently', 0)
            recent_change_rate = recent_changes_count / total_files if total_files > 0 else 0

            # --- Deduplication Potential ---
            # High potential if many non-unique samples were found
            total_samples = len(user_df[user_df['hash_sample'].notna()])
            unique_samples = user_stats.get('unique_content_samples', 0)
            duplicate_ratio = (total_samples - unique_samples) / total_samples if total_samples > 0 else 0
            if duplicate_ratio > 0.3:
                dedup_potential = 'High'
            elif duplicate_ratio > 0.1:
                dedup_potential = 'Medium'
            else:
                dedup_potential = 'Low'
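            # e.g. (illustrative numbers): 1,000 hashed samples with 650 unique ones
            # gives a duplicate ratio of (1000 - 650) / 1000 = 0.35 -> 'High' potential.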

            # --- Compression Potential ---
            # High potential if a large proportion of the files are text
            likely_text_files = user_stats.get('likely_text_files', 0)
            text_file_ratio = likely_text_files / total_files if total_files > 0 else 0
            compression_potential = 'High' if text_file_ratio > 0.5 else 'Medium' if text_file_ratio > 0.2 else 'Low'
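            # e.g. (illustrative numbers): 600 likely-text files out of 1,000 gives a
            # ratio of 0.6 -> 'High' compression potential.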

            # --- Determine Backup Strategy ---
            strategy = self._determine_backup_strategy(
                total_size=total_size,
                critical_size=critical_size,
                high_size=high_size,
                recent_change_rate=recent_change_rate,
                compression_potential=compression_potential,
                deduplication_potential=dedup_potential
            )

            # --- Assemble Recommendations ---
            recommendations[user] = {
                'profile': {
                    'total_files': total_files,
                    'total_size': total_size,
                    'critical_data_size': critical_size,
                    'high_importance_data_size': high_size,
                    'medium_importance_data_size': medium_size,
                    'low_importance_data_size': low_size,
                    'recent_changes_count': recent_changes_count,
                    'recent_change_rate': recent_change_rate,
                    'likely_text_file_ratio': text_file_ratio,
                    'duplicate_sample_ratio': duplicate_ratio,
                },
                'key_areas': {
                    'important_dirs': top_important_dirs,
                    'significant_size_by_ext': significant_size_by_ext.to_dict(),
                    'top_value_extensions': top_value_extensions,
                },
                'strategy': strategy,
                'scan_errors': self.scan_errors.get(user, [])  # Errors specific to this user
            }

        return recommendations

    def _determine_backup_strategy(
        self, total_size: int, critical_size: int, high_size: int,
        recent_change_rate: float, compression_potential: str,
        deduplication_potential: str
    ) -> Dict[str, Any]:
        """Determines backup frequency, retention, and technical suggestions."""
        total_gb = total_size / (1024**3)
        critical_gb = critical_size / (1024**3)
        high_gb = high_size / (1024**3)
        essential_gb = critical_gb + high_gb

        # --- Frequency ---
        # Full backup frequency based on the total size
        if total_gb > 500:
            full_freq = "Monthly"
        elif total_gb > 100:
            full_freq = "Bi-Weekly"
        else:
            full_freq = "Weekly"

        # Incremental/differential frequency based on the change rate and essential size
        if recent_change_rate > 0.1 or essential_gb > 50:  # High change or lots of important data
            incr_freq = "Daily"
        elif recent_change_rate > 0.02 or essential_gb > 10:
            incr_freq = "Every 2-3 Days"
        else:
            incr_freq = "Weekly"

        # Critical data backup frequency (more frequent for the most vital data)
        if critical_gb > 10 or recent_change_rate > 0.15:
            crit_freq = "Every 4-6 Hours"
        else:
            crit_freq = "Daily"  # Minimum daily for critical data

        # --- Retention ---
        # Longer retention for smaller datasets, shorter for very large ones
        if total_gb > 1000:  # > 1 TB
            retention_full = "1 Month"
            retention_incr = "1 Week"
            retention_crit = "2 Weeks"
        elif total_gb > 200:  # > 200 GB
            retention_full = "2 Months"
            retention_incr = "2 Weeks"
            retention_crit = "1 Month"
        else:
            retention_full = "3-6 Months"
            retention_incr = "1 Month"
            retention_crit = "2 Months"

        # --- Technical ---
        compression_rec = compression_potential in ['High', 'Medium'] or total_gb > 50
        deduplication_rec = deduplication_potential in ['High', 'Medium'] or total_gb > 100
        # Recommend encryption if significant critical/high-importance data exists
        encryption_rec = essential_gb > 0.5  # Recommend if > 500 MB of essential data

        return {
            'full_backup_frequency': full_freq,
            'incremental_frequency': incr_freq,
            'critical_data_backup_frequency': crit_freq,
            'estimated_total_backup_size_gb': round(total_gb, 2),
            'estimated_essential_backup_size_gb': round(essential_gb, 2),
            'recommended_retention': {
                'full': retention_full,
                'incremental': retention_incr,
                'critical': retention_crit,
            },
            'compression_recommended': compression_rec,
            'deduplication_benefit': deduplication_potential,  # Keep the original potential rating
            'deduplication_recommended': deduplication_rec,
            'encryption_recommended': encryption_rec,
        }


# --- UI and Reporting Functions ---

def display_summary_panel(analyzer: FileSignificanceAnalyzer) -> Panel:
    """Creates a Rich Panel summarizing the overall scan results."""
    num_users = len(analyzer.user_stats)
    total_errors = sum(len(errors) for errors in analyzer.scan_errors.values())
    summary_text = Text.assemble(
        ("Analyzed: ", "bold"), (f"{analyzer.total_files_scanned:,}", "cyan"), " files (",
        (f"{format_size(analyzer.total_size_scanned)}", "cyan"), ") across ",
        (f"{num_users}", "cyan"), " users.\n",
        ("Scan Depth: ", "bold"), (f"{analyzer.scan_depth}", "cyan"), " levels.\n",
        ("Errors Encountered: ", "bold"),
        (f"{total_errors:,}", "red" if total_errors > 0 else "green")
    )
    return Panel(
        summary_text,
        title="[bold blue]Scan Summary[/bold blue]",
        box=box.DOUBLE,
        border_style="blue",
        padding=(1, 2)
    )


def display_user_statistics_table(analyzer: FileSignificanceAnalyzer) -> Table:
    """Creates a Rich Table summarizing statistics per user."""
    table = Table(title="[bold]User Data Overview[/bold]", box=box.ROUNDED, show_header=True, header_style="bold magenta")
    table.add_column("User", style="cyan", min_width=12)
    table.add_column("Total Files", justify="right")
    table.add_column("Total Size", justify="right")
    table.add_column("Crit ≥0.8", justify="right", style="red")  # Critical files
    table.add_column("High ≥0.6", justify="right", style="orange3")  # High importance
    table.add_column("Recent Files", justify="right")
    table.add_column("Unique Samples", justify="right")
    table.add_column("Errors", justify="right")

    sorted_users = sorted(analyzer.user_stats.items(), key=lambda item: item[1].get('total_size', 0), reverse=True)
    for user, stats in sorted_users:
        errors_count = len(analyzer.scan_errors.get(user, []))
        table.add_row(
            user,
            f"{stats.get('total_files', 0):,}",
            format_size(stats.get('total_size', 0)),
            f"{stats.get('critical_files', 0):,}",
            f"{stats.get('high_importance_files', 0):,}",
            f"{stats.get('modified_recently', 0):,}",
            f"{stats.get('unique_content_samples', 0):,}",
            f"[red]{errors_count:,}[/red]" if errors_count > 0 else "[green]0[/green]"
        )
    return table


def display_recommendations(recommendations: Dict[str, Any], console: Console) -> None:
    """Displays detailed recommendations for each user."""
    console.print("\n" + "=" * 20 + " [bold green]Backup Recommendations[/bold green] " + "=" * 20 + "\n")
    if not recommendations:
        console.print("[yellow]No recommendations generated (likely no data analyzed).[/yellow]")
        return

    for user, rec in recommendations.items():
        profile = rec['profile']
        key_areas = rec['key_areas']
        strategy = rec['strategy']

        # --- Profile Panel ---
        profile_table = Table.grid(padding=(0, 2))
        profile_table.add_column()
        profile_table.add_column()
        profile_table.add_row("[bold]Total Size:[/bold]", f"[cyan]{format_size(profile['total_size'])}[/cyan] ({profile['total_files']:,} files)")
        profile_table.add_row("[bold red]Critical Data (≥0.8):[/]", f"[cyan]{format_size(profile['critical_data_size'])}[/cyan]")
        profile_table.add_row("[bold orange3]High Importance (≥0.6):[/]", f"[cyan]{format_size(profile['high_importance_data_size'])}[/cyan]")
        profile_table.add_row("[bold]Recent Changes:[/bold]", f"{profile['recent_changes_count']:,} files ({profile['recent_change_rate']:.1%})")
        profile_table.add_row("[bold]Text File Ratio:[/bold]", f"{profile['likely_text_file_ratio']:.1%}")
        profile_table.add_row("[bold]Duplicate Sample Ratio:[/bold]", f"{profile['duplicate_sample_ratio']:.1%}")
        profile_panel = Panel(profile_table, title="[gold1]Data Profile[/gold1]", box=box.ROUNDED, border_style="yellow", expand=False)

        # --- Key Areas Panel ---
        key_areas_group = []
        # Important directories tree
        if key_areas['important_dirs']:
            dir_tree = Tree("[bold]Top Important Directories:[/bold]")
            sorted_dirs = sorted(key_areas['important_dirs'].items(), key=lambda item: item[1], reverse=True)
            for dir_path, count in sorted_dirs:
                dir_tree.add(f"[cyan]{dir_path}[/cyan] ({count} files)")
            key_areas_group.append(dir_tree)
        # Top extensions by size
        if key_areas['significant_size_by_ext']:
            ext_size_list = "\n".join([f"- [green]{ext}[/green]: {format_size(size)}" for ext, size in key_areas['significant_size_by_ext'].items()])
            key_areas_group.append(Text.from_markup(f"\n[bold]Top Extensions by Size:[/bold]\n{ext_size_list}"))
        # Top extensions by average importance
        if key_areas['top_value_extensions']:
            ext_value_list = "\n".join([f"- [magenta]{ext}[/magenta]: {value:.2f} avg score" for ext, value in key_areas['top_value_extensions'].items()])
            key_areas_group.append(Text.from_markup(f"\n[bold]Highest Avg Importance Extensions:[/bold]\n{ext_value_list}"))
        key_areas_panel = Panel(Group(*key_areas_group), title="[dark_sea_green4]Key Content Areas[/dark_sea_green4]", box=box.ROUNDED, border_style="green", expand=False)

        # --- Strategy Panel ---
        strat = strategy  # Alias for brevity
        strategy_table = Table.grid(padding=(0, 2))
        strategy_table.add_column(style="bold")
        strategy_table.add_column()
        strategy_table.add_row("Full Backup:", f"[cyan]{strat['full_backup_frequency']}[/cyan]")
        strategy_table.add_row("Incremental Backup:", f"[cyan]{strat['incremental_frequency']}[/cyan]")
        strategy_table.add_row("Critical Data Backup:", f"[cyan]{strat['critical_data_backup_frequency']}[/cyan]")
        strategy_table.add_row("Retention (Full):", f"[cyan]{strat['recommended_retention']['full']}[/cyan]")
        strategy_table.add_row("Retention (Incr):", f"[cyan]{strat['recommended_retention']['incremental']}[/cyan]")
        strategy_table.add_row("Retention (Crit):", f"[cyan]{strat['recommended_retention']['critical']}[/cyan]")
        strategy_table.add_row("Est. Total Size:", f"[cyan]{strat['estimated_total_backup_size_gb']:.2f} GB[/cyan]")
        strategy_table.add_row("Est. Essential Size:", f"[cyan]{strat['estimated_essential_backup_size_gb']:.2f} GB[/cyan]")
        strategy_table.add_row("Compression:", "[green]Recommended[/green]" if strat['compression_recommended'] else "[yellow]Optional[/yellow]")
        strategy_table.add_row("Deduplication:", f"[green]Recommended[/green] (Benefit: {strat['deduplication_benefit']})" if strat['deduplication_recommended'] else f"[yellow]Optional[/yellow] (Benefit: {strat['deduplication_benefit']})")
        strategy_table.add_row("Encryption:", "[green]Recommended[/green]" if strat['encryption_recommended'] else "[yellow]Optional[/yellow]")
        strategy_panel = Panel(strategy_table, title="[steel_blue1]Recommended Strategy[/steel_blue1]", box=box.ROUNDED, border_style="blue", expand=False)

        # --- Combine the Panels for the User ---
        user_layout = Layout()
        user_layout.split_row(
            Layout(profile_panel, ratio=1),
            Layout(key_areas_panel, ratio=1),
            Layout(strategy_panel, ratio=1)
        )
        console.print(Panel(user_layout, title=f"[bold blue]User: [cyan]{user}[/cyan]", border_style="blue", expand=False))
        console.print()  # Spacer


def display_top_files_table(df: pd.DataFrame, console: Console, num_files: int = 20) -> None:
    """Displays a table of the most important files found."""
    if df.empty:
        return
    console.print(f"\n--- [bold magenta]Top {num_files} Most Important Files[/bold magenta] ---\n")
    table = Table(box=box.HEAVY_EDGE, show_header=True, header_style="bold green")
    table.add_column("Rank", style="dim", width=4)
    table.add_column("User", style="cyan", min_width=10)
    table.add_column("Path", style="green", max_width=70)  # Limit the path width
    table.add_column("Size", justify="right")
    table.add_column("Modified", justify="center")
    table.add_column("Score", justify="right")
    table.add_column("Factors", style="dim", min_width=20)  # Contributing factors

    top_files = df.head(num_files)
    for index, row in top_files.iterrows():
        # Determine the color based on the importance score
        score = row['importance']
        if score >= 0.8:
            score_style = "[bold red]"
        elif score >= 0.6:
            score_style = "[orange3]"
        elif score >= 0.4:
            score_style = "[yellow]"
        else:
            score_style = "[dim]"

        # Format the path nicely
        path_str = str(row['path'])
        if len(path_str) > 65:  # Truncate long paths
            path_str = "..." + path_str[-62:]

        # Summarize the key factors contributing to the score
        factors = []
        if row['has_important_keyword']:
            factors.append("keyword")
        if row['importance'] > 0.7 and any(part in IMPORTANT_DIR_NAMES for part in row['path'].parts):
            factors.append("imp_dir")
        if row['extension'] in EXTENSION_WEIGHTS and EXTENSION_WEIGHTS[row['extension']] > 0.8:
            factors.append("ext")
        if row['age_days'] < 30:
            factors.append("recent")
        if row['entropy'] > 0.8:
            factors.append("entropy")
        factors_str = ", ".join(factors)

        table.add_row(
            str(index + 1),
            row['user'],
            path_str,
            format_size(row['size']),
            row['modified'].strftime("%Y-%m-%d"),
            f"{score_style}{score:.3f}[/]",
            factors_str
        )
    console.print(table)


def display_scan_errors(analyzer: FileSignificanceAnalyzer, console: Console) -> None:
    """Displays errors encountered during the scan."""
    total_errors = sum(len(errors) for errors in analyzer.scan_errors.values())
    if total_errors == 0:
        return  # Nothing to display

    console.print("\n" + "=" * 20 + " [bold yellow]Scan Errors[/bold yellow] " + "=" * 20 + "\n")
    error_tree = Tree(f"[bold yellow]Encountered {total_errors} errors during scan:[/]")
    max_errors_per_user = 5  # Limit the displayed errors per user
    for user, errors in analyzer.scan_errors.items():
        if not errors:
            continue
        user_node = error_tree.add(f"[cyan]{user}[/cyan] ({len(errors)} errors)")
        for i, error_msg in enumerate(errors):
            if i < max_errors_per_user:
                user_node.add(f"[dim red]- {error_msg}[/dim red]")
            elif i == max_errors_per_user:
                user_node.add(f"[dim]... ({len(errors) - max_errors_per_user} more errors not shown)[/dim]")
                break
    console.print(error_tree)


def generate_markdown_report(
    analyzer: FileSignificanceAnalyzer,
    df: pd.DataFrame,
    recommendations: Dict[str, Any]
) -> str:
    """Generates a comprehensive Markdown report of the analysis."""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    scan_path = analyzer.base_path
    num_users = len(analyzer.user_stats)
    total_errors = sum(len(errors) for errors in analyzer.scan_errors.values())

    # --- Header ---
    md = f"""# Backup Significance Analysis Report

**Generated:** {timestamp}
**Scan Path:** `{scan_path}`
**Scan Depth:** {analyzer.scan_depth}

## 1. Scan Summary

* **Files Analyzed:** {analyzer.total_files_scanned:,}
* **Total Data Size:** {format_size(analyzer.total_size_scanned)}
* **Users Scanned:** {num_users}
* **Scan Errors:** {total_errors:,}

## 2. User Data Overview

| User | Total Files | Total Size | Critical Files (≥0.8) | High Importance (≥0.6) | Recent Files (<30d) | Unique Samples | Scan Errors |
|------|-------------|------------|-----------------------|------------------------|---------------------|----------------|-------------|
"""

    # --- User Stats Table ---
    sorted_users = sorted(analyzer.user_stats.items(), key=lambda item: item[1].get('total_size', 0), reverse=True)
    for user, stats in sorted_users:
        errors_count = len(analyzer.scan_errors.get(user, []))
        md += f"| {user} | {stats.get('total_files', 0):,} | {format_size(stats.get('total_size', 0))} | {stats.get('critical_files', 0):,} | {stats.get('high_importance_files', 0):,} | {stats.get('modified_recently', 0):,} | {stats.get('unique_content_samples', 0):,} | {errors_count:,} |\n"

    # --- Recommendations ---
    md += "\n## 3. Backup Recommendations per User\n"
    if not recommendations:
        md += "\n_No recommendations generated._\n"
    else:
        for user, rec in recommendations.items():
            profile = rec['profile']
            key_areas = rec['key_areas']
            strategy = rec['strategy']
            md += f"\n### User: `{user}`\n\n"
            # Profile
            md += "**Data Profile:**\n"
            md += f"- Total Size: **{format_size(profile['total_size'])}** ({profile['total_files']:,} files)\n"
            md += f"- Critical Data (Score ≥0.8): **{format_size(profile['critical_data_size'])}**\n"
            md += f"- High Importance Data (Score ≥0.6): **{format_size(profile['high_importance_data_size'])}**\n"
            md += f"- Recent Changes (<30d): **{profile['recent_changes_count']:,} files** ({profile['recent_change_rate']:.1%})\n"
            md += f"- Text File Ratio: **{profile['likely_text_file_ratio']:.1%}** (Affects Compression)\n"
            md += f"- Duplicate Sample Ratio: **{profile['duplicate_sample_ratio']:.1%}** (Affects Deduplication)\n\n"
            # Key Areas
            md += "**Key Content Areas:**\n"
            if key_areas['important_dirs']:
                md += "- _Top Important Directories:_\n"
                sorted_dirs = sorted(key_areas['important_dirs'].items(), key=lambda item: item[1], reverse=True)
                for dir_path, count in sorted_dirs:
                    md += f"  - `{dir_path}` ({count} files)\n"
            if key_areas['significant_size_by_ext']:
                md += "- _Top Extensions by Size:_\n"
                for ext, size in key_areas['significant_size_by_ext'].items():
                    md += f"  - `{ext if ext else '<none>'}`: {format_size(size)}\n"
            if key_areas['top_value_extensions']:
                md += "- _Highest Avg Importance Extensions:_\n"
                for ext, value in key_areas['top_value_extensions'].items():
                    md += f"  - `{ext if ext else '<none>'}`: {value:.2f} avg score\n"
            md += "\n"
            # Strategy
            strat = strategy  # Alias
            md += "**Recommended Strategy:**\n"
            md += f"- Full Backup: **{strat['full_backup_frequency']}**\n"
            md += f"- Incremental Backup: **{strat['incremental_frequency']}**\n"
            md += f"- Critical Data Backup: **{strat['critical_data_backup_frequency']}**\n"
            md += "- Retention:\n"
            md += f"  - Full: **{strat['recommended_retention']['full']}**\n"
            md += f"  - Incremental: **{strat['recommended_retention']['incremental']}**\n"
            md += f"  - Critical: **{strat['recommended_retention']['critical']}**\n"
            md += f"- Estimated Total Size: **{strat['estimated_total_backup_size_gb']:.2f} GB**\n"
            md += f"- Estimated Essential Size (Crit+High): **{strat['estimated_essential_backup_size_gb']:.2f} GB**\n"
            md += f"- Compression: **{'Recommended' if strat['compression_recommended'] else 'Optional'}**\n"
            md += f"- Deduplication: **{'Recommended' if strat['deduplication_recommended'] else 'Optional'}** (Benefit: {strat['deduplication_benefit']})\n"
            md += f"- Encryption: **{'Recommended' if strat['encryption_recommended'] else 'Optional'}**\n\n"

    # --- Top Files ---
    md += "\n## 4. Top Important Files\n\n"
    if df.empty:
        md += "_No file data available._\n"
    else:
        md += "| Rank | User | Path | Size | Modified | Score | Factors |\n"
        md += "|------|------|------|------|----------|-------|---------|\n"
        top_files = df.head(20)  # Include the top 20 in the report
        for index, row in top_files.iterrows():
            path_str = str(row['path'])
            # Basic escaping for the Markdown table
            path_str_md = path_str.replace('|', '\\|')
            if len(path_str_md) > 65:
                path_str_md = "..." + path_str_md[-62:]
            score = row['importance']
            factors = []
            if row['has_important_keyword']:
                factors.append("keyword")
            if score > 0.7 and any(part in IMPORTANT_DIR_NAMES for part in row['path'].parts):
                factors.append("imp_dir")
            if row['extension'] in EXTENSION_WEIGHTS and EXTENSION_WEIGHTS[row['extension']] > 0.8:
                factors.append("ext")
            if row['age_days'] < 30:
                factors.append("recent")
            if row['entropy'] > 0.8:
                factors.append("entropy")
            factors_str = ", ".join(factors) if factors else "-"
            md += f"| {index + 1} | {row['user']} | `{path_str_md}` | {format_size(row['size'])} | {row['modified'].strftime('%Y-%m-%d')} | {score:.3f} | {factors_str} |\n"

    # --- Scan Errors ---
    if total_errors > 0:
        md += "\n## 5. Scan Errors Encountered\n\n"
        md += "_Note: Only the first few errors per user are listed below._\n\n"
        max_errors_per_user_report = 10
        for user, errors in analyzer.scan_errors.items():
            if not errors:
                continue
            md += f"### User: `{user}` ({len(errors)} errors)\n\n"
            for i, error_msg in enumerate(errors):
                if i < max_errors_per_user_report:
                    # Basic code formatting for the error message
                    md += f"- `{error_msg}`\n"
                elif i == max_errors_per_user_report:
                    md += f"- ... ({len(errors) - max_errors_per_user_report} more errors not shown)\n"
                    break
            md += "\n"

    # --- Footer ---
    md += "\n---\n"
    md += f"*Report generated by Backup Significance Analyzer v2.1.0 on {timestamp}*"
    return md


# --- Main Execution ---

def main():
    """Main execution function."""
    console = Console()  # Main console for output
    console.print(Panel.fit(
        Text("Backup Significance Analyzer v2.1.0", style="bold blue", justify="center") +
        Text("\nStatistical analysis for optimized backup strategies", style="cyan", justify="center"),
        box=box.DOUBLE,
        border_style="blue"
    ))
    console.print()

    # Check for root/sudo privileges
    is_sudo = os.geteuid() == 0
    if not is_sudo:
        console.print("[bold yellow]Warning:[/bold yellow] This script likely needs root/sudo permissions "
                      "to read all user directories in '/home'.")
        console.print("[yellow]Analysis may be incomplete due to permission errors.[/yellow]")
        if not Confirm.ask("Continue anyway?", default=False):
            sys.exit(0)
    else:
        console.print("[green]Running with elevated (sudo) permissions.[/green]")
    console.print()

    # Determine the target path
    target_path = "/home"
    if len(sys.argv) > 1:
        target_path_arg = sys.argv[1]
        if Path(target_path_arg).is_dir():
            target_path = target_path_arg
        else:
            console.print(f"[yellow]Warning:[/yellow] Provided path '{target_path_arg}' is not a valid directory. "
                          f"Defaulting to '{target_path}'.")

    # --- Initialize and Run Scan ---
    analyzer = FileSignificanceAnalyzer(base_path=target_path, scan_depth=10)
    try:
        analyzer.scan_home_directory()
    except Exception as e:
        console.print(f"\n[bold red]Fatal error during directory scan:[/bold red] {e}")
        import traceback
        console.print("[dim]" + traceback.format_exc() + "[/dim]")
        sys.exit(1)

    if analyzer.total_files_scanned == 0 and not any(analyzer.scan_errors.values()):
        console.print("[yellow]Scan complete, but no files were found or processed.[/yellow]")
        console.print("[yellow]Check the target path and permissions.[/yellow]")
        display_scan_errors(analyzer, console)  # Show errors if any
        return  # Exit if nothing was scanned

    # --- Analyze Data ---
    console.print("\n[bold green]Analyzing collected data...[/bold green]")
    # Use a status indicator for the potentially long analysis
    with console.status("[bold cyan]Performing significance analysis...", spinner="dots"):
        df = analyzer.analyze_data()
        if df.empty and analyzer.total_files_scanned > 0:
            console.print("[bold red]Error:[/bold red] Data analysis failed. Check previous error messages.")
            display_scan_errors(analyzer, console)
            return
        recommendations = analyzer.get_backup_recommendations(df)
    console.print("[bold green]Analysis finished.[/bold green]")

    # --- Display Results ---
    console.print("\n" + "=" * 60)
    console.print(display_summary_panel(analyzer))
    console.print(display_user_statistics_table(analyzer))
    display_top_files_table(df, console)
    display_recommendations(recommendations, console)
    display_scan_errors(analyzer, console)  # Display errors at the end

    # --- Save Report ---
    console.print("\n" + "=" * 60)
    if Confirm.ask("\n[bold]Save analysis report to a file?[/bold]", default=True):
        report_format = Prompt.ask(
            "Select report format",
            choices=["markdown", "text"],
            default="markdown"
        ).lower()
        now = datetime.now()
        timestamp = now.strftime("%Y-%m-%d %H:%M:%S")  # Human-readable, used in the text report header
        timestamp_str = now.strftime("%Y%m%d_%H%M%S")
        filename_base = f"backup_analysis_{timestamp_str}"
        if report_format == "markdown":
            filename = f"{filename_base}.md"
            console.print("Generating Markdown report...")
            markdown_content = generate_markdown_report(analyzer, df, recommendations)
            try:
                with open(filename, "w", encoding='utf-8') as f:
                    f.write(markdown_content)
                console.print(f"[bold green]✔ Markdown report saved to:[/bold green] [cyan]{filename}[/cyan]")
            except IOError as e:
                console.print(f"[bold red]Error saving Markdown report:[/bold red] {e}")
        else:  # Text format
            filename = f"{filename_base}.txt"
            console.print("Generating Text report...")
            try:
                # Capture the console output for the text report
                text_console = Console(record=True, width=120)  # Fixed width for the text file
                text_console.print(display_summary_panel(analyzer))
                text_console.print(display_user_statistics_table(analyzer))
                display_top_files_table(df, text_console)
                display_recommendations(recommendations, text_console)
                display_scan_errors(analyzer, text_console)
                report_content = text_console.export_text()
                with open(filename, "w", encoding='utf-8') as f:
                    f.write(f"Backup Significance Analysis Report - {timestamp}\n")
                    f.write("=" * 80 + "\n")
                    f.write(report_content)
                console.print(f"[bold green]✔ Text report saved to:[/bold green] [cyan]{filename}[/cyan]")
            except IOError as e:
                console.print(f"[bold red]Error saving Text report:[/bold red] {e}")
            except Exception as e:
                console.print(f"[bold red]Unexpected error generating text report:[/bold red] {e}")

    console.print("\n[bold blue]Analysis complete. Exiting.[/bold blue]")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\n\nAnalysis interrupted by user. Exiting gracefully.")
        sys.exit(0)
    except Exception:
        # Fallback for unexpected errors in the main execution flow
        console = Console()
        console.print("\n[bold red]An unexpected critical error occurred:[/bold red]")
        console.print_exception(show_locals=False)  # Show the traceback
        sys.exit(1)