@EricsonWillians
Last active April 28, 2025 18:21
Python backup analyzer. Scores file significance in home dirs using statistical heuristics (type, age, keywords, entropy). Generates backup strategies and Markdown reports to optimize storage/recovery. Requires: Python 3, rich, pandas, numpy, sudo.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Backup Significance Analyzer v2.1.0
Analyzes file and directory significance in user home directories using statistical
methods (entropy, access patterns, modification frequency, content hints) to
recommend optimized backup strategies.
Requires: rich, pandas, numpy
Usage:
sudo python3 backup_significance_analyzer.py [/path/to/scan]
Example:
sudo python3 backup_significance_analyzer.py /home
Author: Backup Intelligence Team (Enhanced by AI)
Version: 2.1.0
"""
import os
import sys
import time
import math
import hashlib
import json
import subprocess
from datetime import datetime
from pathlib import Path
from collections import Counter, defaultdict
from typing import Dict, List, Tuple, Set, Optional, Union, Any, NamedTuple
# --- Dependency Handling & Environment Setup ---
# Attempt to limit OpenBLAS threads before importing numpy/pandas
# This helps prevent 'pthread_create failed' errors on some systems when run with sudo
try:
num_threads = "2" # Start with a low number
os.environ['OPENBLAS_NUM_THREADS'] = num_threads
os.environ['MKL_NUM_THREADS'] = num_threads
os.environ['OMP_NUM_THREADS'] = num_threads
# Can add more environment variables if needed (e.g., for Accelerate framework on macOS)
print(f"[INFO] Setting max BLAS threads to {num_threads} to mitigate potential resource issues.")
except Exception as e:
print(f"[Warning] Could not set BLAS thread limits: {e}")
# Check and attempt to install rich
try:
from rich.console import Console, Group
from rich.panel import Panel
from rich.progress import (Progress, BarColumn, TextColumn,
TimeElapsedColumn, SpinnerColumn, TaskID)
from rich.table import Table
from rich.text import Text
from rich.tree import Tree
from rich import box
from rich.prompt import Confirm, Prompt
from rich.layout import Layout
from rich.markdown import Markdown
from rich.syntax import Syntax
except ImportError:
print("Required package 'rich' not found. Attempting to install...")
try:
subprocess.check_call([sys.executable, "-m", "pip", "install", "rich"])
print("Installation complete. Please restart the script.")
sys.exit(0)
except Exception as e:
print(f"Failed to install 'rich': {e}")
print("Please install it manually: pip install rich")
sys.exit(1)
# Check for pandas and numpy (often problematic with sudo)
try:
import pandas as pd
except ImportError:
print("Required package 'pandas' not found.")
print("Attempting to install 'pandas'...")
try:
# Note: Using sudo pip can be risky. Consider system package manager or virtual envs.
print("[Warning] Trying 'pip install pandas'. If this fails or you used 'sudo python3', "
"you might need 'sudo pip install pandas' or install via your system package manager.")
subprocess.check_call([sys.executable, "-m", "pip", "install", "pandas"])
print("Installation complete. Please restart the script.")
sys.exit(0)
except Exception as e:
print(f"Failed to install 'pandas': {e}")
print("Please install it manually (e.g., 'pip install pandas' or 'sudo pip install pandas').")
sys.exit(1)
try:
import numpy as np
except ImportError:
print("Required package 'numpy' not found.")
print("Attempting to install 'numpy'...")
try:
print("[Warning] Trying 'pip install numpy'. If this fails or you used 'sudo python3', "
"you might need 'sudo pip install numpy' or install via your system package manager.")
subprocess.check_call([sys.executable, "-m", "pip", "install", "numpy"])
print("Installation complete. Please restart the script.")
sys.exit(0)
except Exception as e:
print(f"Failed to install 'numpy': {e}")
print("Please install it manually (e.g., 'pip install numpy' or 'sudo pip install numpy').")
sys.exit(1)
# --- Configuration Constants ---
# Weights for file extensions (0.0 to 1.0) - higher means more important
EXTENSION_WEIGHTS: Dict[str, float] = {
# Documents & Text
'pdf': 0.85, 'doc': 0.83, 'docx': 0.85, 'txt': 0.70, 'odt': 0.82,
'rtf': 0.75, 'tex': 0.80, 'md': 0.78, 'log': 0.40, 'csv': 0.80,
'json': 0.78, 'xml': 0.76, 'yaml': 0.77, 'yml': 0.77, 'toml': 0.77,
'ini': 0.75, 'conf': 0.78, 'config': 0.78, 'env': 0.80,
# Spreadsheets & Presentations
'xlsx': 0.85, 'xls': 0.84, 'ods': 0.83,
'pptx': 0.83, 'ppt': 0.82, 'odp': 0.81,
# Code & Development
'py': 0.88, 'ipynb': 0.87, 'java': 0.85, 'c': 0.85, 'cpp': 0.85, 'h': 0.80,
'js': 0.83, 'html': 0.75, 'css': 0.72, 'sh': 0.82, 'bash': 0.82, 'zsh': 0.80,
'rb': 0.83, 'php': 0.83, 'go': 0.85, 'rs': 0.85, 'ts': 0.84, 'sql': 0.84,
'swift': 0.85, 'kt': 0.85, 'dart': 0.84, 'lua': 0.82, 'r': 0.83,
'pl': 0.82, 'scala': 0.83, 'vb': 0.80, 'ps1': 0.82, 'psm1': 0.82,
'dockerfile': 0.75, 'makefile': 0.70,
# Databases & Data
'db': 0.90, 'sqlite': 0.90, 'sqlite3': 0.90, 'mdb': 0.88, 'accdb': 0.88,
'sqlitedb': 0.90, 'bak': 0.80, 'dump': 0.80,
# Media (Lower importance generally, can be large)
'jpg': 0.67, 'jpeg': 0.67, 'png': 0.68, 'gif': 0.55, 'svg': 0.70, 'ico': 0.40,
'tiff': 0.72, 'bmp': 0.60, 'psd': 0.75, 'ai': 0.75, 'eps': 0.72,
'raw': 0.78, 'cr2': 0.78, 'nef': 0.78, 'arw': 0.78, 'dng': 0.78, # Camera Raw
'mp3': 0.60, 'wav': 0.65, 'flac': 0.72, 'aac': 0.62, 'ogg': 0.63, 'm4a': 0.61,
'mp4': 0.75, 'mov': 0.75, 'avi': 0.73, 'mkv': 0.73, 'wmv': 0.70, 'flv': 0.68,
'webm': 0.70, 'webp': 0.65,
# Archives (Importance depends on content, often medium)
'zip': 0.70, 'tar': 0.70, 'gz': 0.70, 'bz2': 0.70, 'xz': 0.70,
'rar': 0.70, '7z': 0.70, 'tgz': 0.70, 'iso': 0.50, 'img': 0.60,
# Cryptographic & Security (High Importance)
'pem': 0.95, 'key': 0.95, 'crt': 0.95, 'csr': 0.90, 'p12': 0.95, 'pfx': 0.95,
'gpg': 0.95, 'pgp': 0.95, 'kdbx': 0.95, 'keystore': 0.95, 'jks': 0.95,
'ssh': 0.90, # Often config files in .ssh/
# System & Temporary (Low Importance)
'tmp': 0.10, 'temp': 0.10, 'swp': 0.05, 'swo': 0.05, 'lock': 0.15,
'pid': 0.10, 'cache': 0.05, 'o': 0.20, 'obj': 0.20, 'a': 0.25, 'lib': 0.30,
'so': 0.30, 'dll': 0.30, 'dylib': 0.30, 'exe': 0.40, 'app': 0.45, 'msi': 0.35,
'sys': 0.25, 'drv': 0.25,
# Virtual Machines & Containers (Can be large, importance varies)
'vmdk': 0.65, 'vdi': 0.65, 'qcow2': 0.65, 'vhd': 0.65, 'vhdx': 0.65,
'ova': 0.70, 'ovf': 0.70,
# Misc Office/System
'xkb': 0.30, 'desktop': 0.25, 'lnk': 0.20,
'ics': 0.75, 'vcf': 0.80,
'otf': 0.50, 'ttf': 0.50, # Fonts
}
# Directory names hinting at important user data
IMPORTANT_DIR_NAMES: Set[str] = {
'Documents', 'My Documents', 'Docs', 'Texte', 'Dokumente', # Common variations
'Pictures', 'Photos', 'Images', 'Bilder',
'Music', 'Audio', 'Musik',
'Videos', 'Movies', 'Filme',
'Desktop', 'Schreibtisch',
'Downloads', # Often contains important recent items, despite temp nature
'Projects', 'src', 'code', 'dev', 'workspace', 'repos',
'work', 'research', 'studies', 'uni', 'thesis', 'dissertation',
'backup', 'archive', 'important', 'critical', 'private', 'personal',
'.ssh', '.gnupg', '.aws', '.kube', '.docker', '.config', # Config directories
'tax', 'finance', 'banking', 'invoices', 'receipts',
'cv', 'resume', 'applications', 'contracts', 'legal', 'official',
'medical', 'health',
'credentials', 'certificates', 'passwords', 'keys', 'secrets', 'identity',
'notes', 'notebooks', 'journals',
'vm', 'virtualbox', 'virtual machines',
'git' # User's main git repos, not .git inside projects
}
# Directory names/patterns typically containing low-value/regeneratable data
# Using path components for matching
LOW_VALUE_DIR_PATTERNS: Set[str] = {
'node_modules', '__pycache__', '.git', # Specific common ones
'venv', 'env', '.venv', '.env', # Virtual environments
'tmp', 'temp', 'cache', '.cache', '.pytest_cache', # Caches and temp
'logs', 'log', '.logs', # Logs
'build', 'dist', 'out', 'target', 'bin', 'obj', # Build artifacts
'.Trash', '.local/share/Trash', # Trash bins
'.npm', '.yarn', '.pnpm-store', '.gradle', '.m2', '.nuget', # Package manager caches
'.cargo', '.rustup', '.pub-cache', '.composer', # More package caches
'bower_components', '.history', '.vscode', '.idea', '.settings', # IDE/Editor specific
'vendor', 'Pods', # Dependency dirs
'site-packages', 'jspm_packages', # Language package dirs
'coverage', '.coverage' # Test coverage reports
}
# Keywords in filenames hinting at important content
IMPORTANT_KEYWORDS: Set[str] = {
'password', 'secret', 'key', 'credential', 'token', 'api_key', 'private_key',
'id', 'identity', 'certificate', 'license', 'contract', 'agreement',
'tax', 'ssn', 'passport', 'driver_license', 'personal_id',
'confidential', 'private', 'sensitive', 'classified',
'personal', 'financial', 'banking', 'loan', 'mortgage', 'investment',
'medical', 'health', 'diagnosis', 'prescription', 'insurance',
'legal', 'official', 'affidavit', 'subpoena', 'testament', 'will',
'exam', 'thesis', 'dissertation', 'proposal', 'manuscript', 'publication',
'report', 'analysis', 'strategy', 'plan', 'research',
'invoice', 'receipt', 'bill', 'statement', 'payslip',
'backup', 'archive', 'export', # Could be important exports
'resume', 'cv', 'cover_letter', 'application'
}
# --- Helper Classes and Functions ---
class FileMetadata(NamedTuple):
"""Stores metadata for a single file."""
user: str
path: Path # Relative path within user's home
absolute_path: Path # Full path
size: int
modified: datetime
accessed: datetime
extension: str
age_days: float
age_category: str
entropy: float
is_unique: bool # Based on sampled hash
importance: float
hash_sample: Optional[str] # Hash of sampled content
has_important_keyword: bool
is_likely_text: bool
def format_size(size_bytes: Union[int, float]) -> str:
"""Formats size in bytes to human-readable format (KB, MB, GB)."""
if size_bytes < 1024:
return f"{size_bytes} bytes"
elif size_bytes < 1024**2:
return f"{size_bytes / 1024:.1f} KB"
elif size_bytes < 1024**3:
return f"{size_bytes / 1024**2:.1f} MB"
elif size_bytes < 1024**4:
return f"{size_bytes / 1024**3:.2f} GB"
else:
return f"{size_bytes / 1024**4:.2f} TB"
def calculate_entropy(data: bytes) -> float:
"""Calculates Shannon entropy of byte data, normalized to 0.0-1.0."""
if not data:
return 0.0
entropy = 0.0
data_len = len(data)
byte_counts = Counter(data)
for count in byte_counts.values():
p_x = count / data_len
if p_x > 0:
entropy -= p_x * math.log2(p_x)
# Normalize: max entropy for bytes is log2(256) = 8
return entropy / 8.0
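# Quick sanity check of the normalization (illustrative, not executed):
#   calculate_entropy(b"aaaa")            -> 0.0  (single repeated byte, no information)
#   calculate_entropy(bytes(range(256)))  -> 1.0  (all 256 byte values equally likely, 8 bits / 8)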
def is_likely_text_file(sample: bytes, ext: str) -> bool:
"""Simple heuristic to guess if a file is text-based."""
text_extensions = {
'txt', 'md', 'py', 'js', 'json', 'html', 'css', 'xml', 'yaml', 'yml',
'c', 'cpp', 'h', 'java', 'rb', 'php', 'pl', 'sh', 'sql', 'csv', 'log',
'tex', 'rtf', 'ini', 'conf', 'config', 'toml', 'go', 'rs', 'ts', 'swift',
'kt', 'dart', 'lua', 'r', 'ps1', 'psm1', 'bash', 'zsh', 'ipynb'
}
if ext in text_extensions:
return True
# Check content: If mostly printable ASCII and common UTF-8, likely text.
# Avoid checking binary files based on extension alone.
binary_extensions = {'exe', 'dll', 'so', 'a', 'o', 'lib', 'bin', 'img', 'iso'}
if ext in binary_extensions:
return False
try:
# Attempt to decode a sample as UTF-8
sample.decode('utf-8', errors='strict')
# High proportion of non-printable chars might indicate binary
non_printable = sum(1 for byte in sample if byte < 32 and byte not in (9, 10, 13)) # Allow tab, lf, cr
if non_printable / len(sample) > 0.1: # Arbitrary threshold: >10% non-printable?
return False
return True
except UnicodeDecodeError:
return False
except Exception: # Other potential issues
return False
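# Illustrative behaviour of the heuristic above (not executed):
#   is_likely_text_file(b"hello world\n", "dat")    -> True   (valid UTF-8, printable content)
#   is_likely_text_file(b"\x00\x01\x02\x03", "dat") -> False  (all bytes non-printable)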
# --- Core Analyzer Class ---
class FileSignificanceAnalyzer:
"""Core engine for scanning files and calculating backup significance."""
# Sampling config for hashing and entropy
HASH_SAMPLE_SIZE_START = 65536 # 64 KB from start
HASH_SAMPLE_SIZE_END = 65536 # 64 KB from end (if file is large enough)
MAX_SIZE_FOR_FULL_HASH = 1 * 1024 * 1024 # 1 MB
MAX_SIZE_FOR_ENTROPY_CALC = 10 * 1024 * 1024 # 10 MB
def __init__(self, base_path: str = "/home", scan_depth: int = 10, entropy_threshold: float = 0.65):
"""
Initializes the analyzer.
Args:
base_path: The base directory to scan (e.g., "/home").
scan_depth: Maximum directory depth to traverse relative to each user's home.
entropy_threshold: Minimum normalized entropy score to be considered 'interesting'.
"""
self.base_path = Path(base_path).resolve() # Ensure absolute path
self.scan_depth = scan_depth
self.entropy_threshold = entropy_threshold
self.file_metadata_list: List[FileMetadata] = []
self.user_stats: Dict[str, Dict[str, Any]] = {}
self.scan_errors: Dict[str, List[str]] = defaultdict(list)
self.total_size_scanned: int = 0
self.total_files_scanned: int = 0
self.content_hashes: Set[str] = set() # Stores hashes of file content samples
self.console = Console(stderr=True) # Use stderr for progress/errors
def _log_error(self, user: str, path: Union[Path, str], error: Exception):
"""Logs an error encountered during scanning."""
error_msg = f"Error processing '{path}': {type(error).__name__}: {error}"
self.scan_errors[user].append(error_msg)
# Optionally print verbose errors immediately
# self.console.print(f"[dim red] স্ক্যান ত্রুটি ({user}): {error_msg}[/dim red]")
def _calculate_importance(
self,
file_info: FileMetadata
) -> float:
"""
Calculates a file's importance score (0.0 to 1.0) based on multiple factors.
Factors considered:
- File extension weight
- Directory path importance (is it in IMPORTANT_DIR_NAMES?)
- File age (recency)
- File size (moderate boost for non-tiny files)
- Content entropy (higher entropy can indicate unique data like archives, crypto)
- Presence of keywords in filename
Args:
file_info: The FileMetadata object for the file.
Returns:
A float score between 0.0 and 1.0.
"""
ext = file_info.extension
path = file_info.path # Relative path
stats_size = file_info.size
age_days = file_info.age_days
entropy = file_info.entropy
has_keyword = file_info.has_important_keyword
# 1. Base importance from file extension
# Default to 0.4 if extension unknown, 0.1 if clearly temp/cache ext
ext_importance = EXTENSION_WEIGHTS.get(ext, 0.4)
if ext in {'tmp', 'temp', 'cache', 'swp', 'swo', 'lock', 'pid', 'o', 'obj'}:
ext_importance = 0.1
# 2. Directory path importance
dir_importance = 0.5 # Default baseline
try:
path_parts = set(p.lower() for p in path.parts) # Lowercase for case-insensitive match
# Check against lowercase versions of important/low-value names
important_dir_names_lower = {d.lower() for d in IMPORTANT_DIR_NAMES}
low_value_patterns_lower = {p.lower() for p in LOW_VALUE_DIR_PATTERNS}
# Use lowercased path components for matching low value patterns
is_in_low_value_dir = any(part in low_value_patterns_lower for part in path_parts)
if is_in_low_value_dir:
dir_importance = 0.1 # Significantly reduce importance
elif any(part in important_dir_names_lower for part in path_parts):
dir_importance = 0.9 # Significantly increase importance
# Check parent directories too for importance context
elif len(path.parts) > 1 and path.parts[-2].lower() in important_dir_names_lower:
dir_importance = 0.8 # Slightly less boost than direct match
except Exception:
pass # Ignore errors during path analysis
# 3. Recency factor (higher score for more recent files)
# Max score 1.0 for < 7 days, decays over 2 years
if age_days < 7:
recency_factor = 1.0
elif age_days < 730: # Up to 2 years
recency_factor = max(0.0, 1.0 - (age_days / 730.0))
else:
recency_factor = 0.05 # Very small boost for very old files
# 4. Size factor (penalize zero-byte, small boost for >1KB, cap boost)
if stats_size == 0:
size_factor = 0.0
elif stats_size < 1024:
size_factor = 0.2 # Small files get a small base score
else:
# Logarithmic scale, capping contribution. Max boost around 10MB.
size_mb = stats_size / (1024 * 1024)
size_factor = min(0.8, 0.3 + 0.5 * math.log10(1.0 + size_mb))
# 5. Entropy factor (boost if entropy is high, suggesting non-standard/compressed data)
if entropy > self.entropy_threshold:
# Scale boost based on how much entropy exceeds threshold
entropy_factor = 0.5 + 0.5 * ((entropy - self.entropy_threshold) / (1.0 - self.entropy_threshold))
else:
entropy_factor = 0.4 # Baseline if entropy isn't high
# 6. Keyword factor (significant boost if filename contains keywords)
keyword_factor = 0.95 if has_keyword else 0.5
# Combine factors with weights (adjust weights as needed)
# The weights below sum to 1.0, so each represents that factor's relative contribution.
# Prioritizing: Directory > Extension > Keyword ≈ Recency > Entropy > Size
weighted_importance = (
0.30 * dir_importance +
0.25 * ext_importance +
0.15 * keyword_factor +
0.15 * recency_factor +
0.10 * entropy_factor +
0.05 * size_factor
)
# Ensure score is within [0.0, 1.0]
final_importance = max(0.0, min(1.0, weighted_importance))
# Special overrides: if in low value dir, cap max importance unless keyword found
if dir_importance <= 0.1 and not has_keyword:
final_importance = min(final_importance, 0.15)
return final_importance
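# Illustrative worked example of the weighted sum above (approximate, not executed):
# a 100 KB .py file modified yesterday under Projects/, no keyword, entropy below threshold:
#   ext 0.88, dir 0.9, keyword 0.5, recency 1.0, entropy 0.4, size ~0.32
#   0.30*0.9 + 0.25*0.88 + 0.15*0.5 + 0.15*1.0 + 0.10*0.4 + 0.05*0.32 ≈ 0.77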
def _get_file_content_sample(self, file_path: Path, size: int) -> Tuple[Optional[bytes], Optional[str]]:
"""Reads samples from file for hashing and entropy calculation."""
content_sample = b""
full_content = b""
hash_hex = None
try:
with open(file_path, 'rb') as f:
# Read start chunk
start_chunk = f.read(self.HASH_SAMPLE_SIZE_START)
content_sample += start_chunk
if size <= self.MAX_SIZE_FOR_FULL_HASH:
# Read the rest if small enough for full hash
remaining_chunk = f.read()
content_sample += remaining_chunk
full_content = content_sample # Store for hashing
elif size > self.HASH_SAMPLE_SIZE_START:
# If larger, read end chunk as well for sample hash
f.seek(max(self.HASH_SAMPLE_SIZE_START, size - self.HASH_SAMPLE_SIZE_END))
end_chunk = f.read(self.HASH_SAMPLE_SIZE_END)
content_sample += end_chunk
full_content = start_chunk + end_chunk # Use start+end for sample hash
# Calculate hash based on the determined content (full or sample)
if full_content:
hasher = hashlib.md5()
hasher.update(full_content)
hash_hex = hasher.hexdigest()
# Return the sample for entropy (might differ from hashed content if file > MAX_SIZE...)
entropy_sample = content_sample[:self.MAX_SIZE_FOR_ENTROPY_CALC]
return entropy_sample, hash_hex
except (IOError, OSError, PermissionError) as e:
self._log_error(file_path.parts[-2], file_path.name, e)  # Best-effort key: parts[-2] is the immediate parent dir, not necessarily the user
return None, None
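# Sampling sketch (sizes for illustration only): for a 500 MB file only the first 64 KB
# and last 64 KB are read and hashed, so matching hash_sample values indicate *likely*
# duplicates rather than byte-identical files; files <= 1 MB are hashed in full.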
def _process_file(self, entry: os.DirEntry, user_dir: Path, user_name: str):
"""Extracts metadata and calculates initial metrics for a single file."""
file_path = Path(entry.path)
try:
stats = entry.stat() # Use stat from scandir directly
# Skip if not a file or if size is zero
if not entry.is_file(follow_symlinks=False) or stats.st_size == 0:
return
# Basic metadata
size = stats.st_size
modified_time = datetime.fromtimestamp(stats.st_mtime)
accessed_time = datetime.fromtimestamp(stats.st_atime)
relative_path = file_path.relative_to(user_dir)
ext = file_path.suffix.lower().lstrip('.')
# Age calculation
now_ts = time.time()
age_days = (now_ts - stats.st_mtime) / (60 * 60 * 24)
if age_days < 30:
age_category = "Recent (<30d)"
elif age_days < 180:
age_category = "Medium (30-180d)"
else:
age_category = "Old (>180d)"
# Keyword check in filename
filename_lower = file_path.name.lower()
has_important_keyword = any(keyword in filename_lower for keyword in IMPORTANT_KEYWORDS)
# Content analysis (Entropy, Hashing for uniqueness)
entropy = 0.0
hash_sample = None
is_unique = False
content_sample = None
is_likely_text = False # Default
if size < self.MAX_SIZE_FOR_ENTROPY_CALC: # Only process smaller files for content
content_sample, hash_sample = self._get_file_content_sample(file_path, size)
if content_sample:
entropy = calculate_entropy(content_sample)
is_likely_text = is_likely_text_file(content_sample, ext) # Check if text
if hash_sample:
# Check uniqueness based on hash sample
if hash_sample not in self.content_hashes:
is_unique = True
self.content_hashes.add(hash_sample)
# Create preliminary metadata object (importance calculated later)
file_info = FileMetadata(
user=user_name,
path=relative_path,
absolute_path=file_path,
size=size,
modified=modified_time,
accessed=accessed_time,
extension=ext,
age_days=age_days,
age_category=age_category,
entropy=entropy,
is_unique=is_unique,
importance=0.0, # Placeholder
hash_sample=hash_sample,
has_important_keyword=has_important_keyword,
is_likely_text=is_likely_text
)
# Calculate final importance score
importance = self._calculate_importance(file_info)
final_file_info = file_info._replace(importance=importance)
# Store metadata
self.file_metadata_list.append(final_file_info)
# Update statistics
self.user_stats[user_name]['total_files'] += 1
self.user_stats[user_name]['total_size'] += size
self.user_stats[user_name]['extensions'][ext] += 1
self.user_stats[user_name]['age_distribution'][age_category] += 1
if importance >= 0.8: # Use a threshold for 'important' count
self.user_stats[user_name]['critical_files'] += 1
elif importance >= 0.6:
self.user_stats[user_name]['high_importance_files'] += 1
if age_category == "Recent (<30d)":
self.user_stats[user_name]['modified_recently'] += 1
if is_unique:
self.user_stats[user_name]['unique_content_samples'] += 1
if is_likely_text:
self.user_stats[user_name]['likely_text_files'] += 1
if has_important_keyword:
self.user_stats[user_name]['keyword_files'] += 1
# Update global counters
self.total_files_scanned += 1
self.total_size_scanned += size
except (PermissionError, FileNotFoundError, OSError) as e:
self._log_error(user_name, file_path.name, e)
except Exception as e: # Catch unexpected errors during processing
self._log_error(user_name, file_path.name, f"Unexpected processing error: {e}")
def _scan_user_directory(self, user_dir: Path, progress: Progress, task_id: TaskID) -> int:
"""
Scans a single user's directory iteratively.
Args:
user_dir: Path object for the user's home directory.
progress: Rich Progress object for updating UI.
task_id: TaskID for the Rich progress bar associated with this user.
Returns:
The total number of files processed for this user.
"""
user_name = user_dir.name
files_processed_count = 0
dirs_to_scan: List[Tuple[Path, int]] = [(user_dir, 0)] # Queue: (directory, depth)
processed_dirs = 0
while dirs_to_scan:
current_dir, current_depth = dirs_to_scan.pop(0)
processed_dirs += 1
# Update progress description periodically
if processed_dirs % 20 == 0:
progress.update(task_id, description=f"Scanning {user_name}: {current_dir.relative_to(user_dir)}", advance=0)
# Check depth limit
if current_depth > self.scan_depth:
continue
# Skip low-value directory patterns entirely (performance)
if any(part in LOW_VALUE_DIR_PATTERNS for part in current_dir.relative_to(user_dir).parts):
continue
try:
with os.scandir(current_dir) as entries:
for entry in entries:
entry_path = Path(entry.path)
entry_name_lower = entry.name.lower()
relative_entry_path = entry_path.relative_to(user_dir)
# Skip hidden files/dirs unless explicitly important (e.g., .ssh)
# Allow specific hidden config dirs like .config, .aws etc.
is_hidden = entry.name.startswith('.')
is_important_hidden = entry_name_lower in {'.ssh', '.gnupg', '.aws', '.kube', '.config'}
if is_hidden and not is_important_hidden:
continue
try:
if entry.is_dir(follow_symlinks=False):
# Check if the directory itself matches low value patterns
if entry.name in LOW_VALUE_DIR_PATTERNS:
continue
# Check if path components match low value patterns
if any(part in LOW_VALUE_DIR_PATTERNS for part in relative_entry_path.parts):
continue
# Add directory to queue if within depth
if current_depth + 1 <= self.scan_depth:
dirs_to_scan.append((entry_path, current_depth + 1))
self.user_stats[user_name]['directories_scanned'] += 1
elif entry.is_file(follow_symlinks=False):
self._process_file(entry, user_dir, user_name)
files_processed_count += 1
# Update progress bar more frequently for files
if files_processed_count % 100 == 0:
progress.update(task_id, advance=100) # Advance progress
except (PermissionError, FileNotFoundError, OSError) as e:
# Log error for specific entry, but continue scan
self._log_error(user_name, entry.name, e)
except Exception as e:
self._log_error(user_name, entry.name, f"Unexpected entry error: {e}")
except (PermissionError, FileNotFoundError, OSError) as e:
# Log error for the directory itself, stop scanning this branch
self._log_error(user_name, current_dir.name, e)
except Exception as e:
self._log_error(user_name, current_dir.name, f"Unexpected directory error: {e}")
# Final update for the task when user scan is complete
progress.update(task_id, completed=files_processed_count, total=files_processed_count, description=f"Finished {user_name}")
return files_processed_count
def scan_home_directory(self) -> None:
"""Scans all user directories under the base path."""
self.console.print(f"[bold]Starting scan under:[/bold] [cyan]{self.base_path}[/cyan]")
if not self.base_path.is_dir():
self.console.print(f"[bold red]Error:[/bold red] Path '{self.base_path}' is not a valid directory.")
sys.exit(1)
try:
user_dirs = [d for d in self.base_path.iterdir() if d.is_dir()]
except PermissionError:
self.console.print("[bold red]Error:[/bold red] Insufficient permissions to list directories in "
f"'{self.base_path}'. Please run with 'sudo'.")
sys.exit(1)
except Exception as e:
self.console.print(f"[bold red]Error:[/bold red] Failed to list directories in '{self.base_path}': {e}")
sys.exit(1)
if not user_dirs:
self.console.print(f"[yellow]No user directories found under '{self.base_path}'.[/yellow]")
return
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(bar_width=None, complete_style="green"),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TextColumn("({task.completed} files)"),
TimeElapsedColumn(),
console=self.console, # Use the stderr console
transient=False # Keep progress visible after completion
) as progress:
scan_overall_task = progress.add_task("[bold green]Scanning Users...", total=len(user_dirs))
for user_dir in user_dirs:
user_name = user_dir.name
# Basic check to skip obviously non-user dirs
if user_name.startswith('.') or user_name in ('lost+found', 'skel', 'root'):
progress.advance(scan_overall_task)
continue
# Initialize stats for this user
self.user_stats[user_name] = {
'total_files': 0, 'total_size': 0, 'extensions': Counter(),
'age_distribution': defaultdict(int), 'critical_files': 0,
'high_importance_files': 0, 'modified_recently': 0,
'unique_content_samples': 0, 'directories_scanned': 0,
'likely_text_files': 0, 'keyword_files': 0
}
# Add a task for this specific user's file scan
# Initialize with total=1 to avoid the NoneType error in rich
# We will update total later if possible, or just track completion.
user_file_task = progress.add_task(f"Scanning {user_name}...", total=1, start=False)
try:
# Start the task now
progress.start_task(user_file_task)
# Scan the directory
files_count = self._scan_user_directory(user_dir, progress, user_file_task)
# Update the task total if we got a count, otherwise mark as finished
if files_count > 0:
progress.update(user_file_task, total=files_count, completed=files_count)
else:
progress.update(user_file_task, completed=1, total=1) # Mark as complete even if 0 files
except Exception as e:
# Catch errors during the user scan setup/call
self.console.print(f"[yellow]Warning:[/yellow] Error initiating scan for {user_name}: {e}")
self._log_error(user_name, user_dir.name, e)
progress.update(user_file_task, description=f"[red]Error scanning {user_name}[/red]", completed=1, total=1)
# Advance the overall user scanning task
progress.advance(scan_overall_task)
self.console.print("[bold green]Scan complete.[/bold green]")
def analyze_data(self) -> pd.DataFrame:
"""
Converts collected file metadata into a Pandas DataFrame and sorts it.
Returns:
A Pandas DataFrame containing all file metadata, sorted by importance.
Returns an empty DataFrame if no data was collected.
"""
if not self.file_metadata_list:
self.console.print("[yellow]Warning:[/yellow] No file metadata collected during scan.")
return pd.DataFrame()
self.console.print(f"Analyzing {len(self.file_metadata_list):,} collected file records...")
try:
# Convert list of NamedTuples to DataFrame
df = pd.DataFrame(self.file_metadata_list)
# Convert types for better analysis (optional but good practice)
df['modified'] = pd.to_datetime(df['modified'])
df['accessed'] = pd.to_datetime(df['accessed'])
df['size'] = pd.to_numeric(df['size'])
df['importance'] = pd.to_numeric(df['importance'])
df['entropy'] = pd.to_numeric(df['entropy'])
df['age_days'] = pd.to_numeric(df['age_days'])
# Sort by importance (descending)
df = df.sort_values(by='importance', ascending=False).reset_index(drop=True)
self.console.print("Analysis complete.")
return df
except Exception as e:
self.console.print(f"[bold red]Error:[/bold red] Failed to create or process DataFrame: {e}")
# Provide more debug info if needed
import traceback
self.console.print("[dim]" + traceback.format_exc() + "[/dim]")
return pd.DataFrame() # Return empty on failure
def get_backup_recommendations(self, df: pd.DataFrame) -> Dict[str, Any]:
"""
Generates backup strategy recommendations based on the analyzed data.
Args:
df: The DataFrame containing analyzed file data.
Returns:
A dictionary where keys are usernames and values are recommendation details.
"""
if df.empty:
return {}
recommendations = {}
# Define importance thresholds dynamically or use fixed ones
critical_threshold = 0.8
high_threshold = 0.6
medium_threshold = 0.4
for user, user_df in df.groupby('user'):
user_stats = self.user_stats.get(user, {}) # Get stats collected during scan
total_size = user_stats.get('total_size', 0)
total_files = user_stats.get('total_files', 0)
# --- Calculate Sizes by Importance ---
critical_size = user_df[user_df['importance'] >= critical_threshold]['size'].sum()
high_size = user_df[(user_df['importance'] >= high_threshold) & (user_df['importance'] < critical_threshold)]['size'].sum()
medium_size = user_df[(user_df['importance'] >= medium_threshold) & (user_df['importance'] < high_threshold)]['size'].sum()
low_size = user_df[user_df['importance'] < medium_threshold]['size'].sum()
# --- Identify Important Directories ---
# Consider directories containing multiple critical/high importance files
important_file_paths = user_df[user_df['importance'] >= high_threshold]['path']
dir_importance_counter = Counter()
for file_path in important_file_paths:
if file_path.parent != Path('.'): # Avoid counting files directly in home
dir_importance_counter[str(file_path.parent)] += 1 # Convert Path to str for key
# Select directories with a minimum number of important files (e.g., >= 3)
min_important_files_in_dir = 3
top_important_dirs = {
path: count for path, count in dir_importance_counter.most_common(10) # Top 10 dirs
if count >= min_important_files_in_dir
}
# --- File Type Analysis ---
size_by_ext = user_df.groupby('extension')['size'].sum().sort_values(ascending=False)
# Filter for meaningful size contributions (e.g., > 1MB)
significant_size_by_ext = size_by_ext[size_by_ext > 1024*1024].head(10)
# Calculate average importance per extension (for extensions with enough files)
min_files_for_avg_imp = 5
avg_importance_by_ext = {}
for ext, group in user_df.groupby('extension'):
if len(group) >= min_files_for_avg_imp:
avg_importance_by_ext[ext] = group['importance'].mean()
top_value_extensions = dict(sorted(avg_importance_by_ext.items(), key=lambda item: item[1], reverse=True)[:10])
# --- Change Rate ---
recent_changes_count = user_stats.get('modified_recently', 0)
recent_change_rate = recent_changes_count / total_files if total_files > 0 else 0
# --- Deduplication Potential ---
# High potential if many non-unique samples were found
total_samples = len(user_df[user_df['hash_sample'].notna()])
unique_samples = user_stats.get('unique_content_samples', 0)
duplicate_ratio = (total_samples - unique_samples) / total_samples if total_samples > 0 else 0
if duplicate_ratio > 0.3:
dedup_potential = 'High'
elif duplicate_ratio > 0.1:
dedup_potential = 'Medium'
else:
dedup_potential = 'Low'
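# Worked example (illustrative): 1,000 hashed samples of which 800 are unique gives
# duplicate_ratio = 0.2, which falls in the 0.1-0.3 band -> 'Medium' dedup potential.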
# --- Compression Potential ---
# High potential if large proportion of text files
likely_text_files = user_stats.get('likely_text_files', 0)
text_file_ratio = likely_text_files / total_files if total_files > 0 else 0
compression_potential = 'High' if text_file_ratio > 0.5 else 'Medium' if text_file_ratio > 0.2 else 'Low'
# --- Determine Backup Strategy ---
strategy = self._determine_backup_strategy(
total_size=total_size,
critical_size=critical_size,
high_size=high_size,
recent_change_rate=recent_change_rate,
compression_potential=compression_potential,
deduplication_potential=dedup_potential
)
# --- Assemble Recommendations ---
recommendations[user] = {
'profile': {
'total_files': total_files,
'total_size': total_size,
'critical_data_size': critical_size,
'high_importance_data_size': high_size,
'medium_importance_data_size': medium_size,
'low_importance_data_size': low_size,
'recent_changes_count': recent_changes_count,
'recent_change_rate': recent_change_rate,
'likely_text_file_ratio': text_file_ratio,
'duplicate_sample_ratio': duplicate_ratio,
},
'key_areas': {
'important_dirs': top_important_dirs,
'significant_size_by_ext': significant_size_by_ext.to_dict(),
'top_value_extensions': top_value_extensions,
},
'strategy': strategy,
'scan_errors': self.scan_errors.get(user, []) # Include errors specific to this user
}
return recommendations
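# Shape of the returned mapping, for reference (keys are exactly those assembled above):
#   recommendations[user] = {
#       'profile':     {...},   # sizes by importance band, change rate, ratios
#       'key_areas':   {...},   # important dirs, size/importance by extension
#       'strategy':    {...},   # output of _determine_backup_strategy()
#       'scan_errors': [...],   # per-user error messages collected during the scan
#   }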
def _determine_backup_strategy(
self, total_size: int, critical_size: int, high_size: int,
recent_change_rate: float, compression_potential: str,
deduplication_potential: str
) -> Dict[str, Any]:
"""Determines backup frequency, retention, and technical suggestions."""
total_gb = total_size / (1024**3)
critical_gb = critical_size / (1024**3)
high_gb = high_size / (1024**3)
essential_gb = critical_gb + high_gb
# --- Frequency ---
# Full backup frequency based on total size
if total_gb > 500: full_freq = "Monthly"
elif total_gb > 100: full_freq = "Bi-Weekly"
else: full_freq = "Weekly"
# Incremental/Differential frequency based on change rate and essential size
if recent_change_rate > 0.1 or essential_gb > 50: # High change or lots of important data
incr_freq = "Daily"
elif recent_change_rate > 0.02 or essential_gb > 10:
incr_freq = "Every 2-3 Days"
else:
incr_freq = "Weekly"
# Critical data backup frequency (more frequent for most vital)
if critical_gb > 10 or recent_change_rate > 0.15:
crit_freq = "Every 4-6 Hours"
elif critical_gb > 1 or recent_change_rate > 0.05:
crit_freq = "Daily"
else:
crit_freq = "Daily" # Minimum daily for critical
# --- Retention ---
# Longer retention for smaller datasets, shorter for very large ones
if total_gb > 1000: # > 1TB
retention_full = "1 Month"
retention_incr = "1 Week"
retention_crit = "2 Weeks"
elif total_gb > 200: # > 200GB
retention_full = "2 Months"
retention_incr = "2 Weeks"
retention_crit = "1 Month"
else:
retention_full = "3-6 Months"
retention_incr = "1 Month"
retention_crit = "2 Months"
# --- Technical ---
compression_rec = compression_potential in ['High', 'Medium'] or total_gb > 50
deduplication_rec = deduplication_potential in ['High', 'Medium'] or total_gb > 100
# Recommend encryption if significant critical/high importance data exists
encryption_rec = essential_gb > 0.5 # Recommend if > 500MB essential data
return {
'full_backup_frequency': full_freq,
'incremental_frequency': incr_freq,
'critical_data_backup_frequency': crit_freq,
'estimated_total_backup_size_gb': round(total_gb, 2),
'estimated_essential_backup_size_gb': round(essential_gb, 2),
'recommended_retention': {
'full': retention_full,
'incremental': retention_incr,
'critical': retention_crit,
},
'compression_recommended': compression_rec,
'deduplication_benefit': deduplication_potential, # Keep original potential rating
'deduplication_recommended': deduplication_rec,
'encryption_recommended': encryption_rec,
}
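# Illustrative walk-through of the rules above (not executed): a user with 150 GB total,
# 5 GB critical, 20 GB high-importance data and a 3% recent-change rate gets:
#   full "Bi-Weekly" (>100 GB), incremental "Every 2-3 Days" (essential 25 GB > 10),
#   critical "Daily" (5 GB > 1), retention 3-6 Months / 1 Month / 2 Months (<200 GB total),
#   with compression, deduplication (150 GB > 100) and encryption all recommended.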
# --- UI and Reporting Functions ---
def display_summary_panel(analyzer: FileSignificanceAnalyzer) -> Panel:
"""Creates a Rich Panel summarizing the overall scan results."""
num_users = len(analyzer.user_stats)
total_errors = sum(len(errors) for errors in analyzer.scan_errors.values())
summary_text = Text.assemble(
("Analyzed: ", "bold"), (f"{analyzer.total_files_scanned:,}", "cyan"), " files (",
(f"{format_size(analyzer.total_size_scanned)}", "cyan"), ") across ",
(f"{num_users}", "cyan"), " users.\n",
("Scan Depth: ", "bold"), (f"{analyzer.scan_depth}", "cyan"), " levels.\n",
("Errors Encountered: ", "bold"),
(f"{total_errors:,}", "red" if total_errors > 0 else "green")
)
return Panel(
summary_text,
title="[bold blue]Scan Summary[/bold blue]",
box=box.DOUBLE,
border_style="blue",
padding=(1, 2)
)
def display_user_statistics_table(analyzer: FileSignificanceAnalyzer) -> Table:
"""Creates a Rich Table summarizing statistics per user."""
table = Table(title="[bold]User Data Overview[/bold]", box=box.ROUNDED, show_header=True, header_style="bold magenta")
table.add_column("User", style="cyan", min_width=12)
table.add_column("Total Files", justify="right")
table.add_column("Total Size", justify="right")
table.add_column("Crit ≥0.8", justify="right", style="red") # Critical files
table.add_column("High ≥0.6", justify="right", style="orange3") # High importance
table.add_column("Recent Files", justify="right")
table.add_column("Unique Samples", justify="right")
table.add_column("Errors", justify="right")
sorted_users = sorted(analyzer.user_stats.items(), key=lambda item: item[1].get('total_size', 0), reverse=True)
for user, stats in sorted_users:
errors_count = len(analyzer.scan_errors.get(user, []))
table.add_row(
user,
f"{stats.get('total_files', 0):,}",
format_size(stats.get('total_size', 0)),
f"{stats.get('critical_files', 0):,}",
f"{stats.get('high_importance_files', 0):,}",
f"{stats.get('modified_recently', 0):,}",
f"{stats.get('unique_content_samples', 0):,}",
f"[red]{errors_count:,}[/red]" if errors_count > 0 else "[green]0[/green]"
)
return table
def display_recommendations(recommendations: Dict[str, Any], console: Console) -> None:
"""Displays detailed recommendations for each user."""
console.print("\n" + "="*20 + " [bold green]Backup Recommendations[/bold green] " + "="*20 + "\n")
if not recommendations:
console.print("[yellow]No recommendations generated (likely no data analyzed).[/yellow]")
return
for user, rec in recommendations.items():
profile = rec['profile']
key_areas = rec['key_areas']
strategy = rec['strategy']
# --- Profile Panel ---
profile_table = Table.grid(padding=(0, 2))
profile_table.add_column()
profile_table.add_column()
profile_table.add_row("[bold]Total Size:[/bold]", f"[cyan]{format_size(profile['total_size'])}[/cyan] ({profile['total_files']:,} files)")
profile_table.add_row("[bold red]Critical Data (≥0.8):[/]", f"[cyan]{format_size(profile['critical_data_size'])}[/cyan]")
profile_table.add_row("[bold orange3]High Importance (≥0.6):[/]", f"[cyan]{format_size(profile['high_importance_data_size'])}[/cyan]")
profile_table.add_row("[bold]Recent Changes:[/bold]", f"{profile['recent_changes_count']:,} files ({profile['recent_change_rate']:.1%})")
profile_table.add_row("[bold]Text File Ratio:[/bold]", f"{profile['likely_text_file_ratio']:.1%}")
profile_table.add_row("[bold]Duplicate Sample Ratio:[/bold]", f"{profile['duplicate_sample_ratio']:.1%}")
profile_panel = Panel(profile_table, title="[gold1]Data Profile[/gold1]", box=box.ROUNDED, border_style="yellow", expand=False)
# --- Key Areas Panel ---
key_areas_group = []
# Important Dirs Tree
if key_areas['important_dirs']:
dir_tree = Tree("[bold]Top Important Directories:[/bold]")
sorted_dirs = sorted(key_areas['important_dirs'].items(), key=lambda item: item[1], reverse=True)
for dir_path, count in sorted_dirs:
dir_tree.add(f"[cyan]{dir_path}[/cyan] ({count} files)")
key_areas_group.append(dir_tree)
# Top Extensions by Size
if key_areas['significant_size_by_ext']:
ext_size_list = "\n".join([f"- [green]{ext}[/green]: {format_size(size)}" for ext, size in key_areas['significant_size_by_ext'].items()])
key_areas_group.append(Text.from_markup(f"\n[bold]Top Extensions by Size:[/bold]\n{ext_size_list}"))
# Top Extensions by Avg Importance
if key_areas['top_value_extensions']:
ext_value_list = "\n".join([f"- [magenta]{ext}[/magenta]: {value:.2f} avg score" for ext, value in key_areas['top_value_extensions'].items()])
key_areas_group.append(Text.from_markup(f"\n[bold]Highest Avg Importance Extensions:[/bold]\n{ext_value_list}"))
key_areas_panel = Panel(Group(*key_areas_group), title="[dark_sea_green4]Key Content Areas[/dark_sea_green4]", box=box.ROUNDED, border_style="green", expand=False)
# --- Strategy Panel ---
strat = strategy # Alias for brevity
strategy_table = Table.grid(padding=(0, 2))
strategy_table.add_column(style="bold")
strategy_table.add_column()
strategy_table.add_row("Full Backup:", f"[cyan]{strat['full_backup_frequency']}[/cyan]")
strategy_table.add_row("Incremental Backup:", f"[cyan]{strat['incremental_frequency']}[/cyan]")
strategy_table.add_row("Critical Data Backup:", f"[cyan]{strat['critical_data_backup_frequency']}[/cyan]")
strategy_table.add_row("Retention (Full):", f"[cyan]{strat['recommended_retention']['full']}[/cyan]")
strategy_table.add_row("Retention (Incr):", f"[cyan]{strat['recommended_retention']['incremental']}[/cyan]")
strategy_table.add_row("Retention (Crit):", f"[cyan]{strat['recommended_retention']['critical']}[/cyan]")
strategy_table.add_row("Est. Total Size:", f"[cyan]{strat['estimated_total_backup_size_gb']:.2f} GB[/cyan]")
strategy_table.add_row("Est. Essential Size:", f"[cyan]{strat['estimated_essential_backup_size_gb']:.2f} GB[/cyan]")
strategy_table.add_row("Compression:", "[green]Recommended[/green]" if strat['compression_recommended'] else "[yellow]Optional[/yellow]")
strategy_table.add_row("Deduplication:", f"[green]Recommended[/green] (Benefit: {strat['deduplication_benefit']})" if strat['deduplication_recommended'] else f"[yellow]Optional[/yellow] (Benefit: {strat['deduplication_benefit']})")
strategy_table.add_row("Encryption:", "[green]Recommended[/green]" if strat['encryption_recommended'] else "[yellow]Optional[/yellow]")
strategy_panel = Panel(strategy_table, title="[steel_blue1]Recommended Strategy[/steel_blue1]", box=box.ROUNDED, border_style="blue", expand=False)
# --- Combine Panels for User ---
user_layout = Layout()
user_layout.split_row(
Layout(profile_panel, ratio=1),
Layout(key_areas_panel, ratio=1),
Layout(strategy_panel, ratio=1)
)
console.print(Panel(user_layout, title=f"[bold blue]User: [cyan]{user}[/cyan]", border_style="blue", expand=False))
console.print() # Spacer
def display_top_files_table(df: pd.DataFrame, console: Console, num_files: int = 20) -> None:
"""Displays a table of the most important files found."""
if df.empty:
return
console.print(f"\n--- [bold magenta]Top {num_files} Most Important Files[/bold magenta] ---\n")
table = Table(box=box.HEAVY_EDGE, show_header=True, header_style="bold green")
table.add_column("Rank", style="dim", width=4)
table.add_column("User", style="cyan", min_width=10)
table.add_column("Path", style="green", max_width=70) # Limit path width
table.add_column("Size", justify="right")
table.add_column("Modified", justify="center")
table.add_column("Score", justify="right")
table.add_column("Factors", style="dim", min_width=20) # Contributing factors
top_files = df.head(num_files)
for index, row in top_files.iterrows():
# Determine color based on importance score
score = row['importance']
if score >= 0.8: score_style = "[bold red]"
elif score >= 0.6: score_style = "[orange3]"
elif score >= 0.4: score_style = "[yellow]"
else: score_style = "[dim]"
# Format path nicely
path_str = str(row['path'])
if len(path_str) > 65: # Truncate long paths
path_str = "..." + path_str[-62:]
# Summarize key factors contributing to the score
factors = []
if row['has_important_keyword']: factors.append("keyword")
if row['importance'] > 0.7 and any(part in IMPORTANT_DIR_NAMES for part in row['path'].parts): factors.append("imp_dir")
if row['extension'] in EXTENSION_WEIGHTS and EXTENSION_WEIGHTS[row['extension']] > 0.8: factors.append("ext")
if row['age_days'] < 30: factors.append("recent")
if row['entropy'] > 0.8: factors.append("entropy")
factors_str = ", ".join(factors)
table.add_row(
str(index + 1),
row['user'],
path_str,
format_size(row['size']),
row['modified'].strftime("%Y-%m-%d"),
f"{score_style}{score:.3f}[/]",
factors_str
)
console.print(table)
def display_scan_errors(analyzer: FileSignificanceAnalyzer, console: Console) -> None:
"""Displays errors encountered during the scan."""
total_errors = sum(len(errors) for errors in analyzer.scan_errors.values())
if total_errors == 0:
return # Don't display if no errors
console.print("\n" + "="*20 + " [bold yellow]Scan Errors[/bold yellow] " + "="*20 + "\n")
error_tree = Tree(f"[bold yellow]Encountered {total_errors} errors during scan:[/]")
max_errors_per_user = 5 # Limit displayed errors per user
for user, errors in analyzer.scan_errors.items():
if not errors:
continue
user_node = error_tree.add(f"[cyan]{user}[/cyan] ({len(errors)} errors)")
for i, error_msg in enumerate(errors):
if i < max_errors_per_user:
user_node.add(f"[dim red]- {error_msg}[/dim red]")
elif i == max_errors_per_user:
user_node.add(f"[dim]... ({len(errors) - max_errors_per_user} more errors not shown)[/dim]")
break
console.print(error_tree)
def generate_markdown_report(
analyzer: FileSignificanceAnalyzer,
df: pd.DataFrame,
recommendations: Dict[str, Any]
) -> str:
"""Generates a comprehensive Markdown report of the analysis."""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
scan_path = analyzer.base_path
num_users = len(analyzer.user_stats)
total_errors = sum(len(errors) for errors in analyzer.scan_errors.values())
# --- Header ---
md = f"""# Backup Significance Analysis Report
**Generated:** {timestamp}
**Scan Path:** `{scan_path}`
**Scan Depth:** {analyzer.scan_depth}
## 1. Scan Summary
* **Files Analyzed:** {analyzer.total_files_scanned:,}
* **Total Data Size:** {format_size(analyzer.total_size_scanned)}
* **Users Scanned:** {num_users}
* **Scan Errors:** {total_errors:,}
## 2. User Data Overview
| User | Total Files | Total Size | Critical Files (≥0.8) | High Importance (≥0.6) | Recent Files (<30d) | Unique Samples | Scan Errors |
|------|-------------|------------|-----------------------|------------------------|---------------------|----------------|-------------|
"""
# --- User Stats Table ---
sorted_users = sorted(analyzer.user_stats.items(), key=lambda item: item[1].get('total_size', 0), reverse=True)
for user, stats in sorted_users:
errors_count = len(analyzer.scan_errors.get(user, []))
md += f"| {user} | {stats.get('total_files', 0):,} | {format_size(stats.get('total_size', 0))} | {stats.get('critical_files', 0):,} | {stats.get('high_importance_files', 0):,} | {stats.get('modified_recently', 0):,} | {stats.get('unique_content_samples', 0):,} | {errors_count:,} |\n"
# --- Recommendations ---
md += "\n## 3. Backup Recommendations per User\n"
if not recommendations:
md += "\n_No recommendations generated._\n"
else:
for user, rec in recommendations.items():
profile = rec['profile']
key_areas = rec['key_areas']
strategy = rec['strategy']
md += f"\n### User: `{user}`\n\n"
# Profile
md += "**Data Profile:**\n"
md += f"- Total Size: **{format_size(profile['total_size'])}** ({profile['total_files']:,} files)\n"
md += f"- Critical Data (Score ≥0.8): **{format_size(profile['critical_data_size'])}**\n"
md += f"- High Importance Data (Score ≥0.6): **{format_size(profile['high_importance_data_size'])}**\n"
md += f"- Recent Changes (<30d): **{profile['recent_changes_count']:,} files** ({profile['recent_change_rate']:.1%})\n"
md += f"- Text File Ratio: **{profile['likely_text_file_ratio']:.1%}** (Affects Compression)\n"
md += f"- Duplicate Sample Ratio: **{profile['duplicate_sample_ratio']:.1%}** (Affects Deduplication)\n\n"
# Key Areas
md += "**Key Content Areas:**\n"
if key_areas['important_dirs']:
md += "- _Top Important Directories:_\n"
sorted_dirs = sorted(key_areas['important_dirs'].items(), key=lambda item: item[1], reverse=True)
for dir_path, count in sorted_dirs:
md += f" - `{dir_path}` ({count} files)\n"
if key_areas['significant_size_by_ext']:
md += "- _Top Extensions by Size:_\n"
for ext, size in key_areas['significant_size_by_ext'].items():
md += f" - `{ext if ext else '<none>'}`: {format_size(size)}\n"
if key_areas['top_value_extensions']:
md += "- _Highest Avg Importance Extensions:_\n"
for ext, value in key_areas['top_value_extensions'].items():
md += f" - `{ext if ext else '<none>'}`: {value:.2f} avg score\n"
md += "\n"
# Strategy
strat = strategy # Alias
md += "**Recommended Strategy:**\n"
md += f"- Full Backup: **{strat['full_backup_frequency']}**\n"
md += f"- Incremental Backup: **{strat['incremental_frequency']}**\n"
md += f"- Critical Data Backup: **{strat['critical_data_backup_frequency']}**\n"
md += "- Retention:\n"
md += f" - Full: **{strat['recommended_retention']['full']}**\n"
md += f" - Incremental: **{strat['recommended_retention']['incremental']}**\n"
md += f" - Critical: **{strat['recommended_retention']['critical']}**\n"
md += f"- Estimated Total Size: **{strat['estimated_total_backup_size_gb']:.2f} GB**\n"
md += f"- Estimated Essential Size (Crit+High): **{strat['estimated_essential_backup_size_gb']:.2f} GB**\n"
md += f"- Compression: **{'Recommended' if strat['compression_recommended'] else 'Optional'}**\n"
md += f"- Deduplication: **{'Recommended' if strat['deduplication_recommended'] else 'Optional'}** (Benefit: {strat['deduplication_benefit']})\n"
md += f"- Encryption: **{'Recommended' if strat['encryption_recommended'] else 'Optional'}**\n\n"
# --- Top Files ---
md += "\n## 4. Top Important Files\n\n"
if df.empty:
md += "_No file data available._\n"
else:
md += "| Rank | User | Path | Size | Modified | Score | Factors |\n"
md += "|------|------|------|------|----------|-------|---------|\n"
top_files = df.head(20) # Include top 20 in report
for index, row in top_files.iterrows():
path_str = str(row['path'])
# Basic escaping for Markdown table
path_str_md = path_str.replace('|', '\\|')
if len(path_str_md) > 65: path_str_md = "..." + path_str_md[-62:]
score = row['importance']
factors = []
if row['has_important_keyword']: factors.append("keyword")
if score > 0.7 and any(part in IMPORTANT_DIR_NAMES for part in row['path'].parts): factors.append("imp_dir")
if row['extension'] in EXTENSION_WEIGHTS and EXTENSION_WEIGHTS[row['extension']] > 0.8: factors.append("ext")
if row['age_days'] < 30: factors.append("recent")
if row['entropy'] > 0.8: factors.append("entropy")
factors_str = ", ".join(factors) if factors else "-"
md += f"| {index + 1} | {row['user']} | `{path_str_md}` | {format_size(row['size'])} | {row['modified'].strftime('%Y-%m-%d')} | {score:.3f} | {factors_str} |\n"
# --- Scan Errors ---
if total_errors > 0:
md += "\n## 5. Scan Errors Encountered\n\n"
md += "_Note: Only the first few errors per user are listed below._\n\n"
max_errors_per_user_report = 10
for user, errors in analyzer.scan_errors.items():
if not errors: continue
md += f"### User: `{user}` ({len(errors)} errors)\n\n"
for i, error_msg in enumerate(errors):
if i < max_errors_per_user_report:
# Basic code formatting for the error message
md += f"- `{error_msg}`\n"
elif i == max_errors_per_user_report:
md += f"- ... ({len(errors) - max_errors_per_user_report} more errors not shown)\n"
break
md += "\n"
# --- Footer ---
md += "\n---\n"
md += f"*Report generated by Backup Significance Analyzer v2.1.0 on {timestamp}*"
return md
# --- Main Execution ---
def main():
"""Main execution function."""
console = Console() # Main console for output
console.print(Panel.fit(
Text("Backup Significance Analyzer v2.1.0", style="bold blue", justify="center") +
Text("\nStatistical analysis for optimized backup strategies", style="cyan", justify="center"),
box=box.DOUBLE,
border_style="blue"
))
console.print()
# Check for root/sudo privileges
is_sudo = os.geteuid() == 0
if not is_sudo:
console.print("[bold yellow]Warning:[/bold yellow] This script likely needs root/sudo permissions "
"to read all user directories in '/home'.")
console.print("[yellow]Analysis may be incomplete due to permission errors.[/yellow]")
if not Confirm.ask("Continue anyway?", default=False):
sys.exit(0)
else:
console.print("[green]Running with elevated (sudo) permissions.[/green]")
console.print()
# Determine target path
target_path = "/home"
if len(sys.argv) > 1:
target_path_arg = sys.argv[1]
if Path(target_path_arg).is_dir():
target_path = target_path_arg
else:
console.print(f"[yellow]Warning:[/yellow] Provided path '{target_path_arg}' is not a valid directory. "
f"Defaulting to '{target_path}'.")
# --- Initialize and Run Scan ---
analyzer = FileSignificanceAnalyzer(base_path=target_path, scan_depth=10) # Increased depth
try:
analyzer.scan_home_directory()
except Exception as e:
console.print(f"\n[bold red]Fatal error during directory scan:[/bold red] {e}")
import traceback
console.print("[dim]" + traceback.format_exc() + "[/dim]")
sys.exit(1)
if analyzer.total_files_scanned == 0 and not any(analyzer.scan_errors.values()):
console.print("[yellow]Scan complete, but no files were found or processed.[/yellow]")
console.print("[yellow]Check the target path and permissions.[/yellow]")
display_scan_errors(analyzer, console) # Show errors if any
return # Exit if nothing was scanned
# --- Analyze Data ---
console.print("\n[bold green]Analyzing collected data...[/bold green]")
# Use a status indicator for potentially long analysis
with console.status("[bold cyan]Performing significance analysis...", spinner="dots"):
df = analyzer.analyze_data()
if df.empty and analyzer.total_files_scanned > 0:
console.print("[bold red]Error:[/bold red] Data analysis failed. Check previous error messages.")
display_scan_errors(analyzer, console)
return
recommendations = analyzer.get_backup_recommendations(df)
console.print("[bold green]Analysis finished.[/bold green]")
# --- Display Results ---
console.print("\n" + "="*60)
console.print(display_summary_panel(analyzer))
console.print(display_user_statistics_table(analyzer))
display_top_files_table(df, console)
display_recommendations(recommendations, console)
display_scan_errors(analyzer, console) # Display errors at the end
# --- Save Report ---
console.print("\n" + "="*60)
if Confirm.ask("\n[bold]Save analysis report to a file?[/bold]", default=True):
report_format = Prompt.ask(
"Select report format",
choices=["markdown", "text"],
default="markdown"
).lower()
timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S")
filename_base = f"backup_analysis_{timestamp_str}"
if report_format == "markdown":
filename = f"{filename_base}.md"
console.print(f"Generating Markdown report...")
markdown_content = generate_markdown_report(analyzer, df, recommendations)
try:
with open(filename, "w", encoding='utf-8') as f:
f.write(markdown_content)
console.print(f"[bold green]✔ Markdown report saved to:[/bold green] [cyan]{filename}[/cyan]")
except IOError as e:
console.print(f"[bold red]Error saving Markdown report:[/bold red] {e}")
else: # Text format
filename = f"{filename_base}.txt"
console.print(f"Generating Text report...")
try:
# Capture the console output for the text report
text_console = Console(record=True, width=120) # Use fixed width for text file
text_console.print(display_summary_panel(analyzer))
text_console.print(display_user_statistics_table(analyzer))
display_top_files_table(df, text_console)
display_recommendations(recommendations, text_console)
display_scan_errors(analyzer, text_console)
report_content = text_console.export_text()
with open(filename, "w", encoding='utf-8') as f:
f.write(f"Backup Significance Analysis Report - {timestamp}\n")
f.write("="*80 + "\n")
f.write(report_content)
console.print(f"[bold green]✔ Text report saved to:[/bold green] [cyan]{filename}[/cyan]")
except IOError as e:
console.print(f"[bold red]Error saving Text report:[/bold red] {e}")
except Exception as e:
console.print(f"[bold red]Unexpected error generating text report:[/bold red] {e}")
console.print("\n[bold blue]Analysis complete. Exiting.[/bold blue]")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n\n[yellow]Analysis interrupted by user. Exiting gracefully.[/yellow]")
sys.exit(0)
except Exception as e:
# Fallback for unexpected errors in main execution flow
console = Console()
console.print(f"\n[bold red]An unexpected critical error occurred:[/bold red]")
console.print_exception(show_locals=False) # Show traceback
sys.exit(1)