Skip to content

Instantly share code, notes, and snippets.

@nomicode
Last active February 25, 2025 16:04
Show Gist options
  • Save nomicode/28cdd93d3f5c7d6aa83335b1fd0ca838 to your computer and use it in GitHub Desktop.
Save nomicode/28cdd93d3f5c7d6aa83335b1fd0ca838 to your computer and use it in GitHub Desktop.
A demo of a lightweight, high-performance security scanner that recursively searches directories and files for potentially malicious content using pattern matching.
[*]
charset = utf-8
end_of_line = lf
indent_size = 2
indent_style = space
insert_final_newline = true
max_line_length = 79
trim_trailing_whitespace = true
[*.py]
indent_size = 4
default: true
# Line length
# https://github.com/DavidAnson/markdownlint/blob/main/doc/Rules.md#md013
MD013: false
proseWrap: preserve
tabWidth: 2

File System Scanner

A demo of a lightweight, high-performance security scanner that recursively searches directories and files for potentially malicious content using pattern matching.

This gist contains the source for antivirus.py, a simple but effective file scanning utility.

Since the gist can be cloned as a git repository, I've also included the following files as a minimal setup to make editing easy and to ensure consistent style:

  • .markdownlint.yaml
  • .prettierrc.yaml
  • pyproject.toml
  • .editorconfig

Features

  • Streaming file checks: Processes files as they're discovered rather than collecting all paths first, delivering faster initial results
  • Parallel processing: Scans multiple files simultaneously for BLAZINGLY fast performance (disable with --quiet)
  • Live progress tracking: Displays a progress bar showing files checked, current file, and elapsed time (disable with --quiet)
  • Flexible output: Matches are printed to stderr by default (redirect with --output FILE)

Installation

No installation required! The script uses pip-run to automatically handle dependencies, making it perfect for quick demos and one-off scans.

# Clone the gist
git clone https://gist.github.com/yourusername/gistid.git antivirus
cd antivirus

# Run directly
./antivirus.py [OPTIONS]

Usage

./antivirus.py [OPTIONS]

Options

  • -C, --change-dir DIR: Directory to start scanning from (default: current directory)
  • -s, --string STRING: Suspicious string to search for (can specify multiple times)
    • Default: "malware", "virus"
  • -q, --quiet: Suppress progress bar and information output
  • -o, --output FILE: Write matches to specified file instead of stderr
  • -w, --workers N: Set number of parallel workers (default: 8)
  • -m, --max-size N: Maximum file size to scan in MB (default: 50)

Examples

Scan current directory with default settings:

./antivirus.py

Scan a specific directory for custom patterns:

./antivirus.py -C /path/to/scan -s backdoor -s exploit

Save results to a file:

./antivirus.py -o results.txt

Summary Statistics

When scanning completes, the script outputs:

  • Total scan time
  • Number of files checked
  • Number of suspicious files found

Screenshots

Scan in Progress

A screenshot of the scan in progress

The scanner actively processing a large repository. The progress bar displays real-time metrics showing that 492,023 files have been checked in just 11 seconds. The current operation shows the scanner traversing a .git directory, demonstrating its ability to handle complex nested directory structures efficiently.

Scan Complete

A screenshot of a completed scan

The final results screen after completing a repository scan. In less than one second, the utility processed 81 files and identified 7 suspicious files matching the defined patterns. Each suspicious match is logged to stderr with the specific matching string that triggered the detection, providing clear, actionable information for further investigation.

#!/usr/bin/env pip-run
"""
File System Scanner
A demo of a lightweight, high-performance security scanner that recursively
searches directories and files for potentially malicious content using pattern
matching.
"""
# Using `pip-run` allows running a Python script with its dependencies
# automatically installed
# Without having to create a virtual environment or manually install packages.
# When this script runs with pip-run, it reads the `__requires__` list below,
# installs the dependencies temporarily if needed, and then executes the
# script.
# This makes the script more portable since users don't need to manually
# install dependencies.
__requires__ = ["click"]
import pathlib
import os
import concurrent.futures
import mimetypes
import time
import sys
import multiprocessing
import threading
import click
# Default configuration values - will be overridden by command line options
DEFAULT_SUSPICIOUS_STRINGS = ["malware", "virus"]
DEFAULT_MAX_WORKERS = 8
DEFAULT_MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB
def get_terminal_width():
"""
Get current terminal width or a sensible default.
Returns:
int: Width of the terminal in characters
"""
try:
return os.get_terminal_size().columns
except (OSError, AttributeError):
return 80
def format_status_line(files_checked, current_file, elapsed_time):
"""
Format a status line with file counter, filename, and elapsed time.
Args:
files_checked: Number of files checked so far
current_file: Path of the file currently being processed
elapsed_time: Seconds elapsed since scan started
Returns:
str: Formatted status line that fills terminal width
"""
# Format the counter and timer
hours, remainder = divmod(int(elapsed_time), 3600)
minutes, seconds = divmod(remainder, 60)
counter = f"[{files_checked}]"
timer = f"[{hours:02}:{minutes:02}:{seconds:02}]"
# Calculate available space for the filename
terminal_width = get_terminal_width()
available_space = terminal_width - len(counter) - len(timer) - 2
# Truncate or pad the filename as needed
if len(current_file) > available_space:
# Truncate with ellipsis
half_space = (available_space - 3) // 2
if half_space > 0:
# Show start and end of path
truncated = (
f"{current_file[:half_space]}...{current_file[-half_space:]}"
)
else:
# If very narrow, just show the end
truncated = f"...{current_file[-(available_space - 3):]}"
display_file = truncated
else:
# Pad with spaces to fill width
display_file = current_file.ljust(available_space)
return f"{counter} {display_file} {timer}"
def update_display_thread(
stop_event, display_lock, files_checked, current_file, start_time
):
"""
Thread function to continuously update the progress display.
Args:
stop_event: Threading event to signal when to stop the thread
display_lock: Lock for synchronizing display output
files_checked: Reference to the files_checked counter
current_file: Reference to the current_file string
start_time: Time when the scan started
"""
while not stop_event.is_set():
with display_lock:
elapsed = time.time() - start_time
status_line = format_status_line(
files_checked.value, current_file.value.decode(), elapsed
)
# Clear the line and print the status
sys.stdout.write("\r" + " " * get_terminal_width())
sys.stdout.write("\r" + status_line)
sys.stdout.flush()
time.sleep(0.5)
def is_binary_file(file_path):
"""
Determine if a file is likely binary based on its mimetype.
Args:
file_path: Path to the file to check
Returns:
bool: True if the file is likely binary, False otherwise
"""
mime_type, _ = mimetypes.guess_type(str(file_path))
return bool(
mime_type
and not mime_type.startswith(
("text/", "application/json", "application/xml")
)
)
def check_file(
file_path, suspicious_strings, quiet, output_file, display_lock
):
"""
Check if a file contains any suspicious strings.
Args:
file_path: Path to the file to check
suspicious_strings: List of strings to search for
quiet: Whether to suppress output
output_file: File object to write matching files to
display_lock: Lock for synchronizing display output
Returns:
tuple: (file_path, list of found suspicious strings) if matches found,
None otherwise
"""
# Skip files that are too large
try:
if file_path.stat().st_size > DEFAULT_MAX_FILE_SIZE:
return None
# Skip binary files
if is_binary_file(file_path):
return None
# Check if file is readable
with open(file_path, "r", errors="ignore") as f:
content = f.read().lower()
if found_strings := [
s for s in suspicious_strings if s.lower() in content
]:
result = (str(file_path), found_strings)
# Print matching files as they're found if not quiet
if not quiet:
with display_lock:
# Clear the current status line first
sys.stdout.write(
"\r" + " " * get_terminal_width() + "\r"
)
sys.stdout.flush()
# Print the match to the output file
output_line = f"MATCH: {str(file_path)}: {', '.join(found_strings)}\n"
output_file.write(output_line)
output_file.flush()
return result
except (PermissionError, IsADirectoryError, FileNotFoundError) as e:
# Skip files we can't access, but log the error
with display_lock:
sys.stdout.write("\r" + " " * get_terminal_width() + "\r")
sys.stdout.flush()
print(f"Error accessing file: {file_path}: {e}", file=sys.stderr)
return None
except Exception as e:
# Catch and log any other exceptions
with display_lock:
sys.stdout.write("\r" + " " * get_terminal_width() + "\r")
sys.stdout.flush()
print(
f"Unexpected error checking file: {file_path}: {e}",
file=sys.stderr,
)
return None
return None
def file_discovery_thread(
start_dir, queue_lock, futures, executor, process_file, scan_complete
):
"""
Discover files and submit them for processing.
This thread function walks the file system starting from `start_dir`
and submits each discovered file to the thread pool for processing.
Args:
start_dir: Directory to start searching from.
queue_lock: Lock for synchronizing access to the futures set.
futures: Set to store the submitted futures.
executor: ThreadPoolExecutor for submitting tasks.
process_file: Function to process each file.
scan_complete: Event to signal when scanning is complete.
"""
try:
for root, _, _ in os.walk(start_dir):
try:
path = pathlib.Path(root)
for item in path.iterdir():
if item.is_file():
# Submit file for scanning as soon as it's found
with queue_lock:
future = executor.submit(process_file, item)
futures.add(future)
except (PermissionError, FileNotFoundError):
continue
finally:
# Signal that we've finished finding files
scan_complete.set()
def find_suspicious_files(
start_dir, suspicious_strings, quiet, output_file, max_workers
):
"""
Recursively search the filesystem for files containing suspicious strings,
processing files as they are discovered.
Args:
start_dir: Directory to start searching from
suspicious_strings: List of strings to search for
quiet: Whether to suppress progress output
output_file: File object to write matching files to
max_workers: Maximum number of concurrent workers
Returns:
tuple: (suspicious_files, files_checked, elapsed_time)
"""
suspicious_files = []
files_checked = multiprocessing.Value("i", 0)
current_file = multiprocessing.Array("c", 256) # Adjust size as needed
current_file.value = b""
start_time = time.time()
# Create a lock for synchronizing display output
display_lock = threading.Lock()
if not quiet:
print(f"Starting scan from {start_dir}...")
# Thread synchronization objects
queue_lock = threading.Lock()
scan_complete = threading.Event()
stop_event = threading.Event()
# Start the display update thread if not in quiet mode
if not quiet:
display_thread = threading.Thread(
target=update_display_thread,
args=(
stop_event,
display_lock,
files_checked,
current_file,
start_time,
),
)
display_thread.daemon = True
display_thread.start()
# Set up the thread pool for scanning
with concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers
) as executor:
futures = set()
# Define the file processing function
def process_file(file_path):
# Update progress tracking
with files_checked.get_lock():
files_checked.value += 1
encoded_path = str(file_path).encode()
current_file.value = encoded_path[
: len(current_file)
] # Truncate if too long
return check_file(
file_path, suspicious_strings, quiet, output_file, display_lock
)
# Start file discovery in a separate thread
discovery_thread = threading.Thread(
target=file_discovery_thread,
args=(
start_dir,
queue_lock,
futures,
executor,
process_file,
scan_complete,
),
)
discovery_thread.daemon = True
discovery_thread.start()
# Process results as they complete
completed_futures = set()
while not (
scan_complete.is_set() and len(completed_futures) == len(futures)
):
# Get newly completed futures
just_completed = set()
with queue_lock:
for future in futures:
if future.done() and future not in completed_futures:
just_completed.add(future)
for future in just_completed:
completed_futures.add(future)
# Process results from newly completed futures
for future in just_completed:
result = future.result()
if result:
suspicious_files.append(result)
# Avoid spinning too fast
time.sleep(0.1)
# Clean up
if not quiet:
stop_event.set()
if "display_thread" in locals():
display_thread.join()
# Final clear of the status line
with display_lock:
sys.stdout.write("\r" + " " * get_terminal_width() + "\r")
sys.stdout.flush()
# Add a blank line before the summary
print()
return suspicious_files, files_checked.value, time.time() - start_time
@click.command()
@click.option(
"-C",
"--change-dir",
help="Directory to start scanning from (default: current working directory)",
type=click.Path(
exists=True, file_okay=False, dir_okay=True, readable=True
),
default=os.getcwd(),
)
@click.option(
"-q", "--quiet", help="Suppress progress information output", is_flag=True
)
@click.option(
"-s",
"--string",
help="Suspicious string to search for (can be specified multiple times)",
multiple=True,
)
@click.option(
"-o",
"--output",
help="Output file for matching results (default: <stderr>)",
type=click.File("w"),
default=sys.stderr,
)
@click.option(
"-w",
"--workers",
help=f"Maximum number of concurrent workers (default: {DEFAULT_MAX_WORKERS})",
type=int,
default=DEFAULT_MAX_WORKERS,
)
@click.option(
"-m",
"--max-size",
help="Maximum file size to scan in MB (default: 50)",
type=int,
default=50,
)
def main(change_dir, quiet, string, output, workers, max_size):
"""
Recursively search the filesystem for files containing specified suspicious strings.
By default, searches for 'malware' and 'virus' strings starting from the current directory.
Matching files are output to <stderr> as they are found unless redirected
with -o/--output.
"""
# Set max file size from command line option
global DEFAULT_MAX_FILE_SIZE
DEFAULT_MAX_FILE_SIZE = max_size * 1024 * 1024
# Use provided suspicious strings or fallback to defaults
suspicious_strings = list(string) if string else DEFAULT_SUSPICIOUS_STRINGS
if not quiet:
print(f"Starting scan from: {change_dir}")
print(f"Looking for strings: {', '.join(suspicious_strings)}")
print(f"Outputting matches to: {output.name}")
print()
suspicious_files, files_checked, elapsed_time = find_suspicious_files(
change_dir, suspicious_strings, quiet, output, workers
)
# Display summary results
if not quiet:
hours, remainder = divmod(int(elapsed_time), 3600)
minutes, seconds = divmod(remainder, 60)
print(f"\nScan completed in {hours:02}:{minutes:02}:{seconds:02}")
print(f"Checked {files_checked} files")
print(f"Found {len(suspicious_files)} suspicious files")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n\nScan interrupted by user. Exiting gracefully...")
# Reset terminal if needed
print("\033[?25h") # Ensure cursor is visible
sys.exit(0)
[tool.black]
line-length = 79
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment