Created
December 1, 2023 18:00
-
-
Save CodeZombie/d1fb7212266c62ebca3ad1149d6b89d1 to your computer and use it in GitHub Desktop.
Given a huge directory of files, this script searches the content of each file and copies the file to a new directory if it contains a given substring. Threaded.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
import shutil
import threading
from collections import namedtuple
from concurrent.futures import ThreadPoolExecutor, as_completed
# Record pairing a worker thread with the number of files assigned to it.
ThreadWithData = namedtuple("thread", ["thread", "filecount"])

# Constants
MAX_THREADS = 12  # upper bound on concurrently running worker threads
FILES_PER_THREAD = 500  # number of file names handed to each worker
DIRECTORY = "./files_to_search"  # directory whose files are scanned
OUTPUT_DIRECTORY = "./output"  # directory that matching files are copied into
SEARCH_STRING = "needle"  # substring to look for (compared case-insensitively)
def get_files_in_directory(directory):
    """Return the names of the regular files directly inside *directory*.

    Only immediate entries are considered (no recursion); entries that are not
    files, such as subdirectories, are left out. Names are returned without
    the directory prefix, in the order ``os.listdir`` yields them.
    """
    return [
        entry
        for entry in os.listdir(directory)
        if os.path.isfile(os.path.join(directory, entry))
    ]
def move_files_if_they_contain_string(
    files, directory=None, output_directory=None, search_string=None
):
    """Copy each file whose text contains the search string to the output directory.

    Despite the name, matching files are copied, not moved; the originals stay
    in place. Matching is case-insensitive, and files are decoded as UTF-8 with
    undecodable bytes ignored.

    A file that cannot be read or copied is reported and skipped, so one bad
    file does not abort the rest of the batch.

    Args:
        files: File names (relative to the source directory) to inspect.
        directory: Source directory; defaults to ``DIRECTORY``.
        output_directory: Destination directory; defaults to
            ``OUTPUT_DIRECTORY``. Created on demand when a match is found.
        search_string: Substring to look for; defaults to ``SEARCH_STRING``.
    """
    if directory is None:
        directory = DIRECTORY
    if output_directory is None:
        output_directory = OUTPUT_DIRECTORY
    if search_string is None:
        search_string = SEARCH_STRING

    # Lowercase the needle once instead of once per file.
    needle = search_string.lower()

    for file in files:
        source_path = os.path.join(directory, file)
        try:
            with open(source_path, encoding="utf-8", errors="ignore") as f:
                found = needle in f.read().lower()
            if found:
                print("Found \"" + search_string + "\" in " + file)
                # exist_ok makes this safe when several workers race to create it.
                os.makedirs(output_directory, exist_ok=True)
                shutil.copyfile(source_path, os.path.join(output_directory, file))
        except OSError as error:
            # Keep going: an unreadable or vanished file must not kill the worker.
            print("Skipping " + file + ": " + str(error))
def main():
    """Scan ``DIRECTORY`` in chunks on a thread pool and print progress.

    The file list is split into chunks of ``FILES_PER_THREAD`` names; each
    chunk is handed to ``move_files_if_they_contain_string`` on one of at most
    ``MAX_THREADS`` worker threads. A running total of analyzed files is
    printed every time a chunk finishes.
    """
    print("Starting...")
    files = get_files_in_directory(DIRECTORY)
    print("found " + str(len(files)) + " files.")

    chunks = [
        files[start:start + FILES_PER_THREAD]
        for start in range(0, len(files), FILES_PER_THREAD)
    ]

    analyzed_files = 0
    with ThreadPoolExecutor(max_workers=MAX_THREADS) as pool:
        # Map each submitted job to its chunk size so progress can be tallied.
        chunk_sizes = {
            pool.submit(move_files_if_they_contain_string, chunk): len(chunk)
            for chunk in chunks
        }
        # as_completed blocks until a worker finishes, so the main thread
        # waits idle instead of spinning in a polling loop.
        for future in as_completed(chunk_sizes):
            error = future.exception()
            if error is not None:
                # Surface worker failures; the pool would otherwise hide them.
                print("A worker failed: " + repr(error))
            analyzed_files += chunk_sizes[future]
            print("Analyzed " + str(analyzed_files) + " files.")
# Run the scan only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment