Created
March 14, 2025 12:19
-
-
Save spinningcat/3a0cb811aa82b9fc39584cbb43684a01 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import time | |
import shutil | |
import itertools | |
# Define folder paths
folder_path = "/home/spinningca-t/workers/listener/new/"  # watched for incoming files
moved_folder_path = "/home/spinningca-t/workers/listener/done/"  # destination for stabilized files
unsupported_folder_path = "/home/spinningca-t/workers/listener/unsupported/"  # destination for rejected files

# Create the "new", "done" and "unsupported" folders if they don't exist.
# exist_ok=True replaces the separate os.path.exists() check, which could race
# with another process creating the directory between the check and makedirs().
# It still raises FileExistsError if a path exists but is not a directory.
os.makedirs(folder_path, exist_ok=True)
os.makedirs(moved_folder_path, exist_ok=True)
os.makedirs(unsupported_folder_path, exist_ok=True)

# Supported file extensions (lower-case, with the leading dot; the lookup in
# is_unsupported() lower-cases the candidate extension before comparing).
# NOTE(review): '.csx' may be a typo for '.csv' -- confirm before changing.
supported_extensions = {'.csx', '.docx', '.jpeg', '.jpg', '.webp', '.odt', '.xlsx', '.doc', '.html', '.md', '.pptx', '.txt', '.sub', '.msg', '.svg', '.iso'}

# Dictionary to track files and their sizes:
# file name -> (size in bytes, time.time() timestamp), as recorded by monitor_folder()
tracked_files = {}

print(f"Monitoring {folder_path} for new files and size changes...")
def is_unsupported(entry):
    """Tell whether an entry's file extension is outside the supported set.

    Args:
        entry: Object exposing a ``name`` attribute holding the file name
            (monitor_folder passes the entries yielded by ``os.scandir``).

    Returns:
        True when the lower-cased extension of ``entry.name`` is not a member
        of ``supported_extensions``; False otherwise.
    """
    suffix = os.path.splitext(entry.name)[1].lower()
    if suffix in supported_extensions:
        return False
    return True
def _move_unsupported_files():
    """Move regular files with an unsupported extension to the 'unsupported' folder."""
    # Take a snapshot of the matching entries first: the result of iterating a
    # directory while entries are being removed from it is unspecified.
    with os.scandir(folder_path) as entries:
        rejected = [entry for entry in entries if is_unsupported(entry) and entry.is_file()]

    for entry in rejected:
        print(f"Found unsupported file: {entry.name}")
        unsupported_file_path = os.path.join(unsupported_folder_path, entry.name)
        try:
            shutil.move(entry.path, unsupported_file_path)
        except OSError as err:
            # A single failed move (file vanished, permissions, ...) should not
            # stop the monitor; the file is picked up again on the next cycle.
            print(f"Could not move unsupported file {entry.name}: {err}")
            continue
        print(f"Unsupported file moved to 'unsupported' folder: {entry.name}")


def _track_new_files():
    """Start tracking regular files that are in the watched folder but not yet tracked."""
    current_files = set(os.listdir(folder_path))
    new_files = current_files - tracked_files.keys()
    print(f"Tracked files: {tracked_files.keys()}")

    for file in new_files:
        file_path = os.path.join(folder_path, file)
        if not os.path.isfile(file_path):
            continue
        try:
            size = os.path.getsize(file_path)
        except OSError:
            # File disappeared between listing and stat; ignore it this cycle.
            continue
        tracked_files[file] = (size, time.time())
        print(f"New file detected: {file} (Initial size: {size} bytes)")


def _check_tracked_files(stable_seconds):
    """Move tracked files whose size has not changed for ``stable_seconds`` seconds."""
    # Iterate over a copy of the keys because entries are deleted in the loop.
    for file in list(tracked_files):
        file_path = os.path.join(folder_path, file)
        if not os.path.exists(file_path):  # Stop tracking files that no longer exist
            del tracked_files[file]
            continue
        try:
            current_size = os.path.getsize(file_path)
            last_size, last_change = tracked_files[file]
            if current_size != last_size:
                print(f"File size changed: {file} (Current size: {current_size} bytes)")
                # Remember the new size and restart the stability timer.
                # Without this the file would compare unequal to its initial
                # size on every cycle and never be moved.
                tracked_files[file] = (current_size, time.time())
            elif time.time() - last_change >= stable_seconds:
                # File has stabilized, move it to the "moved" folder
                moved_file_path = os.path.join(moved_folder_path, file)
                shutil.move(file_path, moved_file_path)
                del tracked_files[file]
                print(f"File stabilized and moved to 'moved' folder: {file} (Size: {current_size} bytes)")
        except OSError:
            # File might have been moved or deleted, remove it from tracking
            tracked_files.pop(file, None)


def monitor_folder(poll_interval=10, stable_seconds=60, max_cycles=None):
    """Monitor the folder for new files and size changes.

    Each cycle moves unsupported files away, starts tracking newly arrived
    files, and moves tracked files whose size has stayed the same for at
    least ``stable_seconds`` into the destination folder.

    Args:
        poll_interval: Seconds to sleep between two cycles (default 10).
        stable_seconds: Seconds a file's size must remain unchanged before
            the file is moved (default 60).
        max_cycles: Number of cycles to run before returning; ``None``
            (default) runs until interrupted.

    Returns:
        None. Ctrl+C stops the loop quietly; any other exception is reported
        on stdout and also ends the loop.
    """
    try:
        cycles = itertools.count() if max_cycles is None else range(max_cycles)
        for cycle in cycles:
            if cycle:
                # Wait before checking for new files again (not before the first cycle)
                time.sleep(poll_interval)
            _move_unsupported_files()
            _track_new_files()
            _check_tracked_files(stable_seconds)
    except KeyboardInterrupt:
        print("\nMonitoring stopped")
    except Exception as err:
        # Top-level boundary of the script: report the error and stop.
        print(f"An error occurred: {err}")
# Script entry point: start the monitoring loop only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    monitor_folder()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment