|
#!/usr/bin/env pip-run |
|
|
|
""" |
|
File System Scanner |
|
|
|
A demo of a lightweight, high-performance security scanner that recursively |
|
searches directories and files for potentially malicious content using pattern |
|
matching. |
|
""" |
|
|
|
# `pip-run` runs a Python script with its dependencies installed
# automatically, without having to create a virtual environment or install
# packages manually.
#
# When this script is run with pip-run, it reads the `__requires__` list
# below, installs the dependencies temporarily if needed, and then executes
# the script.
#
# This makes the script more portable since users don't need to manually
# install dependencies.

__requires__ = ["click"]
|
|
|
import pathlib |
|
import os |
|
import concurrent.futures |
|
import mimetypes |
|
import time |
|
import sys |
|
import multiprocessing |
|
import threading |
|
import click |
|
|
|
# Built-in defaults; command line options take precedence over these values.
DEFAULT_SUSPICIOUS_STRINGS = ["malware", "virus"]
DEFAULT_MAX_WORKERS = 8
DEFAULT_MAX_FILE_SIZE = 50 * 1024**2  # 50MB, expressed in bytes
|
|
|
|
|
def get_terminal_width():
    """
    Report how many character columns the terminal currently has.

    Returns:
        int: Column count reported by the operating system, or 80 when the
        size cannot be determined
    """
    try:
        columns = os.get_terminal_size().columns
    except (OSError, AttributeError):
        # No usable terminal size is available; use the classic 80 columns.
        columns = 80
    return columns
|
|
|
|
|
def format_status_line(
    files_checked, current_file, elapsed_time, terminal_width=None
):
    """
    Format a status line with file counter, filename, and elapsed time.

    The line has the shape ``[count] <path> [HH:MM:SS]``. The path is padded
    or shortened so that the whole line is exactly as wide as the terminal,
    as long as the terminal is wide enough to hold the counter and timer.

    Args:
        files_checked: Number of files checked so far
        current_file: Path of the file currently being processed
        elapsed_time: Seconds elapsed since scan started
        terminal_width: Width in characters to format for; when None (the
            default) the current terminal width is queried

    Returns:
        str: Formatted status line that fills terminal width
    """
    # Format the counter and timer
    hours, remainder = divmod(int(elapsed_time), 3600)
    minutes, seconds = divmod(remainder, 60)

    counter = f"[{files_checked}]"
    timer = f"[{hours:02}:{minutes:02}:{seconds:02}]"

    if terminal_width is None:
        terminal_width = get_terminal_width()

    # Two columns go to the separating spaces. Clamp at zero so that a very
    # narrow terminal can never produce negative slice bounds below.
    available_space = max(terminal_width - len(counter) - len(timer) - 2, 0)

    if len(current_file) <= available_space:
        # Pad with spaces to fill width
        display_file = current_file.ljust(available_space)
    elif available_space >= 5:
        # Room for at least one character on each side of the ellipsis:
        # show the start and the end of the path, using every column.
        kept = available_space - 3
        tail_len = kept // 2
        head_len = kept - tail_len
        display_file = (
            f"{current_file[:head_len]}...{current_file[-tail_len:]}"
        )
    elif available_space > 3:
        # Very narrow: ellipsis followed by the end of the path
        display_file = f"...{current_file[-(available_space - 3):]}"
    else:
        # No room for an ellipsis; keep only the last few characters.
        # An explicit start index is used because a "-0" slice would
        # return the entire string.
        display_file = current_file[len(current_file) - available_space:]

    return f"{counter} {display_file} {timer}"
|
|
|
|
|
def update_display_thread(
    stop_event, display_lock, files_checked, current_file, start_time
):
    """
    Thread function to continuously update the progress display.

    Redraws the status line roughly twice per second until `stop_event`
    is set.

    Args:
        stop_event: Threading event to signal when to stop the thread
        display_lock: Lock for synchronizing display output
        files_checked: Shared counter object; its `.value` is the number of
            files checked so far
        current_file: Shared byte buffer; its `.value` holds the encoded
            path of the file most recently picked up
        start_time: Time when the scan started
    """
    while not stop_event.is_set():
        with display_lock:
            elapsed = time.time() - start_time
            # The buffer may hold a path that was cut at a byte boundary,
            # which can split a multi-byte character. Replace undecodable
            # bytes instead of letting the thread die with a decode error.
            path_text = current_file.value.decode(errors="replace")
            status_line = format_status_line(
                files_checked.value, path_text, elapsed
            )

            # Clear the line and print the status
            sys.stdout.write("\r" + " " * get_terminal_width())
            sys.stdout.write("\r" + status_line)
            sys.stdout.flush()

        # Wait on the event rather than sleeping so that a stop request
        # ends the thread immediately instead of after the full interval.
        stop_event.wait(0.5)
|
|
|
|
|
def is_binary_file(file_path):
    """
    Guess from the file name whether a file holds binary content.

    The decision is based only on the MIME type that `mimetypes` derives
    from the name; the file itself is not opened.

    Args:
        file_path: Path to the file to check

    Returns:
        bool: True if a MIME type was guessed and it is not one of the
        text-like types, False otherwise (including when no type could be
        guessed)
    """
    text_like_prefixes = ("text/", "application/json", "application/xml")

    guessed_type = mimetypes.guess_type(str(file_path))[0]
    if not guessed_type:
        # Unknown types are not treated as binary
        return False
    return not guessed_type.startswith(text_like_prefixes)
|
|
|
|
|
def _clear_status_line():
    """Blank out the in-place progress line so other output starts clean."""
    sys.stdout.write("\r" + " " * get_terminal_width() + "\r")
    sys.stdout.flush()


def _report_file_error(message, quiet, display_lock):
    """Print an error message to stderr without garbling the progress line."""
    with display_lock:
        if not quiet:
            # The progress line only exists when progress output is enabled
            _clear_status_line()
        print(message, file=sys.stderr)


def check_file(
    file_path, suspicious_strings, quiet, output_file, display_lock
):
    """
    Check if a file contains any suspicious strings.

    Files larger than `DEFAULT_MAX_FILE_SIZE` and files whose name suggests
    binary content are skipped. The comparison is case-insensitive. Every
    match is written to `output_file` as a ``MATCH: <path>: <strings>``
    line; problems reading the file are reported on stderr and never
    raised to the caller.

    Args:
        file_path: Path to the file to check (must support `.stat()`)
        suspicious_strings: List of strings to search for
        quiet: Whether progress output is suppressed; when True nothing is
            written to stdout
        output_file: File object to write matching files to
        display_lock: Lock for synchronizing display output

    Returns:
        tuple: (file path as str, list of found suspicious strings) if
        matches found, None otherwise
    """
    try:
        # Skip files that are too large
        if file_path.stat().st_size > DEFAULT_MAX_FILE_SIZE:
            return None

        # Skip binary files
        if is_binary_file(file_path):
            return None

        # Decode with an explicit encoding so results do not depend on the
        # platform default; undecodable bytes are dropped.
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            content = f.read().lower()

        found_strings = [
            s for s in suspicious_strings if s.lower() in content
        ]
        if not found_strings:
            return None

        # Matches are the scan's actual result, so they are reported even
        # in quiet mode; only the status-line handling depends on `quiet`.
        with display_lock:
            if not quiet:
                _clear_status_line()
            output_file.write(
                f"MATCH: {file_path}: {', '.join(found_strings)}\n"
            )
            output_file.flush()

        return (str(file_path), found_strings)

    except (PermissionError, IsADirectoryError, FileNotFoundError) as e:
        # Skip files we can't access, but log the error
        _report_file_error(
            f"Error accessing file: {file_path}: {e}", quiet, display_lock
        )
        return None
    except Exception as e:
        # Deliberately broad: one bad file must not abort the whole scan
        _report_file_error(
            f"Unexpected error checking file: {file_path}: {e}",
            quiet,
            display_lock,
        )
        return None
|
|
|
|
|
def file_discovery_thread(
    start_dir, queue_lock, futures, executor, process_file, scan_complete
):
    """
    Discover files and submit them for processing.

    This thread function walks the file system starting from `start_dir`
    and submits each discovered regular file to the thread pool for
    processing. `scan_complete` is always set when the walk ends, even if
    it ends because of an error.

    Args:
        start_dir: Directory to start searching from.
        queue_lock: Lock for synchronizing access to the futures set.
        futures: Set to store the submitted futures.
        executor: ThreadPoolExecutor for submitting tasks.
        process_file: Function to process each file; it is called with a
            `pathlib.Path`.
        scan_complete: Event to signal when scanning is complete.
    """
    try:
        # os.walk already lists every directory it visits, so its file names
        # are reused instead of listing each directory a second time.
        for root, _, filenames in os.walk(start_dir):
            directory = pathlib.Path(root)
            for filename in filenames:
                item = directory / filename
                try:
                    # Filters out broken links, sockets, FIFOs and the like
                    if not item.is_file():
                        continue
                except OSError:
                    # One inaccessible entry must not hide the remaining
                    # files of the same directory.
                    continue

                # Submit file for scanning as soon as it's found
                with queue_lock:
                    futures.add(executor.submit(process_file, item))
    finally:
        # Signal that we've finished finding files
        scan_complete.set()
|
|
|
|
|
def find_suspicious_files(
    start_dir, suspicious_strings, quiet, output_file, max_workers
):
    """
    Recursively search the filesystem for files containing suspicious strings,
    processing files as they are discovered.

    A discovery thread walks the directory tree and submits every file to a
    thread pool while this function collects the results. Unless `quiet` is
    set, a further thread keeps a progress line up to date on stdout.

    Args:
        start_dir: Directory to start searching from
        suspicious_strings: List of strings to search for
        quiet: Whether to suppress progress output
        output_file: File object to write matching files to
        max_workers: Maximum number of concurrent workers

    Returns:
        tuple: (suspicious_files, files_checked, elapsed_time) where
        suspicious_files is a list of (path, found strings) tuples in no
        particular order, files_checked is the number of files handed to
        the workers and elapsed_time is the duration in seconds
    """
    suspicious_files = []
    files_checked = multiprocessing.Value("i", 0)
    current_file = multiprocessing.Array("c", 256)  # Adjust size as needed
    current_file.value = b""
    start_time = time.time()

    # Thread synchronization objects
    display_lock = threading.Lock()
    queue_lock = threading.Lock()
    scan_complete = threading.Event()
    stop_event = threading.Event()

    # Start the display update thread if not in quiet mode
    display_thread = None
    if not quiet:
        print(f"Starting scan from {start_dir}...")
        display_thread = threading.Thread(
            target=update_display_thread,
            args=(
                stop_event,
                display_lock,
                files_checked,
                current_file,
                start_time,
            ),
            daemon=True,
        )
        display_thread.start()

    def process_file(file_path):
        """Record progress for `file_path`, then scan it."""
        # File names that are not valid text would make a strict encode
        # raise, so unencodable characters are replaced. The buffer is cut
        # by bytes, and the decode/encode round trip drops a character that
        # was split by the cut so the stored value is always decodable.
        encoded_path = str(file_path).encode("utf-8", errors="replace")
        display_bytes = (
            encoded_path[: len(current_file)]
            .decode("utf-8", errors="ignore")
            .encode("utf-8")
        )

        # Update progress tracking
        with files_checked.get_lock():
            files_checked.value += 1
            current_file.value = display_bytes

        return check_file(
            file_path, suspicious_strings, quiet, output_file, display_lock
        )

    try:
        # Set up the thread pool for scanning
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=max_workers
        ) as executor:
            futures = set()

            # Start file discovery in a separate thread
            discovery_thread = threading.Thread(
                target=file_discovery_thread,
                args=(
                    start_dir,
                    queue_lock,
                    futures,
                    executor,
                    process_file,
                    scan_complete,
                ),
                daemon=True,
            )
            discovery_thread.start()

            # Process results as they complete
            while True:
                # Read the flag before taking the snapshot: once it is set,
                # no further futures are added, so an empty set afterwards
                # really means that all work is finished.
                discovery_done = scan_complete.is_set()

                with queue_lock:
                    finished = {f for f in futures if f.done()}
                    # Drop finished futures so each tick only looks at the
                    # outstanding ones and results are not kept alive.
                    futures.difference_update(finished)
                    outstanding = len(futures)

                for future in finished:
                    result = future.result()
                    if result:
                        suspicious_files.append(result)

                if discovery_done and not outstanding:
                    break

                # Avoid spinning too fast
                time.sleep(0.1)
    finally:
        # Clean up the display even when the scan is interrupted
        if display_thread is not None:
            stop_event.set()
            display_thread.join()

            # Final clear of the status line
            with display_lock:
                sys.stdout.write("\r" + " " * get_terminal_width() + "\r")
                sys.stdout.flush()

            # Add a blank line before the summary
            print()

    return suspicious_files, files_checked.value, time.time() - start_time
|
|
|
|
|
# Command line entry point. The docstring of `main` doubles as the text shown
# by `--help`, so implementation notes are kept in comments instead.
#
# Parameters (filled in by click from the options below):
#   change_dir: directory to start scanning from
#   quiet:      suppress progress and summary output
#   string:     tuple of strings to search for (empty means use defaults)
#   output:     writable file object that receives the match lines
#   workers:    number of concurrent worker threads (at least 1)
#   max_size:   largest file size to scan, in MB (0 or more)
@click.command()
@click.option(
    "-C",
    "--change-dir",
    help="Directory to start scanning from (default: current working directory)",
    type=click.Path(
        exists=True, file_okay=False, dir_okay=True, readable=True
    ),
    default=os.getcwd(),
)
@click.option(
    "-q", "--quiet", help="Suppress progress information output", is_flag=True
)
@click.option(
    "-s",
    "--string",
    help="Suspicious string to search for (can be specified multiple times)",
    multiple=True,
)
@click.option(
    "-o",
    "--output",
    help="Output file for matching results (default: <stderr>)",
    type=click.File("w"),
    default=sys.stderr,
)
@click.option(
    "-w",
    "--workers",
    help=f"Maximum number of concurrent workers (default: {DEFAULT_MAX_WORKERS})",
    # A thread pool needs at least one worker; reject smaller values as a
    # usage error instead of failing later with a traceback.
    type=click.IntRange(min=1),
    default=DEFAULT_MAX_WORKERS,
)
@click.option(
    "-m",
    "--max-size",
    help="Maximum file size to scan in MB (default: 50)",
    # A negative limit would silently exclude every file from the scan.
    type=click.IntRange(min=0),
    default=50,
)
def main(change_dir, quiet, string, output, workers, max_size):
    """
    Recursively search the filesystem for files containing specified suspicious strings.

    By default, searches for 'malware' and 'virus' strings starting from the current directory.

    Matching files are output to <stderr> as they are found unless redirected
    with -o/--output.
    """
    # Set max file size from command line option. The limit is read from this
    # module-level name when files are checked, hence the global rebinding.
    global DEFAULT_MAX_FILE_SIZE
    DEFAULT_MAX_FILE_SIZE = max_size * 1024 * 1024

    # Use provided suspicious strings or fallback to defaults
    suspicious_strings = list(string) if string else DEFAULT_SUSPICIOUS_STRINGS

    if not quiet:
        print(f"Starting scan from: {change_dir}")
        print(f"Looking for strings: {', '.join(suspicious_strings)}")
        print(f"Outputting matches to: {output.name}")
        print()

    suspicious_files, files_checked, elapsed_time = find_suspicious_files(
        change_dir, suspicious_strings, quiet, output, workers
    )

    # Display summary results
    if not quiet:
        hours, remainder = divmod(int(elapsed_time), 3600)
        minutes, seconds = divmod(remainder, 60)
        print(f"\nScan completed in {hours:02}:{minutes:02}:{seconds:02}")
        print(f"Checked {files_checked} files")
        print(f"Found {len(suspicious_files)} suspicious files")
|
|
|
|
|
if __name__ == "__main__":
    try:
        # In click's default standalone mode Ctrl-C is turned into an
        # "Aborted!" exit inside main(), so the handler below would never
        # run. With standalone_mode=False click re-raises instead, which
        # means usage errors have to be shown and mapped to an exit code
        # here as well.
        sys.exit(main(standalone_mode=False))
    except (KeyboardInterrupt, click.exceptions.Abort):
        print("\n\nScan interrupted by user. Exiting gracefully...")
        # Reset terminal if needed
        print("\033[?25h")  # Ensure cursor is visible
        sys.exit(0)
    except click.ClickException as exc:
        # Same behaviour as standalone mode: print the error, exit non-zero
        exc.show()
        sys.exit(exc.exit_code)