Skip to content

Instantly share code, notes, and snippets.

@JSouthGB
Created April 28, 2024 20:22
Show Gist options
  • Save JSouthGB/75c9ee2c8a8c45cd406ef2c6bd013432 to your computer and use it in GitHub Desktop.
Save JSouthGB/75c9ee2c8a8c45cd406ef2c6bd013432 to your computer and use it in GitHub Desktop.
import logging
import os
import sys
from datetime import datetime
from typing import List, Dict, Tuple
from urllib.parse import urlencode, urlunparse, urlparse
import requests
# TODO: This script does not yet delete anything, merely logs and notifies until I'm sure it works properly.
# TODO: Add checking that all directories to monitor are on the same disk.
"""This script monitors the disk usage of specified directories.
TL;DR: Deletes files and subdirectories to free space, sends notifications of deleted items.
The user-configurable variables (`dirs`, `threshold_limit`, `gotify_url`, `gotify_token`) are defined at the top of the `main()` function.
The intent of this script is to prevent the disk containing the specified paths from becoming full as a lot of
unattended actions occur that copy, move, or download files to the disk. The necessity of the subdirectories and
files located in the specified directories has an expiration, that's why they are removed in chronological order,
oldest to newest.
When the script runs, it will check if the free disk space is >= the specified threshold (desired minimum free space),
if it is, the script does nothing and exits. If the free disk space is below the specified threshold, it will execute.
Once executed, it will get a list of files and subdirectories from the monitored directories and delete them from
oldest to newest based on 'last modified time' until the specified threshold is reached.
(desired_free_space = existing_free_space + minimum_number_of_oldest_files).
USE WITH EXTREME CAUTION, THIS SCRIPT IS INTENDED TO DELETE FILES
"""
# Root logger setup: records at INFO level and above are written to a log file
# (relative path, so it is resolved against the current working directory).
_LOG_LINE_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
_LOG_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
logging.basicConfig(
    datefmt=_LOG_TIME_FORMAT,
    format=_LOG_LINE_FORMAT,
    level=logging.INFO,
    filename='disk_usage_monitor.log',
)
class DirectoryHandler:
    """Collect path, size, and last-modified time for each top-level entry of a directory.

    Regular files are reported with their own size. Subdirectories are reported
    with the combined size of every file found anywhere beneath them.
    """

    def __init__(self, dir_path) -> None:
        # Directory whose immediate children will be inspected.
        self.dir_path: str = dir_path

    @staticmethod
    def _tree_size(root: str) -> int:
        """Return the summed size in bytes of all files found below `root`."""
        total_size: int = 0
        for dirpath, _, filenames in os.walk(root):
            for name in filenames:
                file_path: str = os.path.join(dirpath, name)
                try:
                    total_size += os.stat(file_path).st_size
                except OSError as err:
                    # Dangling symlinks and files removed while walking cannot
                    # be stat'ed; skip them rather than abort the whole scan.
                    logging.warning('Skipping unreadable file %s: %s', file_path, err)
        return total_size

    def gather_files_data(self) -> List[Dict[str, Tuple[str, int, float]]]:
        """
        Describe every file and subdirectory directly inside `self.dir_path`.

        Each entry is represented as a dictionary whose key 'item' maps to a tuple of
        (path, size in bytes, last modification time as a POSIX timestamp).
        Entries that are neither a file nor a directory (for example dangling
        symlinks) are ignored, as are entries that cannot be stat'ed.

        Returns:
        - A list of dictionaries, one per file or subdirectory.
        """
        files_data: List[Dict[str, Tuple[str, int, float]]] = []
        # The context manager closes the scandir iterator even if an error occurs.
        with os.scandir(self.dir_path) as entries:
            for entry in entries:
                try:
                    if entry.is_file():
                        stats = entry.stat()
                        size: int = stats.st_size
                    elif entry.is_dir():
                        stats = entry.stat()
                        size = self._tree_size(entry.path)
                    else:
                        continue
                except OSError as err:
                    # The entry disappeared or became unreadable between listing and stat.
                    logging.warning('Skipping unreadable entry %s: %s', entry.path, err)
                    continue
                files_data.append({'item': (entry.path, size, stats.st_mtime)})
        return files_data
class DiskAnalyzer:
    """Select the oldest files/subdirectories until enough space would be reclaimed.

    Nothing is deleted here: the chosen items are only logged and returned as
    human-readable messages.
    """

    def __init__(self, dirs: List[str], threshold: int, label_mapping: Dict[str, str]) -> None:
        """
        Arguments:
        - dirs: Directories whose contents are candidates for removal.
        - threshold: Desired minimum free space, in GiB.
        - label_mapping: Maps a monitored directory to the label used in messages.
        """
        self.dirs: list = dirs
        # Longest path shared by all monitored directories; used as the statvfs target.
        # NOTE(review): only meaningful when every directory is on the same filesystem.
        self.mount_point: str = os.path.commonpath(dirs)
        # Free space is sampled once, at construction time.
        self.free_space: int = self.get_disk_free_space()
        self.threshold: int = self.gib_to_bytes(threshold)
        self.label_mapping: dict = label_mapping

    @staticmethod
    def gib_to_bytes(gib: int) -> int:
        """
        Convert GiB (gibibytes) to bytes.
        Formula: Bytes = GiB * 1024^3
        """
        return gib * (1024 ** 3)

    @staticmethod
    def bytes_to_gib(_bytes: int) -> float:
        """
        Convert bytes to GiB (gibibytes).
        Formula: GiB = Bytes / 1024^3
        """
        return _bytes / (1024 ** 3)

    def get_disk_free_space(self) -> int:
        """
        Gets the free disk space of the filesystem containing `self.mount_point`.
        Returns:
        - The free disk space in bytes (fragment size * blocks available to
          unprivileged users, as reported by `os.statvfs`).
        """
        stats = os.statvfs(self.mount_point)
        return stats.f_frsize * stats.f_bavail

    @staticmethod
    def _dir_match_length(path: str, dir_name: str) -> int:
        """Return the length of `dir_name` if it occurs in `path` as whole path components, else -1."""
        needle = os.path.normpath(dir_name).strip(os.sep)
        if not needle:
            # The filesystem root contains every path.
            return 0
        haystack = os.sep + path.strip(os.sep) + os.sep
        # Separators on both sides stop '/data/movies' from matching '/data/movies_4k'.
        if os.sep + needle + os.sep in haystack:
            return len(needle)
        return -1

    def format_items(self, item):
        """
        Build, log (INFO), and return the one-line description of a candidate item.

        Arguments:
        - item: Dictionary whose 'item' key holds a (path, size in bytes, mtime) tuple.
        Returns:
        - A string of the form '<label>: <name>, Size: <x.xx> GiB, Modified: <timestamp>'.
          The label is 'No_label' when no directory in `label_mapping` contains the path.
        """
        path = os.path.normpath(item['item'][0])
        path_split = path.split(os.sep)
        label = 'No_label'
        best_length = -1
        for dir_name, assigned_label in self.label_mapping.items():
            match_length = self._dir_match_length(path, dir_name)
            # Prefer the most specific (longest) directory; ties keep the first match.
            if match_length > best_length:
                label, best_length = assigned_label, match_length
        size_in_gib: float = self.bytes_to_gib(item['item'][1])
        # Rendered in the local timezone of the machine running the script.
        mod_time: str = datetime.fromtimestamp(item['item'][2]).strftime('%Y-%m-%d %H:%M:%S')
        message = f'{label}: {path_split[-1].strip()}, Size: {size_in_gib:0.2f} GiB, Modified: {mod_time}'
        logging.info(message)
        return message

    def analyze(self) -> List[str]:
        """
        Examines the monitored directories and logs the label, name, size, and last
        modification time of each file or subdirectory selected, oldest first. Selection
        stops once the combined size of the selected items plus the existing free disk
        space reaches the threshold.

        Exits the process via `sys.exit()` when free space already meets the threshold.

        Returns:
        - The list of formatted messages, one per selected item.
        """
        if self.free_space >= self.threshold:
            logging.info('Free disk space is sufficient.')
            sys.exit()
        all_items: List[Dict[str, Tuple[str, int, float]]] = []
        for dir_path in self.dirs:
            handler = DirectoryHandler(dir_path)
            all_items.extend(handler.gather_files_data())
        # Oldest first, by last modification time.
        sorted_items: List[Dict[str, Tuple[str, int, float]]] = sorted(all_items, key=lambda x: x['item'][2])
        space_needed: int = self.threshold - self.free_space
        total_size: int = 0
        processed_items: List[str] = []
        for item in sorted_items:
            total_size += item['item'][1]
            processed_items.append(self.format_items(item))
            if total_size >= space_needed:
                break
        else:
            # Ran out of candidates (or there were none) before reaching the target.
            logging.warning('Monitored directories do not contain enough data to reach the free space threshold.')
        return processed_items
def send_notification(data: List[str], url: str, token: str, timeout: float = 10.0) -> None:
    """
    Send a Gotify notification listing the given items.

    Arguments:
    - data (list): List of strings to send; joined with blank lines into one message.
    - url (str): Gotify server URL, without the `message` endpoint. A sub-path
      (e.g. 'https://example.com/gotify/') is preserved.
    - token (str): Gotify application token.
    - timeout (float): Seconds to wait for the server before giving up.

    A non-200 response is logged as an error. Network failures raised by
    `requests` (including timeouts) propagate to the caller.
    """
    if not data:
        # An empty message carries no information, so skip the request entirely.
        logging.warning('No items to report; notification not sent.')
        return
    parsed_url = urlparse(url)
    # Append the endpoint to whatever path the server is mounted under.
    endpoint_path = parsed_url.path.rstrip('/') + '/message'
    target_url = urlunparse(
        parsed_url._replace(
            path=endpoint_path,
            query=urlencode({'token': token})
        ))
    message = '\n\n'.join(data)  # for readability
    payload = {
        'title': 'Disk Usage Monitor Alert',
        'message': message,
        'priority': 5,
    }
    # Without a timeout, requests may block indefinitely on an unresponsive server.
    response = requests.post(target_url, data=payload, timeout=timeout)
    if response.status_code != 200:
        logging.error(f'An error occurred when sending notification: {response.content}')
def main():
    """
    Define the directories to be examined and the threshold for the sum of the sizes of
    processed files/dirs together with the remaining free disk space, then run the
    analysis and send the resulting notification.
    `threshold_limit` is expected to be specified in GiB

    Raises:
    - NotADirectoryError: if any configured directory does not exist.
    - ValueError: if `threshold_limit` is not an integer greater than 0.
    - Any exception raised during analysis or notification is logged and re-raised.
    """
    dirs: list = [
        '/path/to/disk1/monitored_directory1',
        '/path/to/disk1/monitored_directory2',
    ]
    # `threshold_limit` should be a whole integer greater than 0
    threshold_limit: int = 500  # In GiB
    # the `message` api endpoint is hardcoded, do not add it.
    gotify_url: str = 'https://gotify.example.com/'
    # your app token (https://gotify.net/docs/pushmsg)
    gotify_token: str = 'YOUR_APP_TOKEN'
    # Label each directory with the capitalized name of its last path component.
    dir_labels = {
        dir_path: os.path.basename(os.path.normpath(dir_path)).capitalize()
        for dir_path in dirs
    }
    # error checking
    missing_dirs = [dir_path for dir_path in dirs if not os.path.isdir(dir_path)]
    if missing_dirs:
        # Joined outside an f-string: a backslash or reused quote inside a
        # replacement field is a syntax error before Python 3.12.
        error_message = 'One or more of your directories do not exist:\n' + '\n'.join(missing_dirs)
        logging.error(error_message)
        raise NotADirectoryError(error_message)
    if not isinstance(threshold_limit, int):
        raise ValueError('Integers only, not floats. (no decimals)')
    if threshold_limit < 1:
        raise ValueError('Invalid input: `threshold_limit` must be greater than 0.')
    # ready to go
    try:
        # Constructing the analyzer already queries the filesystem, so it is
        # kept inside the logged section.
        analyzer = DiskAnalyzer(dirs, threshold_limit, dir_labels)
        processed_items = analyzer.analyze()
        send_notification(processed_items, gotify_url, gotify_token)
    except Exception as err:
        logging.exception(f'An error occurred: {str(err)}')
        raise
# Script entry point: run main() and hand its return value to the interpreter
# as the process exit status.
if __name__ == '__main__':
    exit_status = main()
    sys.exit(exit_status)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment