This Python script recursively scans for .py files, identifies duplicate log codes, replaces them with unique ones, and inserts missing 4-digit codes into log statements. It respects .gitignore, tracks used codes, and provides a progress bar.
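For example (illustrative), a log statement written as log.info("Starting server") would be rewritten as log.info("4821 Starting server"), where 4821 stands in for a randomly generated code that is not yet used anywhere in the project.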
import collections
import fnmatch
import glob
import os
import random
import re
import time
from typing import Dict, List, Set, Tuple

""" | |
Overview | |
log_code_editor.py is an automated code maintenance tool designed to manage and standardize 4-digit log codes across all Python files in a project. It ensures that every log statement has a unique identifier for easier debugging and log tracking. | |
Primary Functions | |
1. Log Code Collection & Tracking | |
Scans all Python files recursively in the workspace | |
Extracts existing 4-digit log codes from log statements using regex patterns | |
Maintains a persistent database of used codes in used_log_codes.txt | |
Respects .gitignore patterns to exclude unnecessary files | |
2. Duplicate Code Resolution | |
Identifies duplicate 4-digit codes within the same file using find_duplicate_log_codes() | |
Replaces duplicate occurrences with unique random 4-digit codes (1000-9999) | |
Preserves the first occurrence of each duplicate code | |
3. Missing Code Addition | |
Detects log statements without 4-digit codes using add_missing_codes() | |
Automatically generates and inserts unique 4-digit codes at the beginning of log messages | |
Supports all log levels: debug, info, error, warning, critical | |
""" | |
# Define the codes file path once as a global variable
# Using os.path.join for cross-platform path handling
BASE_PATH = "."
LOG_CODES_DIR = "z_dev_files"
LOG_CODES_FILE = "used_log_codes.txt"
CODES_FILE = os.path.join(BASE_PATH, LOG_CODES_DIR, LOG_CODES_FILE)

# Pre-compile regex patterns for better performance
LOG_PATTERN = re.compile(
    r'log\.(debug|info|error|warning|critical)\s*\(\s*(?:f?")(\d{4})'
)
LOG_ALL_PATTERN = re.compile(
    r'log\.(debug|info|error|warning|critical)\s*\(\s*(?:f?")([^"]*)'
)
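# Illustrative examples of what the pre-compiled patterns capture (the sample
# messages are hypothetical):
#
#   log.info("1234 worker started")   -> LOG_PATTERN groups: ('info', '1234')
#   log.error(f"5678 failed: {err}")  -> LOG_PATTERN groups: ('error', '5678')
#   log.debug("no code yet")          -> LOG_ALL_PATTERN groups: ('debug', 'no code yet')
#
# Note that both patterns only match double-quoted (optionally f-prefixed)
# string literals.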
""" | |
""" | |
def parse_gitignore(base_path: str = ".") -> List[str]:
    """Parse the .gitignore file and return a list of patterns to exclude"""
    gitignore_path = os.path.join(base_path, ".gitignore")
    patterns = []
    if os.path.exists(gitignore_path):
        try:
            with open(gitignore_path, "r", encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    # Skip empty lines and comments
                    if line and not line.startswith("#"):
                        patterns.append(line)
        except Exception as e:
            print(f"⚠️ Warning: Could not read .gitignore: {e}")

    # Add some default patterns for Python projects
    default_patterns = ["*.pyc", "__pycache__/", "*.pyo", "*.pyd", ".Python"]
    patterns.extend(default_patterns)
    return patterns

def should_exclude_file(file_path: str, patterns: List[str]) -> bool:
    """Check if a file should be excluded based on gitignore patterns"""
    # Normalize path separators
    normalized_path = file_path.replace("\\", "/")
    for pattern in patterns:
        # Handle directory patterns
        if pattern.endswith("/"):
            if pattern[:-1] in normalized_path.split("/"):
                return True
        # Handle file patterns with wildcards
        elif "*" in pattern:
            if fnmatch.fnmatch(os.path.basename(normalized_path), pattern):
                return True
            if fnmatch.fnmatch(normalized_path, pattern):
                return True
        # Handle exact matches
        else:
            if pattern in normalized_path:
                return True
    return False
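# Illustrative calls (hypothetical paths):
#   should_exclude_file("src/__pycache__/mod.py", ["__pycache__/"])  # True
#   should_exclude_file("src/app.py", ["*.pyc", "__pycache__/"])     # False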
def print_progress_bar(
    iteration: int,
    total: int,
    prefix: str = "",
    suffix: str = "",
    decimals: int = 1,
    length: int = 50,
    fill: str = "█",
    print_end: str = "\r",
):
    """Print a progress bar to the console"""
    percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
    filled_length = int(length * iteration // total)
    bar = fill * filled_length + "-" * (length - filled_length)
    print(f"\r{prefix} |{bar}| {percent}% {suffix}", end=print_end)
    if iteration == total:
        print()

def find_duplicate_log_codes(file_content: str) -> Dict[str, int]:
    """Find duplicate 4-digit log codes in file content using pre-compiled regex"""
    # Find all matches using the pre-compiled pattern
    matches = LOG_PATTERN.findall(file_content)
    # Extract just the 4-digit codes
    codes = [match[1] for match in matches]
    # Find duplicates
    code_counts = collections.Counter(codes)
    duplicates = {code: count for code, count in code_counts.items() if count > 1}
    return duplicates
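# Illustrative result: content with two log.info("1234 ...") statements and
# one log.info("5678 ...") statement yields {'1234': 2}.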
def extract_log_codes_from_file(file_path: str) -> List[str]:
    """Extract all 4-digit log codes from a file using pre-compiled regex"""
    try:
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            content = f.read()
        # Find all log statements with 4-digit codes
        matches = LOG_PATTERN.findall(content)
        # Extract just the 4-digit codes
        codes = [match[1] for match in matches]
        return codes
    except Exception as e:
        print(f"❌ Error extracting codes from {file_path}: {str(e)}")
        return []

def collect_all_used_codes(base_path: str = ".") -> Set[str]:
    """Collect all 4-digit log codes from all Python files in the workspace with progress tracking"""
    print("🔍 Collecting all used log codes...")
    start_time = time.time()
    all_codes = set()

    # First check if there's an existing codes file
    if os.path.exists(CODES_FILE):
        with open(CODES_FILE, "r") as f:
            for line in f:
                code = line.strip()
                if code and len(code) == 4 and code.isdigit():
                    all_codes.add(code)
        print(f"✅ Loaded {len(all_codes)} codes from existing file")

    # Get all Python files first to show progress
    all_py_files = glob.glob(os.path.join(base_path, "**", "*.py"), recursive=True)

    # Parse gitignore patterns
    exclude_patterns = parse_gitignore(base_path)

    # Filter out files based on gitignore patterns
    py_files = []
    for file_path in all_py_files:
        if not should_exclude_file(file_path, exclude_patterns):
            py_files.append(file_path)

    total_files = len(py_files)
    excluded_count = len(all_py_files) - len(py_files)
    if excluded_count > 0:
        print(f"🚫 Excluded {excluded_count} files based on .gitignore patterns")
    if total_files == 0:
        print("⚠️ No Python files found!")
        return all_codes

    print(f"📁 Found {total_files} Python files to scan")

    # Scan all Python files with progress bar
    for i, py_file in enumerate(py_files):
        print_progress_bar(
            i,
            total_files,
            prefix="Scanning files",
            suffix=f"({i}/{total_files}) {os.path.basename(py_file)}",
        )
        codes = extract_log_codes_from_file(py_file)
        if codes:
            all_codes.update(codes)

    # Final progress update
    print_progress_bar(
        total_files,
        total_files,
        prefix="Scanning files",
        suffix=f"({total_files}/{total_files}) Complete!",
    )

    elapsed_time = time.time() - start_time
    print(f"✅ Total unique codes collected: {len(all_codes)} (in {elapsed_time:.2f}s)")

    # Save all codes to the file
    os.makedirs(os.path.dirname(CODES_FILE), exist_ok=True)
    with open(CODES_FILE, "w") as f:
        for code in sorted(all_codes):
            f.write(f"{code}\n")
    print(f"💾 All codes saved to {CODES_FILE}")
    return all_codes

def load_used_codes() -> Set[str]:
    """Load previously used codes from the tracking file"""
    used_codes = set()
    if os.path.exists(CODES_FILE):
        with open(CODES_FILE, "r") as f:
            for line in f:
                code = line.strip()
                if code and len(code) == 4 and code.isdigit():
                    used_codes.add(code)
    return used_codes


def save_used_code(code: str) -> None:
    """Save a newly used code to the tracking file"""
    with open(CODES_FILE, "a") as f:
        f.write(f"{code}\n")

def generate_unique_code(existing_codes: Set[str]) -> str:
    """Generate a random 4-digit code that doesn't exist in the given set"""
    # If more than ~80% of the 9000 possible codes are taken, pick from the
    # remaining pool directly instead of guessing randomly (and fail loudly
    # if the pool is exhausted, rather than looping forever below)
    if len(existing_codes) > 8000:
        available_codes = set(f"{i:04d}" for i in range(1000, 10000)) - existing_codes
        if not available_codes:
            raise RuntimeError("All 4-digit log codes (1000-9999) are already in use")
        return random.choice(list(available_codes))
    # Standard random generation for normal cases
    while True:
        new_code = f"{random.randint(1000, 9999):04d}"
        if new_code not in existing_codes:
            return new_code
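# Illustrative call:
#   used = {"1000", "1001"}
#   generate_unique_code(used)  # e.g. "7342" -- any code in 1000-9999 not in `used`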
def replace_duplicate_log_codes(
    file_path: str, used_codes: Set[str]
) -> Tuple[bool, str]:
    """Replace duplicate log codes with unique random numbers"""
    try:
        # Read file content
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            content = f.read()

        # Find duplicates
        duplicates = find_duplicate_log_codes(content)
        if not duplicates:
            return False, "No duplicate log codes found."

        result_message = f"🔍 Found {len(duplicates)} duplicate log codes"
        replaced_content = content
        codes_replaced = 0

        for code in duplicates:
            # Create a pattern for this specific code
            code_pattern = re.compile(
                rf'(log\.(debug|info|error|warning|critical)\s*\(\s*(?:f?")({code}))'
            )
            matches = list(code_pattern.finditer(replaced_content))
            # Keep the first occurrence, replace the others.
            # Process from end to start so earlier positions stay valid.
            for match in reversed(matches[1:]):  # Skip first occurrence
                # Generate a new code
                new_code = generate_unique_code(used_codes)
                used_codes.add(new_code)
                # Get the start and end positions of the code in the content
                start_pos = match.start(3)
                end_pos = match.end(3)
                # Replace just this occurrence
                replaced_content = (
                    replaced_content[:start_pos] + new_code + replaced_content[end_pos:]
                )
                codes_replaced += 1
                result_message += f"\n  • {code} → {new_code}"

        # Only write to the file if changes were made
        if codes_replaced > 0:
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(replaced_content)
            # Batch save new codes to reduce I/O (load the saved set once
            # instead of re-reading the file for every code)
            already_saved = load_used_codes()
            with open(CODES_FILE, "a") as f:
                for code in used_codes - already_saved:
                    f.write(f"{code}\n")
            result_message += f"\n🎯 Replaced {codes_replaced} duplicate codes"
            return True, result_message
        return False, "No replacements needed."
    except Exception as e:
        return False, f"❌ Error replacing log codes: {str(e)}"

def add_missing_codes(file_path: str, used_codes: Set[str]) -> Tuple[bool, str]:
    """Add 4-digit codes to log statements that don't have them"""
    try:
        # Read file content
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            content = f.read()

        # Find all log statements using the pre-compiled pattern
        all_log_matches = list(LOG_ALL_PATTERN.finditer(content))
        if not all_log_matches:
            return False, "No log statements found in the file."

        # Collect all modifications first, then apply them in reverse order
        modifications = []
        new_codes = []
        for match in all_log_matches:
            log_text = match.group(2)
            if not (len(log_text) >= 4 and log_text[:4].isdigit()):
                # This log statement doesn't start with a 4-digit code
                log_type = match.group(1)
                start_pos = match.start(2)
                end_pos = match.end(2)
                # Generate a unique code
                new_code = generate_unique_code(used_codes)
                used_codes.add(new_code)
                new_codes.append(new_code)
                # Prepare the new log text
                if log_text:
                    new_log_text = f"{new_code} {log_text}"
                else:
                    new_log_text = new_code
                modifications.append(
                    (start_pos, end_pos, new_log_text, log_type, new_code)
                )

        if not modifications:
            return False, "No missing codes to add."

        # Apply modifications in reverse order so earlier positions stay valid
        replaced_content = content
        for start_pos, end_pos, new_log_text, log_type, new_code in reversed(
            modifications
        ):
            replaced_content = (
                replaced_content[:start_pos] + new_log_text + replaced_content[end_pos:]
            )

        # Write the modified content
        with open(file_path, "w", encoding="utf-8") as f:
            f.write(replaced_content)

        # Batch save new codes to reduce I/O
        if new_codes:
            with open(CODES_FILE, "a") as f:
                for code in new_codes:
                    f.write(f"{code}\n")

        result_message = f"✅ Added {len(modifications)} missing codes to log statements"
        return True, result_message
    except Exception as e:
        return False, f"❌ Error adding missing codes: {str(e)}"

def process_all_python_files(base_path: str = ".") -> None:
    """Process all Python files in the workspace with enhanced progress tracking"""
    global BASE_PATH
    BASE_PATH = base_path
    print("🚀 Starting log code processing...")
    start_time = time.time()

    # First collect all used codes
    used_codes = collect_all_used_codes(base_path)

    # Get all Python files for processing
    all_py_files = glob.glob(os.path.join(base_path, "**", "*.py"), recursive=True)

    # Parse gitignore patterns
    exclude_patterns = parse_gitignore(base_path)

    # Filter out files based on gitignore patterns
    py_files = []
    for file_path in all_py_files:
        if not should_exclude_file(file_path, exclude_patterns):
            py_files.append(file_path)

    total_files = len(py_files)
    excluded_count = len(all_py_files) - len(py_files)
    if excluded_count > 0:
        print(f"🚫 Excluded {excluded_count} files based on .gitignore patterns")
    if total_files == 0:
        print("⚠️ No Python files found to process!")
        return
    # Track statistics
    files_modified = []
    files_with_duplicates = []
    files_with_missing_codes = []
    total_duplicates_fixed = 0
    total_codes_added = 0

    print(f"\n🔧 Processing {total_files} Python files...")

    # Process all Python files with progress tracking
    for i, py_file in enumerate(py_files):
        file_name = os.path.basename(py_file)
        print_progress_bar(
            i,
            total_files,
            prefix="Processing files",
            suffix=f"({i + 1}/{total_files}) {file_name}",
        )
        file_modified = False

        # First, handle duplicate codes
        duplicates_fixed, msg = replace_duplicate_log_codes(py_file, used_codes)
        if duplicates_fixed:
            files_with_duplicates.append(py_file)
            file_modified = True
            # Count duplicates from the message
            if "Replaced" in msg and "duplicate codes" in msg:
                try:
                    count = int(msg.split("Replaced ")[1].split(" duplicate")[0])
                    total_duplicates_fixed += count
                except (IndexError, ValueError):
                    total_duplicates_fixed += 1

        # Then, add missing codes
        codes_added, msg = add_missing_codes(py_file, used_codes)
        if codes_added:
            files_with_missing_codes.append(py_file)
            file_modified = True
            # Count added codes from the message
            if "Added" in msg and "missing codes" in msg:
                try:
                    count = int(msg.split("Added ")[1].split(" missing")[0])
                    total_codes_added += count
                except (IndexError, ValueError):
                    total_codes_added += 1

        if file_modified and py_file not in files_modified:
            files_modified.append(py_file)

    # Final progress update
    print_progress_bar(
        total_files,
        total_files,
        prefix="Processing files",
        suffix=f"({total_files}/{total_files}) Complete!",
    )
    # Save the final set of used codes
    print("\n💾 Saving final code database...")
    with open(CODES_FILE, "w") as f:
        for code in sorted(used_codes):
            f.write(f"{code}\n")

    # Calculate processing time
    elapsed_time = time.time() - start_time

    # Print enhanced summary
    print("\n" + "=" * 60)
    print("📊 PROCESSING SUMMARY")
    print("=" * 60)
    print(f"⏱️ Total processing time: {elapsed_time:.2f} seconds")
    print(f"📁 Python files scanned: {total_files}")
    print(f"🔧 Files modified: {len(files_modified)}")
    print(f"🔄 Files with duplicates fixed: {len(files_with_duplicates)}")
    print(f"➕ Files with missing codes added: {len(files_with_missing_codes)}")
    print(f"🎯 Total duplicate codes fixed: {total_duplicates_fixed}")
    print(f"➕ Total new codes added: {total_codes_added}")
    print(f"📊 Final unique code count: {len(used_codes)}")
    print(f"💾 Code database saved to: {CODES_FILE}")
    if files_modified:
        print("\n📝 Modified files:")
        for file_path in files_modified:
            print(f"  • {file_path}")
    print("\n✅ Log code processing completed successfully!")


if __name__ == "__main__":
    process_all_python_files()