|
import shutil |
|
from pathlib import Path |
|
import logging |
|
import re |
|
import subprocess |
|
import yaml |
|
import sys |
|
|
|
|
|
def setup_logging(log_file_path="file_copy.log"):
    """Configure root logging to write to a fresh log file and to the console.

    Args:
        log_file_path: Path of the log file. Any previous content is
            discarded so that every run starts with an empty log.

    Both handlers share the ``[timestamp][LEVEL]: message`` format and the
    root logger level is set to INFO, so ``logging.debug`` calls are not
    emitted. As with every ``logging.basicConfig`` call, the handlers are
    only attached when the root logger has no handlers yet.
    """
    formatter = logging.Formatter(
        fmt="[%(asctime)s][%(levelname)s]: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    # mode="w" truncates a previous log (replacing a manual unlink), and UTF-8
    # is forced because the script logs emoji that a platform default
    # encoding such as cp1252 cannot represent.
    file_handler = logging.FileHandler(log_file_path, mode="w", encoding="utf-8")
    file_handler.setFormatter(formatter)

    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(formatter)

    logging.basicConfig(
        level=logging.INFO,
        handlers=[file_handler, stream_handler],
    )
|
|
|
|
|
def load_config(config_file="config.yaml"):
    """Load and sanity-check the YAML configuration file.

    Args:
        config_file: Path of the YAML file to read.

    Returns:
        The parsed configuration mapping.

    Raises:
        FileNotFoundError: If ``config_file`` does not exist.
        ValueError: If the file is empty, holds no YAML document, or its
            top level is not a mapping.
        yaml.YAMLError: If the content is not valid YAML.
        OSError: If the file cannot be read.

    Missing required keys are reported through the log but do not raise;
    the caller decides whether the configuration is usable.
    """
    config_path = Path(config_file)
    if not config_path.exists():
        logging.error(f"Configuration file not found: {config_file}")
        raise FileNotFoundError(f"Configuration file not found: {config_file}")

    try:
        # YAML documents are Unicode; read as UTF-8 instead of relying on the
        # platform's locale encoding.
        content = config_path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as e:
        logging.error(f"Failed to load configuration: {str(e)}")
        raise

    if not content.strip():
        logging.error(f"Configuration file is empty: {config_file}")
        raise ValueError(f"Configuration file is empty: {config_file}")

    # Log the raw content for debugging
    logging.debug(f"Raw config file content:\n{content}")

    # Only the parse step can raise YAMLError, so only it is guarded.
    try:
        config = yaml.safe_load(content)
    except yaml.YAMLError as e:
        logging.error(f"Failed to parse YAML configuration: {str(e)}")
        logging.error("Check your YAML syntax for errors")
        raise

    if config is None:
        logging.error(f"Configuration file contains no valid YAML: {config_file}")
        raise ValueError(f"Configuration file contains no valid YAML: {config_file}")

    # A list or scalar at the top level would break every later key lookup.
    if not isinstance(config, dict):
        error_msg = f"Configuration file must contain a YAML mapping at the top level: {config_file}"
        logging.error(error_msg)
        raise ValueError(error_msg)

    # Validate minimum required configuration (reported, not enforced here)
    required_keys = ["source_dir", "target_dir", "include_patterns"]
    missing_keys = [key for key in required_keys if key not in config]
    if missing_keys:
        logging.error(f"Missing required configuration keys: {', '.join(missing_keys)}")
        logging.error(f"Configuration must include at minimum: {', '.join(required_keys)}")

    if "include_patterns" in config and not config["include_patterns"]:
        logging.warning("'include_patterns' is empty. No files will be copied. 🖐️")

    logging.info(f"Configuration loaded from {config_file}")
    logging.debug(f"Config contents: {config}")
    return config
|
|
|
|
|
def _tree_ignore_patterns(ignore_patterns):
    """Translate regex-style skip patterns into wildcard patterns for ``tree -I``.

    Only the literal directory name, file extension or leftover literal text
    of each regex is kept; patterns that yield nothing usable are dropped.
    """
    tree_patterns = []
    for pattern in ignore_patterns or []:
        if "/.*" in pattern:
            # Directory-style pattern: keep the first name enclosed in slashes.
            dir_match = re.search(r"/([^/]+)/", pattern)
            if dir_match:
                # Un-escape regex dots so an escaped ".git" becomes ".git".
                dir_name = dir_match.group(1).replace("\\.", ".")
                if dir_name and dir_name != ".*":
                    tree_patterns.append(dir_name)
        elif "\\." in pattern:
            # Extension-style pattern: an escaped dot followed by the extension.
            ext_match = re.search(r"\\\.([a-zA-Z0-9_]+)", pattern)
            if ext_match:
                tree_patterns.append(f"*.{ext_match.group(1)}")
        else:
            # Anything else: strip regex syntax and keep the literal remainder.
            clean_pattern = (
                pattern.replace(".*", "").replace("\\", "").replace("$", "").replace("^", "")
            )
            if len(clean_pattern) > 2:  # Avoid too short patterns
                tree_patterns.append(clean_pattern)
    return tree_patterns


def generate_tree(directory, output_file, depth, ignore_patterns):
    """Generate a tree structure of the directory and save it to a file.

    Runs the external ``tree`` command limited to ``depth`` levels and writes
    its standard output to ``output_file``.

    Args:
        directory: Directory to describe (string or path object).
        output_file: File that receives the command output.
        depth: Maximum depth passed to ``tree -L``.
        ignore_patterns: Regex-style skip patterns; they are reduced to
            simple wildcard patterns for ``tree -I``. May be empty or None.

    Returns:
        True when the tree file was written, False on any failure (missing
        directory, missing ``tree`` executable, command error, I/O error).
        Failures are logged, never raised.
    """
    logging.debug(f"Generating tree for directory: {directory}")

    try:
        source_path = Path(directory)
        if not source_path.exists():
            logging.error(f"Source directory does not exist: {directory}")
            return False

        # Report a missing executable clearly instead of surfacing a bare
        # "No such file or directory: 'tree'" from subprocess.
        if shutil.which("tree") is None:
            logging.error("The 'tree' command is not installed or not on PATH")
            return False

        # Join patterns with pipe for the tree command
        ignore_pattern_str = "|".join(_tree_ignore_patterns(ignore_patterns))
        logging.debug(f"Tree ignore pattern: '{ignore_pattern_str}'")

        # str() keeps path objects usable both in the command list and in the
        # joined debug message below.
        cmd = ["tree", str(directory), "-L", str(depth)]
        if ignore_pattern_str:
            cmd.extend(["-I", ignore_pattern_str])

        logging.debug(f"Running command: {' '.join(cmd)}")

        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=True,
        )

        # tree draws with non-ASCII line characters; write them as UTF-8
        # regardless of the platform default encoding.
        with open(output_file, "w", encoding="utf-8") as f:
            f.write(result.stdout)

        logging.debug(f"Generated tree structure in {output_file}")
        return True

    except subprocess.CalledProcessError as e:
        logging.error(f"Failed to generate tree: {str(e)}")
        logging.error(f"Command output: {e.stdout}")
        logging.error(f"Command error: {e.stderr}")
        return False
    except Exception as e:
        logging.error(f"Error while generating tree: {str(e)}")
        return False
|
|
|
def should_skip_file(file_path, skip_patterns):
    """Return True when ``file_path`` matches any regex in ``skip_patterns``.

    Args:
        file_path: Path object or string to test.
        skip_patterns: Iterable of regular expressions, applied with
            ``re.match`` (anchored at the start of the path). Empty or None
            means nothing is skipped.

    Returns:
        True if at least one pattern matches, otherwise False.
    """
    if not skip_patterns:
        return False

    # Patterns are written with forward slashes; render path objects in POSIX
    # form so the same patterns also match backslash-separated Windows paths.
    as_posix = getattr(file_path, "as_posix", None)
    candidate = as_posix() if callable(as_posix) else str(file_path)

    result = any(re.match(pattern, candidate) for pattern in skip_patterns)
    if result:
        logging.debug(f"Skipping file: {file_path} (matched skip pattern)")
    return result
|
|
|
|
|
def _is_within(path, directory):
    """Return True when ``path`` is ``directory`` itself or lies beneath it."""
    try:
        path.relative_to(directory)
    except ValueError:
        return False
    return True


def _unique_target(target_path, file_name):
    """Return a not-yet-existing path for ``file_name`` inside ``target_path``."""
    candidate = target_path / file_name
    if not candidate.exists():
        return candidate
    base, suffix = candidate.stem, candidate.suffix
    counter = 1
    while candidate.exists():
        candidate = target_path / f"{base}_{counter}{suffix}"
        counter += 1
    return candidate


def copy_files(source_dir, target_dir, file_patterns, skip_patterns):
    """Copy files matching glob patterns from a source tree into one flat directory.

    Every pattern in ``file_patterns`` is searched recursively below
    ``source_dir``. Matching regular files are copied (with metadata) directly
    into ``target_dir``; name clashes are resolved by appending ``_1``,
    ``_2``, ... to the stem.

    Args:
        source_dir: Root directory to search.
        target_dir: Destination directory; created if missing.
        file_patterns: Glob patterns passed to ``Path.rglob``.
        skip_patterns: Regexes; files whose path matches one are not copied.

    Returns:
        A list of ``(source_path, target_path)`` tuples, one per copied file.
        Empty when the source is missing, no pattern is given or nothing
        matched. Per-file copy errors are logged and skipped.
    """
    target_path = Path(target_dir)
    target_path.mkdir(parents=True, exist_ok=True)

    source_path = Path(source_dir)
    if not source_path.exists():
        logging.error(f"Source directory does not exist: {source_dir}")
        return []

    logging.debug(f"Source directory: {source_path} (exists: {source_path.exists()})")
    logging.debug(f"Target directory: {target_path}")
    logging.debug(f"File patterns: {file_patterns}")
    logging.debug(f"Skip patterns: {skip_patterns}")

    if not file_patterns:
        logging.warning("No file patterns specified. Nothing will be copied 🖐️")
        return []

    # First, collect all matching files across all patterns
    all_matching_files = []
    for pattern in file_patterns:
        logging.debug(f"Processing pattern: {pattern}")
        try:
            matching_files = list(source_path.rglob(pattern))
        except Exception as e:
            logging.error(f"Error while searching for pattern '{pattern}': {e}")
            continue

        all_matching_files.extend(matching_files)
        logging.debug(f"Found {len(matching_files)} files matching pattern: {pattern}")

        # Log some example matches to help with debugging
        if matching_files:
            logging.debug(f"Example matches for '{pattern}':")
            for example in matching_files[:5]:  # Show up to 5 examples
                logging.debug(f"  - {example}")
        else:
            logging.warning(f"No files found matching pattern: '{pattern}' 🖐️")

    if not all_matching_files:
        logging.warning("No files matched any of the patterns. Nothing to copy.")
        return []

    logging.debug(
        f"Total files matched across all patterns: {len(all_matching_files)} (Including files to skip)"
    )

    # A file matched by several patterns must be copied once, not once per
    # pattern (which would create name_1, name_2, ... duplicates).
    unique_files = list(dict.fromkeys(all_matching_files))

    # Compare real locations rather than substrings: a target such as "/x/s"
    # must not cause everything under "/x/src" to be skipped.
    resolved_target = target_path.resolve()

    copied_files = []
    for file_path in unique_files:
        # Verify the file exists and is a file (not a directory)
        if not file_path.exists():
            logging.warning(f"File no longer exists: {file_path} 🖐️")
            continue

        if not file_path.is_file():
            logging.debug(f"Skipping non-file: {file_path}")
            continue

        # Skip if file is in target directory
        if _is_within(file_path.resolve(), resolved_target):
            logging.debug(f"Skipping file in target directory: {file_path}")
            continue

        # Check if file should be skipped based on skip patterns
        if skip_patterns and should_skip_file(file_path, skip_patterns):
            continue

        target_file = _unique_target(target_path, file_path.name)

        # Perform the actual copy
        try:
            shutil.copy2(file_path, target_file)
            copied_files.append((file_path, target_file))
            logging.debug(f"Copied: {file_path} -> {target_file}")
        except PermissionError:
            logging.error(f"Permission denied when copying: {file_path}")
        except FileNotFoundError:
            logging.error(f"File not found when copying: {file_path}")
        except Exception as e:
            logging.error(f"Error copying {file_path}: {str(e)}")

    if not copied_files:
        logging.warning("No files were copied. Check your include/exclude patterns.")

    return copied_files
|
|
|
|
|
def clean_directory(directory):
    """Delete ``directory`` and everything beneath it, if it exists.

    A path that does not exist is ignored without logging; a successful
    removal is reported at INFO level.
    """
    location = Path(directory)
    if not location.exists():
        return

    shutil.rmtree(location)
    logging.info(f"Cleaned directory: {directory}")
|
|
|
|
|
def main():
    """Run the copy workflow described by a YAML configuration file.

    The configuration path is taken from the first command-line argument and
    defaults to ``config.yaml``. The workflow writes a tree listing of the
    source to ``tree.txt``, empties the target directory and copies the
    files matching the include patterns into it.

    Exits with status 1 when the configuration file is missing or when any
    step raises; the error and its traceback are logged first.
    """
    setup_logging()

    try:
        logging.info("Starting file copy operation...")

        # Check if a config file path is provided as a command-line argument
        config_file = sys.argv[1] if len(sys.argv) > 1 else "config.yaml"

        # Verify config file exists before attempting to load it
        config_path = Path(config_file)
        if not config_path.exists():
            logging.error(f"Configuration file not found: {config_file}")
            logging.error(f"Current working directory: {Path.cwd()}")
            logging.error("Please create a config.yaml file or specify a valid path")
            sys.exit(1)

        logging.debug(f"Using configuration file: {config_file} (exists: {config_path.exists()})")

        # Load configuration from YAML file
        config = load_config(config_file)

        # Extract configurations
        source_dir = config.get("source_dir")
        target_dir = config.get("target_dir")
        tree_depth = config.get("tree_depth", 4)
        skip_patterns = config.get("skip_patterns", [])
        include_patterns = config.get("include_patterns", [])

        logging.debug("Loaded configuration:")
        logging.debug(f"  source_dir: {source_dir}")
        logging.debug(f"  target_dir: {target_dir}")
        logging.debug(f"  tree_depth: {tree_depth}")
        logging.debug(f"  skip_patterns: {skip_patterns}")
        logging.debug(f"  include_patterns: {include_patterns}")

        # Validate required configuration
        if not source_dir or not target_dir:
            error_msg = "Source and target directories must be specified in the configuration file"
            logging.error(error_msg)
            raise ValueError(error_msg)

        if not include_patterns:
            logging.warning("No include patterns specified. No files will be copied. 🖐️")

        # Safety checks before the destructive clean step. The handler below
        # logs these errors, so they are raised without logging here.
        source_path = Path(source_dir).resolve()
        target_path = Path(target_dir).resolve()

        # Without a source there is nothing to copy, so the previous target
        # content must not be wiped.
        if not source_path.exists():
            raise FileNotFoundError(f"Source directory does not exist: {source_dir}")

        # Cleaning a target that is the source, or contains it, would delete
        # the very files that are about to be copied.
        if target_path == source_path or target_path in source_path.parents:
            raise ValueError(
                "Target directory must not be the source directory or one of its parents: "
                f"{target_dir}"
            )

        generate_tree(source_dir, "tree.txt", tree_depth, skip_patterns)
        clean_directory(target_dir)  # Clean before copying
        copied = copy_files(source_dir, target_dir, include_patterns, skip_patterns)
        logging.info("Copy operation completed successfully ✅")
        logging.info(f"Total files copied: {len(copied)}")

    except Exception as e:
        # SystemExit is not an Exception, so the sys.exit(1) above passes
        # through this handler untouched.
        logging.error(f"An error occurred: {str(e)}")
        import traceback

        logging.error(traceback.format_exc())
        sys.exit(1)
|
|
|
|
|
# Run the workflow only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()