Skip to content

Instantly share code, notes, and snippets.

@spinningcat
Created March 14, 2025 12:19
Show Gist options
  • Save spinningcat/3a0cb811aa82b9fc39584cbb43684a01 to your computer and use it in GitHub Desktop.
Save spinningcat/3a0cb811aa82b9fc39584cbb43684a01 to your computer and use it in GitHub Desktop.
import os
import time
import shutil
import itertools
# Define folder paths
folder_path = "/home/spinningca-t/workers/listener/new/"
moved_folder_path = "/home/spinningca-t/workers/listener/done/"
unsupported_folder_path = "/home/spinningca-t/workers/listener/unsupported/"
# Create the "new", "moved" and "unsupported" folders if they don't exist
if not os.path.exists(folder_path):
os.makedirs(folder_path)
if not os.path.exists(moved_folder_path):
os.makedirs(moved_folder_path)
if not os.path.exists(unsupported_folder_path):
os.makedirs(unsupported_folder_path)
# Supported file extensions
supported_extensions = {'.csx', '.docx', '.jpeg', '.jpg', '.webp', '.odt', '.xlsx', '.doc', '.html', '.md', '.pptx', '.txt', '.sub', '.msg', '.svg', '.iso'}
# Dictionary to track files and their sizes
tracked_files = {}
print(f"Monitoring {folder_path} for new files and size changes...")
def is_unsupported(entry):
"""Check if a file is unsupported based on its extension."""
_, extension = os.path.splitext(entry.name)
return extension.lower() not in supported_extensions
def monitor_folder():
"""Monitor the folder for new files and size changes."""
try:
while True:
# Move unsupported files immediately
with os.scandir(folder_path) as entries:
unsupported_files = itertools.filterfalse(lambda entry: not is_unsupported(entry), entries)
for entry in unsupported_files:
if entry.is_file(): # Ensure it's a file
print(f"Found unsupported file: {entry.name}")
# Move unsupported files immediately
unsupported_file_path = os.path.join(unsupported_folder_path, entry.name)
shutil.move(entry.path, unsupported_file_path)
print(f"Unsupported file moved to 'unsupported' folder: {entry.name}")
# Track new files and check for size changes
current_files = set(os.listdir(folder_path))
new_files = current_files - tracked_files.keys()
print(f"Tracked files: {tracked_files.keys()}")
# Track new files
for file in new_files:
file_path = os.path.join(folder_path, file)
if os.path.isfile(file_path):
try:
size = os.path.getsize(file_path)
tracked_files[file] = (size, time.time())
print(f"New file detected: {file} (Initial size: {size} bytes)")
except OSError:
continue
# Check for changes in tracked files
for file in list(tracked_files.keys()):
file_path = os.path.join(folder_path, file)
if not os.path.exists(file_path): # Skip if file no longer exists
del tracked_files[file]
continue
try:
current_size = os.path.getsize(file_path)
original_size, first_seen = tracked_files[file]
if current_size != original_size:
print(f"File size changed: {file} (Current size: {current_size} bytes)")
# Check if the file size has remained unchanged for 60 seconds
elif time.time() - first_seen >= 60:
# File has stabilized, move it to the "moved" folder
moved_file_path = os.path.join(moved_folder_path, file)
shutil.move(file_path, moved_file_path)
del tracked_files[file]
print(f"File stabilized and moved to 'moved' folder: {file} (Size: {current_size} bytes)")
except OSError:
# File might have been moved or deleted, remove it from tracking
if file in tracked_files:
del tracked_files[file]
# Wait 10 seconds before checking for new files again
time.sleep(10)
except KeyboardInterrupt:
print("\nMonitoring stopped")
except Exception as err:
print(f"An error occurred: {err}")
if __name__ == "__main__":
monitor_folder()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment