Last active
March 18, 2024 05:16
-
-
Save mtreviso/eec6f6e71776ad55e951e4dc94df0c98 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
"""quotacheck.py | |
This script displays user quotas for a given mount point. | |
""" | |
import os | |
import platform | |
import subprocess | |
import sys | |
try: | |
from rich.console import Console | |
from rich.table import Table | |
except ImportError: | |
sys.exit("Please install rich as sudo: sudo pip install rich") | |
def _read_report_or_exit(report_file, failure_message):
    """Return the text of ``report_file``.

    On any read error, print ``failure_message`` followed by the error to
    stderr and terminate the process with exit status 1.
    """
    try:
        with open(report_file, 'r', encoding='utf8') as f:
            return f.read()
    except Exception as e:
        print(failure_message, e, file=sys.stderr)
        sys.exit(1)


def get_quota_output(mount_point):
    """Return the quota report text for ``mount_point``.

    The mount point is normalised (trailing slashes dropped, known aliases
    resolved) and mapped to its public report file under ``/var/log``.

    * When the effective user is root, ``repquota -s <mount_point>`` is run,
      its output is saved to the report file and then returned.
    * Otherwise the previously saved report file is read and returned.

    Args:
        mount_point (str): Mount point path or one of its known aliases.

    Returns:
        str: The raw ``repquota`` report text.

    Exits with status 1 (via ``sys.exit``) when the mount point has no
    configured report file, when ``repquota`` returns a non-zero status, or
    when the report file cannot be read.
    """
    log_files = {
        '/': "/var/log/public_quota_report_root.txt",
        '/media/hdd1': "/var/log/public_quota_report_hdd1.txt",
        '/media/hdd2': "/var/log/public_quota_report_hdd2.txt",
        '/media/scratch': '/var/log/public_quota_report_scratch.txt',
    }
    mount_point = mount_point.rstrip('/') if mount_point != '/' else mount_point

    hostname = platform.node()
    log_files_aliases = {
        '/home': '/',
        '/mnt/data-{}'.format(hostname): '/media/hdd1',
        '/mnt/scratch-{}'.format(hostname): '/media/scratch',
    }
    mount_point = log_files_aliases.get(mount_point, mount_point)

    report_file = log_files.get(mount_point)
    if report_file is None:
        print("No log file configured for mount point:", mount_point, file=sys.stderr)
        sys.exit(1)

    if os.geteuid() != 0:
        # Unprivileged users can only consult the report published by root.
        return _read_report_or_exit(report_file, "Failed to read public quota report:")

    try:
        result = subprocess.run(
            ['repquota', '-s', mount_point],
            text=True, capture_output=True, check=True,
        )
    except subprocess.CalledProcessError as e:
        print("Failed to run repquota:", e, file=sys.stderr)
        sys.exit(1)
    except Exception:
        # repquota could not be run at all (e.g. the executable is missing):
        # best effort, fall back to the last published report.
        return _read_report_or_exit(
            report_file,
            "Failed to read public quota report after running repquota:",
        )

    # Publish the fresh report. A failure here must not discard the output we
    # already hold, so it is reported as a warning only.
    try:
        with open(report_file, 'w', encoding='utf8') as f:
            f.write(result.stdout)
    except (OSError, UnicodeError) as e:
        print("Warning: could not update public quota report:", e, file=sys.stderr)
    return result.stdout
def convert_human_readable_to_bytes(human_readable):
    """Converts a human-readable size string to bytes.

    The last character selects a binary multiplier (B, K, M, G, T or P,
    case-insensitive). A string without a recognised suffix is interpreted
    as a plain number of bytes, so "0", "5" and "1024" are all accepted.

    Args:
        human_readable (str): A human-readable size string, e.g. "1.2G",
            "10M" or "0".

    Returns:
        int: The size in bytes (fractional bytes are truncated).

    Raises:
        ValueError: If the string is empty or its numeric part is not a
            valid finite number.
    """
    suffixes = {
        "B": 1,
        "K": 1 << 10,
        "M": 1 << 20,
        "G": 1 << 30,
        "T": 1 << 40,
        "P": 1 << 50,
    }
    text = human_readable.strip()
    if not text:
        raise ValueError("Empty size string")

    suffix = text[-1].upper()
    if suffix in suffixes:
        number = text[:-1]
    else:
        # No unit given: the whole string is the number, counted in bytes.
        number, suffix = text, "B"

    try:
        return int(float(number) * suffixes[suffix])
    except (ValueError, OverflowError):
        # float() rejects non-numeric text; int() rejects nan/inf.
        raise ValueError("Invalid size: {!r}".format(human_readable)) from None
def convert_bytes_to_human_readable(size):
    """Format a byte count as a short string with a binary unit suffix.

    The value is divided by 1024 until it drops below 1024 or the largest
    unit ("P") is reached, then rendered with two decimals.

    Args:
        size (int): The size in bytes.

    Returns:
        str: The formatted size, e.g. "1.50K" or "10.00M".
    """
    units = ("B", "K", "M", "G", "T", "P")
    # Walk every unit except the last; the last one absorbs whatever is left.
    for unit in units[:-1]:
        if size >= 1024:
            size /= 1024
        else:
            return "{:.2f}{}".format(size, unit)
    return "{:.2f}{}".format(size, units[-1])
def parse_quota_output(output):
    """Parse a ``repquota -s`` report into per-user quota tuples.

    Each data line is expected to look like
    ``user flags used soft hard [grace] files soft hard [grace]``
    (layout assumed from the quota-tools ``repquota`` output; confirm against
    the locally installed version).

    Args:
        output (str): Raw report text.

    Returns:
        tuple: ``(all_users, users_above_quota)``. Both are lists of
        ``(user, used, soft, hard, grace)`` tuples with sizes in bytes,
        sorted by used space in descending order. ``users_above_quota``
        holds only users whose usage exceeds a non-zero soft limit.

    Raises:
        ValueError: If a size column of a data line cannot be converted.
    """
    report_lines = output.splitlines()

    # The data starts after the dashed separator that closes the header. When
    # no separator is found, fall back to the usual five header lines.
    first_data_line = 5
    for index, line in enumerate(report_lines):
        stripped = line.strip()
        if stripped and set(stripped) == {'-'}:
            first_data_line = index + 1
            break

    users_above_quota = []
    all_users = []
    for line in report_lines[first_data_line:]:
        parts = line.split()
        if len(parts) < 5:
            # Blank lines and short trailer lines carry no user record.
            continue

        user, flags = parts[0], parts[1]
        used = convert_human_readable_to_bytes(parts[2])
        soft = convert_human_readable_to_bytes(parts[3])
        hard = convert_human_readable_to_bytes(parts[4])

        # Without grace columns a line has 8 fields; each grace column that is
        # present adds one. With both present (10 fields) the block grace is
        # field 5. With exactly one present (9 fields) it is the block grace
        # unless the flags say only the file quota is exceeded ("-+").
        if len(parts) >= 10:
            grace = parts[5]
        elif len(parts) == 9 and flags != '-+':
            grace = parts[5]
        else:
            grace = ""

        record = (user, used, soft, hard, grace)
        if used > soft and soft > 0:
            users_above_quota.append(record)
        all_users.append(record)

    # sort by used space
    all_users.sort(key=lambda x: x[1], reverse=True)
    users_above_quota.sort(key=lambda x: x[1], reverse=True)
    return all_users, users_above_quota
def display_table(data, title):
    """Print the given quota records as a coloured rich table.

    Args:
        data: Iterable of ``(user, used, soft, hard, grace)`` tuples with
            sizes in bytes.
        title (str): Title shown above the table.

    Rows are coloured by how much of the soft limit is used (above 100%,
    above 50%, otherwise), and the row of the user named in the ``USER``
    environment variable is rendered in bold.
    """
    column_specs = (
        ("User", {"justify": "left", "style": "cyan"}),
        ("Used Space", {"justify": "right"}),
        ("Soft Limit", {"justify": "right"}),
        ("Hard Limit", {"justify": "right"}),
        ("Grace Period", {"justify": "right", "style": "magenta"}),
    )
    table = Table(title=title)
    for header, options in column_specs:
        table.add_column(header, **options)

    # Name of the invoking user, used to emphasise their own row.
    me = os.getenv("USER")

    for user, used, soft, hard, grace in data:
        sizes = (
            convert_bytes_to_human_readable(used),
            convert_bytes_to_human_readable(soft),
            convert_bytes_to_human_readable(hard),
        )

        # A soft limit of zero (or less) counts as 0% usage.
        usage_percentage = (used / soft) * 100 if soft > 0 else 0

        if usage_percentage > 100:
            row_style = "bright_red"
        elif usage_percentage > 50:
            row_style = "light_salmon1"
        else:
            row_style = "light_green"
        if user == me:
            row_style += " bold"

        table.add_row(user, *sizes, grace, style=row_style, end_section=True)

    Console().print(table)
def main(mount_point, show_table='all'):
    """Fetch, parse and display the quota report for ``mount_point``.

    Args:
        mount_point (str): Mount point (or alias) to report on.
        show_table (str): Which tables to print: ``'shame'`` (users above
            quota), ``'users'`` (all users) or ``'all'`` (both).

    Exits with status 1 when ``show_table`` is not one of the values above.
    The check happens before any report is fetched, so a mistyped selection
    is reported instead of silently printing nothing.
    """
    valid_selections = ('all', 'shame', 'users')
    if show_table not in valid_selections:
        print(
            "Unknown table selection {!r}; expected one of: {}".format(
                show_table, ", ".join(valid_selections)
            ),
            file=sys.stderr,
        )
        sys.exit(1)

    output = get_quota_output(mount_point)
    all_users, users_above_quota = parse_quota_output(output)

    if show_table in ('all', 'shame'):
        display_table(users_above_quota, "Shame List (Above Quota) for device: " + mount_point)
    if show_table == 'all':
        display_table(all_users, "All User Quotas")
    elif show_table == 'users':
        display_table(all_users, "All User Quotas for device: " + mount_point)
if __name__ == "__main__":
    # Command line: quotacheck.py <mount_point> [shame|users|all]
    if len(sys.argv) <= 1:
        # Without a mount point there is nothing to report on.
        print("Usage: python quotacheck.py <mount_point> [shame|users|all]")
        sys.exit(1)
    mount_point = sys.argv[1]
    # The table selection is optional and defaults to showing everything.
    show_table = 'all' if len(sys.argv) <= 2 else sys.argv[2]
    main(mount_point, show_table)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Example:
