Skip to content

Instantly share code, notes, and snippets.

@TheFern2
Created August 12, 2021 14:49
Show Gist options
  • Save TheFern2/0bfd1819b3f1b79c2a4c2f983dec4aca to your computer and use it in GitHub Desktop.
Monitor Folder Sizes
from dataclasses import dataclass
import os
import socket
import argparse
import time
# Growth threshold in megabytes: a folder is reported once it grows by
# more than this amount (overridable with --megabytes).
mega_bytes = 25
# The same threshold expressed in bytes; recomputed in main() when
# --megabytes is passed.
size_filter = 1024 * 1024 * mega_bytes
# When True (or --test-local-only is passed), scanning stops after the
# local machine has been processed.
test_only_local = False
# Default folder to scan; overridable with --root-path.
root_path = 'C:/'
# Machines to scan; remote ones are reached via their c$ admin share.
machines = [
    'Workstation01',
    'Workstation02'
]
# Top-level directory names that are skipped during the scan.
ignore_paths = ['Windows']
# Paths that are always measured, even when inside an ignored directory.
one_off_paths = ['C:/Windows/Temp']
# Root folder where per-machine size logs are written.
save_path = 'C:/LogFolder/MonitorFolderSizes/'
@dataclass
class DirectoryObject:
    """A directory path paired with its measured size.

    size_in_bytes holds an int for freshly-measured folders, but main()
    also stores the raw str read back from initial_sizes.txt, so readers
    must int()-convert before comparing.
    """
    path: str
    size_in_bytes: int
def convert(seconds):
    """Format a duration in seconds as 'H:MM:SS'.

    Fractional seconds are truncated. Unlike the previous version, hours
    are NOT wrapped at 24, so durations of a day or more print correctly
    (e.g. 25 hours -> '25:00:00' instead of '1:00:00').
    """
    minutes, secs = divmod(int(seconds), 60)
    hours, minutes = divmod(minutes, 60)
    return "%d:%02d:%02d" % (hours, minutes, secs)
def get_directories(path='C:/'):
    """Return the immediate subdirectories of *path* as '<path>/<name>' strings.

    A trailing '/' is appended to *path* if missing (safe for an empty
    string, unlike indexing path[-1]). If the path cannot be listed —
    e.g. an unreachable UNC share — a warning is printed and an empty
    list is returned.
    """
    if not path.endswith('/'):
        path = path + '/'
    try:
        # Keep only entries that are directories; files are ignored.
        return [path + item for item in os.listdir(path)
                if os.path.isdir(os.path.join(path, item))]
    except OSError:
        print(f"Unable to retrieve {path}, ensure unc path works on that machine")
        return []
def get_directory_size(directory):
    """Return the total size in bytes of all files under *directory*, recursively.

    If *directory* is actually a plain file, its own size is returned.
    Unreadable or missing paths contribute 0 instead of raising.

    Fix: the original listed `except OSError` FIRST, but in Python 3 both
    NotADirectoryError and PermissionError are subclasses of OSError, so
    those two handlers were unreachable and a file argument wrongly
    returned 0. The specific handlers must come before the general one.
    """
    total = 0
    try:
        for entry in os.scandir(directory):
            if entry.is_file():
                # Plain file: add its size.
                total += entry.stat().st_size
            elif entry.is_dir():
                # Subdirectory: recurse.
                total += get_directory_size(entry.path)
    except NotADirectoryError:
        # *directory* is a plain file — report its own size.
        return os.path.getsize(directory)
    except PermissionError:
        # Folder we are not allowed to open — count it as 0.
        return 0
    except OSError:
        # Any other I/O problem (missing path, broken share): best-effort.
        pass
    return total
def get_size_format(b, factor=1024, suffix="B"):
    """Render a byte count in human-readable form.

    e.g:
    1253656 => '1.20MB'
    1253656678 => '1.17GB'
    """
    value = b
    # Walk up the unit prefixes, dividing by *factor* each step, until
    # the value fits below one factor's worth of the next unit.
    for prefix in ("", "K", "M", "G", "T", "P", "E", "Z"):
        if value < factor:
            return f"{value:.2f}{prefix}{suffix}"
        value = value / factor
    # Anything that survives every division is in the yotta range.
    return f"{value:.2f}Y{suffix}"
def main():
    """Scan each configured machine's folders, store baseline sizes, and
    report folders that grew past the size_filter threshold.

    Optional CLI flags:
      --test-local-only  stop after the local machine has been scanned
      --megabytes        growth threshold in MB before a folder is reported
      --root-path        folder to scan instead of the drive root
    """
    # Module-level config is rebound from the CLI arguments below.
    global test_only_local, mega_bytes, root_path, size_filter
    parser = argparse.ArgumentParser()
    parser.add_argument('--test-local-only', action='store_true')
    parser.add_argument('--root-path')
    parser.add_argument('--megabytes', type=int)
    args = parser.parse_args()
    if args.test_local_only:
        test_only_local = True
    if args.megabytes:
        mega_bytes = args.megabytes
        # Keep the byte threshold in sync with the new MB value.
        size_filter = 1024 * 1024 * mega_bytes
    if args.root_path:
        root_path = args.root_path
    host_ran = False
    # Drive-letter-less suffix used to build remote UNC paths,
    # e.g. 'C:/Logs' -> '/Logs' -> '//machine/c$/Logs'.
    remote_path = root_path.replace('C:', '')
    print('Starting...')
    overall_start = time.time()
    for machine in machines:
        print(f'Machine: {machine}')
        # With --test-local-only, stop once the local host has been scanned.
        if host_ran and test_only_local:
            break
        hostname = socket.gethostname()
        #root_path = ""
        if machine == hostname:
            # Local machine: scan directly via the drive path.
            host_ran = True
            if args.root_path:
                root_path = args.root_path
            else:
                root_path = "C:/"
        else:
            # Remote machine: go through the administrative share.
            # NOTE(review): assumes the c$ admin share is reachable and the
            # target drive is C: — confirm for each machine in `machines`.
            root_path = "//" + machine + "/c$" + remote_path
            hostname = machine
        # dir_list = get_directories('C:/TestFolder')
        dir_list = get_directories(root_path)
        dir_data = []
        # Measure each top-level directory, skipping the ignored names.
        # Folders that measure 0 bytes are silently dropped.
        for a_dir in dir_list:
            base_dir = os.path.basename(a_dir)
            if base_dir not in ignore_paths:
                start = time.time()
                size_bytes = get_directory_size(a_dir)
                end = time.time()
                if size_bytes > 0:
                    print(f"{a_dir} {get_size_format(size_bytes)} Time taken: {convert(end-start)}" )
                    dir_data.append(DirectoryObject(a_dir, size_bytes))
        # One-off paths are always measured, even when they live inside an
        # ignored directory tree (e.g. C:/Windows/Temp under ignored Windows).
        for one_off in one_off_paths:
            if machine != socket.gethostname():
                # Rewrite the local drive path into a UNC path on the remote host.
                one_off_path = one_off.replace('C:', '')
                path_to_check = "//" + machine + "/c$" + one_off_path
            else:
                path_to_check = one_off
            start = time.time()
            size_bytes = get_directory_size(path_to_check)
            end = time.time()
            if size_bytes > 0:
                print(f"{path_to_check} {get_size_format(size_bytes)} Time taken: {convert(end-start)}" )
                dir_data.append(DirectoryObject(path_to_check, size_bytes))
        if dir_data:
            # A scan of a non-default root gets its own subfolder under the
            # machine's log directory; drive-root scans log directly there.
            if root_path != 'C:/' and host_ran or root_path[-3:] != 'c$/' and not host_ran:
                folder_name = os.path.basename(root_path)
                folder_save_path = save_path + machine + '/' + folder_name
                initial_sizes_path = folder_save_path + '/' + 'initial_sizes.txt'
                increased_sizes_path = folder_save_path + '/' + 'increased_sizes.txt'
                if not os.path.exists(folder_save_path):
                    os.makedirs(folder_save_path)
            else:
                folder_save_path = save_path + machine
                if not os.path.exists(folder_save_path):
                    os.makedirs(folder_save_path)
                initial_sizes_path = folder_save_path + '/' + 'initial_sizes.txt'
                increased_sizes_path = folder_save_path + '/' + 'increased_sizes.txt'
        else:
            # Nothing measured for this machine (e.g. unreachable share).
            continue
        if not os.path.isfile(initial_sizes_path) and dir_data:
            # First run for this machine/root: record the baseline sizes
            # as one "path,bytes" line per folder.
            out = []
            f = open(initial_sizes_path, "w")
            for a_dir in dir_data:
                out.append(a_dir.path + "," + str(a_dir.size_in_bytes) + "\n")
            f.writelines(out)
            f.close()
        else:
            # Subsequent run: compare against the stored baseline, report
            # growth, then refresh the baseline with new/removed/shrunken folders.
            ## LOAD the baseline ("path,bytes" per line)
            initial_sizes_data = []
            f = open(initial_sizes_path, "r")
            initial_sizes_lines = f.readlines()
            f.close()
            for line in initial_sizes_lines:
                clean_line = line.strip('\n')
                path, size_bytes = clean_line.split(',')
                # NOTE(review): size_in_bytes stays a str here; every
                # comparison below int()-converts it on the fly.
                initial_sizes_data.append(DirectoryObject(path, size_bytes))
            ## Parallel path lists used for membership tests
            dir_data_paths = []
            for a_dir in dir_data:
                dir_data_paths.append(a_dir.path)
            initial_sizes_paths = []
            for a_dir in initial_sizes_data:
                initial_sizes_paths.append(a_dir.path)
            ## CHECK which folders grew past the size_filter threshold
            increased_sizes_data = []
            for a_dir in initial_sizes_data:
                for b_dir in dir_data:
                    if b_dir.path in initial_sizes_paths:
                        if b_dir.size_in_bytes > int(a_dir.size_in_bytes) + size_filter and a_dir.path == b_dir.path:
                            size_increase = b_dir.size_in_bytes - int(a_dir.size_in_bytes)
                            print(f'Size increased {a_dir.path} {get_size_format(size_increase)}')
                            # Row format: path, old bytes, new bytes, delta
                            increased_sizes_data.append(a_dir.path + "," + str(a_dir.size_in_bytes) + "," + str(b_dir.size_in_bytes) + "," + str(size_increase) + "\n")
                            break
                        elif int(a_dir.size_in_bytes) == b_dir.size_in_bytes and a_dir.path == b_dir.path:
                            break
            ## WRITE the growth report (overwrites any previous report)
            if increased_sizes_data:
                f = open(increased_sizes_path, "w")
                f.writelines(increased_sizes_data)
                f.close()
            ## CHECK if any dirs have decreased their size; if so update the
            ## baseline, else future comparisons won't be correct
            for a_index in range(len(initial_sizes_data)):
                for b_index in range(len(dir_data)):
                    if dir_data[b_index].path in initial_sizes_paths:
                        if dir_data[b_index].size_in_bytes < int(initial_sizes_data[a_index].size_in_bytes) and initial_sizes_data[a_index].path == dir_data[b_index].path:
                            size_update = dir_data[b_index].size_in_bytes
                            initial_sizes_data[a_index].size_in_bytes = size_update
                            print(f'Size decreased {initial_sizes_data[a_index].path} {get_size_format(size_update)}')
                            break
                        elif int(initial_sizes_data[a_index].size_in_bytes) == dir_data[b_index].size_in_bytes and initial_sizes_data[a_index].path == dir_data[b_index].path:
                            break
            ## ADD newly-seen folders to the baseline
            for a_dir in dir_data:
                if a_dir.path not in initial_sizes_paths:
                    initial_sizes_data.append(DirectoryObject(a_dir.path, a_dir.size_in_bytes))
            ## DROP baseline folders that no longer exist on disk
            # NOTE(review): removing from initial_sizes_data while indexing by
            # the unchanged initial_sizes_paths list can remove the wrong
            # entry when more than one path has disappeared — confirm.
            for index in range(len(initial_sizes_paths)):
                if initial_sizes_paths[index] not in dir_data_paths:
                    initial_sizes_data.remove(initial_sizes_data[index])
            ## WRITE the refreshed baseline back to disk
            out = []
            f = open(initial_sizes_path, "w")
            for a_dir in initial_sizes_data:
                out.append(a_dir.path + "," + str(a_dir.size_in_bytes) + "\n")
            f.writelines(out)
            f.close()
    overall_end = time.time()
    print(f'Done. took {convert(overall_end-overall_start)}')
# Run the monitor only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment