Created
August 12, 2021 14:49
-
-
Save TheFern2/0bfd1819b3f1b79c2a4c2f983dec4aca to your computer and use it in GitHub Desktop.
Monitor Folder Sizes
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dataclasses import dataclass | |
import os | |
import socket | |
import argparse | |
import time | |
# Report a folder only when it has grown by at least this many megabytes
# since the recorded baseline (overridable via --megabytes).
mega_bytes = 25
size_filter = 1024 * 1024 * mega_bytes  # growth threshold in bytes, derived from mega_bytes
# When True, only the machine running the script is scanned (set via --test-local-only).
test_only_local = False
# Root folder to scan on each machine (overridable via --root-path).
root_path = 'C:/'
# Hostnames to scan; non-local machines are reached through their //host/c$ admin share.
machines = [
    'Workstation01',
    'Workstation02'
]
# Top-level folder names under root_path that are skipped entirely.
ignore_paths = ['Windows']
# Paths that are always measured, even when they fall under an ignored folder.
one_off_paths = ['C:/Windows/Temp']
# Where the per-machine logs (initial_sizes.txt / increased_sizes.txt) are written.
save_path = 'C:/LogFolder/MonitorFolderSizes/'
@dataclass
class DirectoryObject:
    """A scanned directory and its measured size."""
    # Absolute directory path, local ('C:/...') or UNC ('//host/c$/...').
    path: str
    # Size in bytes as returned by get_directory_size().
    # NOTE(review): when records are re-loaded from initial_sizes.txt in main(),
    # a str is stored here despite the int annotation (callers compensate with
    # int(...) casts) — worth normalizing at load time.
    size_in_bytes: int
def convert(seconds):
    """Format a duration in seconds as H:MM:SS (wraps around every 24 hours)."""
    seconds = seconds % (24 * 3600)
    hours, remainder = divmod(seconds, 3600)
    minutes, secs = divmod(remainder, 60)
    return "%d:%02d:%02d" % (hours, minutes, secs)
def get_directories(path='C:/'):
    """Return the immediate subdirectories of *path*, each prefixed with *path*.

    Prints a warning and returns an empty list when the path cannot be listed
    (e.g. an unreachable UNC share).
    """
    if path[-1] != '/':
        path = path + '/'
    try:
        entries = [name for name in os.listdir(path)
                   if os.path.isdir(os.path.join(path, name))]
    except OSError:
        print(f"Unable to retrieve {path}, ensure unc path works on that machine")
        return []
    return [path + name for name in entries]
def get_directory_size(directory):
    """Return the total size of *directory* in bytes, recursing into subdirectories.

    If *directory* is actually a file, its size is returned instead.
    Unreadable folders contribute 0 and other filesystem errors are skipped
    best-effort rather than raising.

    Fix: the specific handlers (NotADirectoryError, PermissionError) must come
    BEFORE the generic `except OSError` — both are OSError subclasses, so the
    original ordering made them unreachable and a file path returned 0.
    """
    total = 0
    try:
        for entry in os.scandir(directory):
            if entry.is_file():
                # if it's a file, use stat() function
                total += entry.stat().st_size
            elif entry.is_dir():
                # if it's a directory, recursively call this function
                total += get_directory_size(entry.path)
    except NotADirectoryError:
        # if `directory` isn't a directory, get the file size then
        return os.path.getsize(directory)
    except PermissionError:
        # if for whatever reason we can't open the folder, return 0
        return 0
    except OSError:
        # any other filesystem error (vanished entry, broken link, ...): best effort
        pass
    return total
def get_size_format(b, factor=1024, suffix="B"):
    """Render a byte count as a human-readable string with a binary prefix.

    Examples: 1253656 -> '1.20MB', 1253656678 -> '1.17GB'.
    """
    units = ("", "K", "M", "G", "T", "P", "E", "Z")
    value = b
    for prefix in units:
        if value < factor:
            return f"{value:.2f}{prefix}{suffix}"
        value = value / factor
    # Past zetta: everything else is reported in yotta-units.
    return f"{value:.2f}Y{suffix}"
def main():
    """Scan each configured machine's folders, record baseline sizes, and report
    folders that have grown by more than `size_filter` bytes since the baseline.

    Per machine it writes under save_path/<machine>/:
      - initial_sizes.txt:   "path,size_in_bytes" baseline lines
      - increased_sizes.txt: "path,old,new,delta" lines for grown folders
    """
    # Command-line flags override the module-level configuration.
    global test_only_local, mega_bytes, root_path, size_filter
    parser = argparse.ArgumentParser()
    parser.add_argument('--test-local-only', action='store_true')
    parser.add_argument('--root-path')
    parser.add_argument('--megabytes', type=int)
    args = parser.parse_args()
    if args.test_local_only:
        test_only_local = True
    if args.megabytes:
        mega_bytes = args.megabytes
        size_filter = 1024 * 1024 * mega_bytes  # keep derived threshold in sync
    if args.root_path:
        root_path = args.root_path
    host_ran = False  # becomes True once the local machine has been scanned
    # Drive-letter-free variant of root_path, used to build UNC admin-share paths.
    remote_path = root_path.replace('C:', '')
    print('Starting...')
    overall_start = time.time()
    for machine in machines:
        print(f'Machine: {machine}')
        if host_ran and test_only_local:
            break  # --test-local-only: stop once the local host has run
        hostname = socket.gethostname()
        #root_path = ""
        if machine == hostname:
            # Local machine: scan the drive directly.
            host_ran = True
            if args.root_path:
                root_path = args.root_path
            else:
                root_path = "C:/"
        else:
            # Remote machine: go through the hidden //host/c$ administrative share.
            root_path = "//" + machine + "/c$" + remote_path
            hostname = machine
        # dir_list = get_directories('C:/TestFolder')
        dir_list = get_directories(root_path)
        dir_data = []
        # Measure every top-level directory under root_path except ignored ones.
        for a_dir in dir_list:
            base_dir = os.path.basename(a_dir)
            if base_dir not in ignore_paths:
                start = time.time()
                size_bytes = get_directory_size(a_dir)
                end = time.time()
                if size_bytes > 0:
                    print(f"{a_dir} {get_size_format(size_bytes)} Time taken: {convert(end-start)}" )
                    dir_data.append(DirectoryObject(a_dir, size_bytes))
        # Measure the one-off paths where we need the size regardless of ignore_paths.
        for one_off in one_off_paths:
            if machine != socket.gethostname():
                # Rewrite the local path into its UNC equivalent for remote machines.
                one_off_path = one_off.replace('C:', '')
                path_to_check = "//" + machine + "/c$" + one_off_path
            else:
                path_to_check = one_off
            start = time.time()
            size_bytes = get_directory_size(path_to_check)
            end = time.time()
            if size_bytes > 0:
                print(f"{path_to_check} {get_size_format(size_bytes)} Time taken: {convert(end-start)}" )
                dir_data.append(DirectoryObject(path_to_check, size_bytes))
        if dir_data:
            # NOTE(review): 'and' binds tighter than 'or', so this groups as
            # (root_path != 'C:/' and host_ran) or (root_path[-3:] != 'c$/' and not host_ran),
            # i.e. "a non-default root was requested" — confirm this grouping is intended.
            if root_path != 'C:/' and host_ran or root_path[-3:] != 'c$/' and not host_ran:
                # Custom root: log files go in a per-folder subdirectory.
                folder_name = os.path.basename(root_path)
                folder_save_path = save_path + machine + '/' + folder_name
                initial_sizes_path = folder_save_path + '/' + 'initial_sizes.txt'
                increased_sizes_path = folder_save_path + '/' + 'increased_sizes.txt'
                if not os.path.exists(folder_save_path):
                    os.makedirs(folder_save_path)
            else:
                # Default root: log files go directly under the machine folder.
                folder_save_path = save_path + machine
                if not os.path.exists(folder_save_path):
                    os.makedirs(folder_save_path)
                initial_sizes_path = folder_save_path + '/' + 'initial_sizes.txt'
                increased_sizes_path = folder_save_path + '/' + 'increased_sizes.txt'
        else:
            # Nothing measurable on this machine (e.g. unreachable share): skip it.
            continue
        if not os.path.isfile(initial_sizes_path) and dir_data:
            # First run for this machine: write the baseline file.
            # NOTE(review): file handles here are opened without 'with'; fine while
            # nothing in between raises, but a context manager would be safer.
            out = []
            f = open(initial_sizes_path, "w")
            for a_dir in dir_data:
                out.append(a_dir.path + "," + str(a_dir.size_in_bytes) + "\n")
            f.writelines(out)
            f.close()
        else:
            # Subsequent run: compare against the baseline, then refresh it with
            # new folders while leaving unchanged folders' baseline data intact.
            ## LOAD the baseline ("path,size" per line) from initial_sizes.txt.
            initial_sizes_data = []
            f = open(initial_sizes_path, "r")
            initial_sizes_lines = f.readlines()
            f.close()
            for line in initial_sizes_lines:
                clean_line = line.strip('\n')
                # size_bytes stays a str here; comparisons below cast with int(...).
                path, size_bytes = clean_line.split(',')
                initial_sizes_data.append(DirectoryObject(path, size_bytes))
            ## Path lists used for membership tests below.
            dir_data_paths = []
            for a_dir in dir_data:
                dir_data_paths.append(a_dir.path)
            initial_sizes_paths = []
            for a_dir in initial_sizes_data:
                initial_sizes_paths.append(a_dir.path)
            ## CHECK which folders grew beyond the size_filter threshold.
            increased_sizes_data = []
            for a_dir in initial_sizes_data:
                for b_dir in dir_data:
                    if b_dir.path in initial_sizes_paths:
                        if b_dir.size_in_bytes > int(a_dir.size_in_bytes) + size_filter and a_dir.path == b_dir.path:
                            size_increase = b_dir.size_in_bytes - int(a_dir.size_in_bytes)
                            print(f'Size increased {a_dir.path} {get_size_format(size_increase)}')
                            # Record "path,old,new,delta" for increased_sizes.txt.
                            increased_sizes_data.append(a_dir.path + "," + str(a_dir.size_in_bytes) + "," + str(b_dir.size_in_bytes) + "," + str(size_increase) + "\n")
                            break
                        elif int(a_dir.size_in_bytes) == b_dir.size_in_bytes and a_dir.path == b_dir.path:
                            break
            ## WRITE the growth report (only when something actually grew).
            if increased_sizes_data:
                f = open(increased_sizes_path, "w")
                f.writelines(increased_sizes_data)
                f.close()
            ## CHECK if any dirs have decreased their size; if so update the baseline,
            # else future comparisons won't be correct.
            for a_index in range(len(initial_sizes_data)):
                for b_index in range(len(dir_data)):
                    if dir_data[b_index].path in initial_sizes_paths:
                        if dir_data[b_index].size_in_bytes < int(initial_sizes_data[a_index].size_in_bytes) and initial_sizes_data[a_index].path == dir_data[b_index].path:
                            size_update = dir_data[b_index].size_in_bytes
                            initial_sizes_data[a_index].size_in_bytes = size_update
                            print(f'Size decreased {initial_sizes_data[a_index].path} {get_size_format(size_update)}')
                            break
                        elif int(initial_sizes_data[a_index].size_in_bytes) == dir_data[b_index].size_in_bytes and initial_sizes_data[a_index].path == dir_data[b_index].path:
                            break
            ## CHECK for new folders not yet in the baseline; add them.
            for a_dir in dir_data:
                if a_dir.path not in initial_sizes_paths:
                    initial_sizes_data.append(DirectoryObject(a_dir.path, a_dir.size_in_bytes))
            ## CHECK whether baseline folders still exist; drop vanished ones.
            # NOTE(review): BUG — removing from initial_sizes_data while indexing by
            # position shifts later elements, so after the first removal `index` no
            # longer lines up with initial_sizes_paths and the wrong entries can be
            # removed (or IndexError raised). Should filter into a new list instead.
            for index in range(len(initial_sizes_paths)):
                if initial_sizes_paths[index] not in dir_data_paths:
                    initial_sizes_data.remove(initial_sizes_data[index])
            ## WRITE the refreshed baseline back to initial_sizes.txt.
            out = []
            f = open(initial_sizes_path, "w")
            for a_dir in initial_sizes_data:
                out.append(a_dir.path + "," + str(a_dir.size_in_bytes) + "\n")
            f.writelines(out)
            f.close()
    overall_end = time.time()
    print(f'Done. took {convert(overall_end-overall_start)}')
# Script entry point: run the scan only when executed directly, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment