Created
April 28, 2024 20:22
-
-
Save JSouthGB/75c9ee2c8a8c45cd406ef2c6bd013432 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import os | |
import sys | |
from datetime import datetime | |
from typing import List, Dict, Tuple | |
from urllib.parse import urlencode, urlunparse, urlparse | |
import requests | |
# TODO: This script does not yet delete anything, merely logs and notifies until I'm sure it works properly. | |
# TODO: Add checking that all directories to monitor are on the same disk. | |
"""This script monitors the disk usage of specified directories. | |
TL;DR: Deletes files and subdirectories to free space, sends notifications of deleted items. | |
The user-configurable variables (`dirs`, `threshold_limit`, `gotify_url` and `gotify_token`) are set at the top of the `main()` function.
The intent of this script is to prevent the disk containing the specified paths from becoming full, as a lot of
unattended actions occur that copy, move, or download files to the disk. The subdirectories and
files located in the specified directories are only needed for a limited time, which is why they are removed in
chronological order, oldest to newest.
When the script runs, it checks whether the free disk space is >= the specified threshold (desired minimum free space);
if it is, the script does nothing and exits. If the free disk space is below the specified threshold, it will execute.
Once executed, it will get a list of files and subdirectories from the monitored directories and delete them from | |
oldest to newest based on 'last modified time' until the specified threshold is reached. | |
(desired_free_space = existing_free_space + minimum_number_of_oldest_files). | |
USE WITH EXTREME CAUTION, THIS SCRIPT IS INTENDED TO DELETE FILES | |
""" | |
# Send timestamped records of level INFO and above to a log file
# (relative filename, so it is resolved against the current working directory).
logging.basicConfig(
    filename='disk_usage_monitor.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
class DirectoryHandler:
    """Collect path, size and last-modified time for each direct child of a directory.

    Regular files are reported with their own size; subdirectories are reported
    with the summed size of every file found beneath them.
    """

    def __init__(self, dir_path) -> None:
        self.dir_path: str = dir_path

    def gather_files_data(self) -> List[Dict[str, Tuple[str, int, float]]]:
        """
        Describe every file and subdirectory directly inside `self.dir_path`.

        Each file or directory is represented as a dictionary, where the key 'item' maps to a tuple.
        The tuple consists of the file/directory path, its size in bytes, and its last modification time.
        Entries that are neither a file nor a directory (e.g. dangling symlinks) are ignored, and
        entries that cannot be stat'ed (removed mid-scan, permission denied, ...) are skipped with
        a logged warning instead of aborting the whole scan.

        Returns:
        - A list of dictionaries, containing previously noted data.
        """
        files_data: List[Dict[str, Tuple[str, int, float]]] = []
        # The context manager closes the scandir iterator promptly, even if an error occurs.
        with os.scandir(self.dir_path) as entries:
            for entry in entries:
                try:
                    if entry.is_file():
                        size = entry.stat().st_size
                    elif entry.is_dir():
                        size = self._tree_size(entry.path)
                    else:
                        continue
                    mtime = entry.stat().st_mtime
                except OSError as err:
                    logging.warning('Skipping %s: %s', entry.path, err)
                    continue
                files_data.append({'item': (entry.path, size, mtime)})
        return files_data

    @staticmethod
    def _tree_size(root: str) -> int:
        """Return the summed size in bytes of all files found by walking `root`."""
        total_size = 0
        for dirpath, _, filenames in os.walk(root):
            for name in filenames:
                file_path = os.path.join(dirpath, name)
                try:
                    total_size += os.stat(file_path).st_size
                except OSError as err:
                    # Dangling symlinks and files deleted during the walk must not
                    # abort the size calculation for the whole tree.
                    logging.warning('Skipping %s: %s', file_path, err)
        return total_size
class DiskAnalyzer:
    """Select the oldest items of the monitored directories until a free-space target is covered.

    Nothing is deleted by this class: it only logs and returns a description of the
    items whose combined size, added to the current free space, reaches the threshold.
    """

    def __init__(self, dirs: List[str], threshold: int, label_mapping: Dict[str, str]) -> None:
        self.dirs: list = dirs
        # Longest common ancestor of the monitored directories. It is used as the path
        # whose file system is queried; it is not necessarily an actual mount point.
        self.mount_point: str = os.path.commonpath(dirs)
        self.free_space: int = self.get_disk_free_space()
        self.threshold: int = self.gib_to_bytes(threshold)
        self.label_mapping: dict = label_mapping

    @staticmethod
    def gib_to_bytes(gib: int) -> int:
        """
        Convert GiB (gibibytes) to bytes.
        Formula: Bytes = GiB * 1024^3
        """
        return gib * (1024 ** 3)

    @staticmethod
    def bytes_to_gib(_bytes: int) -> float:
        """
        Convert Bytes to GiB (gibibytes).
        Formula: GiB = Bytes / 1024^3
        """
        return _bytes / (1024 ** 3)

    def get_disk_free_space(self) -> int:
        """
        Gets the free disk space of the file system containing `self.mount_point`.
        Returns:
        - The free disk space in bytes (fragment size * blocks available to unprivileged users).
        """
        stats = os.statvfs(self.mount_point)
        return stats.f_frsize * stats.f_bavail

    @staticmethod
    def _contains_dir(path: str, dir_name: str) -> bool:
        """Return True if `dir_name` occurs in `path` as a run of whole path components."""
        # Wrapping both sides in separators forces matches to start and end on a
        # component boundary, so '/x/dir1' no longer matches '/x/dir10/file'.
        wrapped_dir = os.sep + os.path.normpath(dir_name).strip(os.sep) + os.sep
        wrapped_path = os.sep + path.strip(os.sep) + os.sep
        return wrapped_dir in wrapped_path

    def format_items(self, item):
        """
        Build, log (INFO) and return a one-line description of an item.

        Arguments:
        - item (dict): {'item': (path, size in bytes, last modification timestamp)}.
        Returns:
        - '<label>: <name>, Size: <x.xx> GiB, Modified: <YYYY-mm-dd HH:MM:SS>', where the label
          is taken from the first `label_mapping` directory contained in the path, or
          'No_label' when none matches.
        """
        raw_path, size_in_bytes, modified = item['item']
        path = os.path.normpath(raw_path)
        path_split = path.split(os.sep)
        label = 'No_label'
        for dir_name, assigned_label in self.label_mapping.items():
            if self._contains_dir(path, dir_name):
                label = assigned_label
                break
        size_in_gib: float = self.bytes_to_gib(size_in_bytes)
        # The timestamp is rendered in the local time zone.
        mod_time: str = datetime.fromtimestamp(modified).strftime('%Y-%m-%d %H:%M:%S')
        message = f'{label}: {path_split[-1].strip()}, Size: {size_in_gib:0.2f} GiB, Modified: {mod_time}'
        logging.info(message)
        return message

    def analyze(self) -> List[str]:
        """
        Examines the directories and logs the path, size and last modification time of each file or
        subdirectory selected, oldest first. The examination stops when the total size of the selected
        files and directories together with the existing free disk space reach the threshold set.

        Returns:
        - The list of formatted messages, one per selected item.
        Raises:
        - SystemExit: when the free disk space already meets the threshold.
        """
        if self.free_space >= self.threshold:
            logging.info('Free disk space is sufficient.')
            sys.exit()
        all_items: List[Dict[str, Tuple[str, int, float]]] = []
        for dir_path in self.dirs:
            handler = DirectoryHandler(dir_path)
            all_items.extend(handler.gather_files_data())
        # Oldest modification time first.
        sorted_items: List[Dict[str, Tuple[str, int, float]]] = sorted(all_items, key=lambda x: x['item'][2])
        space_needed: int = self.threshold - self.free_space
        total_size: int = 0
        processed_items = []
        for item in sorted_items:
            total_size += item['item'][1]
            processed_items.append(self.format_items(item))
            if total_size >= space_needed:
                break
        return processed_items
def send_notification(data: List[str], url: str, token: str, timeout: float = 10.0):
    """
    Gotify notification
    Arguments:
    - data (list): List of strings to send.
    - url (str): Gotify server URL; its path and query are replaced by the `message` endpoint and the token.
    - token (str): Gotify application token.
    - timeout (float): Seconds to wait for the server before giving up.
    Raises:
    - requests.exceptions.RequestException: if the request cannot be completed (including a timeout).
    """
    endpoint = urlunparse(
        urlparse(url)._replace(
            path='message',
            query=urlencode({'token': token})
        ))
    payload = {
        'title': 'Disk Usage Monitor Alert',
        'message': '\n\n'.join(data),  # blank line between items for readability
        'priority': 5,
    }
    # Without a timeout an unreachable server would block the script indefinitely.
    response = requests.post(endpoint, data=payload, timeout=timeout)
    if response.status_code != 200:
        logging.error(f'An error occurred when sending notification: {response.content}')
def main():
    """
    Define the directories to be examined and the threshold for the sum of the sizes of
    processed files/dirs together with the remaining free disk space.
    `threshold_limit` is expected to be specified in GiB

    Raises:
    - FileNotFoundError: if any configured directory does not exist.
    - ValueError: if `threshold_limit` is not an integer greater than 0.
    """
    dirs: list = [
        '/path/to/disk1/monitored_directory1',
        '/path/to/disk1/monitored_directory2',
    ]
    # `threshold_limit` should be a whole integer greater than 0
    threshold_limit: int = 500  # In GiB
    # the `message` api endpoint is hardcoded, do not add it.
    gotify_url: str = 'https://gotify.example.com/'
    # your app token (https://gotify.net/docs/pushmsg)
    gotify_token: str = 'YOUR_APP_TOKEN'

    # Each directory is labelled with its capitalized last path component.
    labels = [
        element.strip('/')  # strip leading/trailing slashes
        .split('/')[-1]     # keep the last path component
        .capitalize()       # capitalize it
        for element in dirs]
    dir_labels = dict(zip(dirs, labels))

    # error checking
    missing = [dir_path for dir_path in dirs if not os.path.isdir(dir_path)]
    if missing:
        # Joined outside the f-string: reusing the outer quote or a backslash inside
        # a replacement field is a SyntaxError before Python 3.12.
        missing_list = '\n'.join(missing)
        message = f'One or more of your directories do not exist:\n{missing_list}'
        logging.error(message)
        raise FileNotFoundError(message)
    # bool is a subclass of int, so it has to be rejected explicitly.
    if isinstance(threshold_limit, bool) or not isinstance(threshold_limit, int):
        raise ValueError('Integers only, not floats. (no decimals)')
    if threshold_limit < 1:
        raise ValueError('Invalid input: `threshold_limit` must be greater than 0.')

    # ready to go
    analyzer = DiskAnalyzer(dirs, threshold_limit, dir_labels)
    try:
        processed_items = analyzer.analyze()
        send_notification(processed_items, gotify_url, gotify_token)
    except Exception as err:
        logging.exception(f'An error occurred: {str(err)}')
        raise
# Run only when executed as a script; whatever main() returns is passed to sys.exit().
if __name__ == '__main__':
    exit_status = main()
    sys.exit(exit_status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment