Last active
August 3, 2025 00:52
-
-
Save arrogantrabbit/54fb08c4eda15624448e8c6d9990b522 to your computer and use it in GitHub Desktop.
Storj upload/download monitor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Created by Google Gemini 2.5 Pro
import time | |
import json | |
import argparse | |
import curses | |
from collections import deque | |
import os | |
import math | |
import sys | |
import traceback | |
# --- Helper Functions ---
def format_size(size_bytes):
    """Format a byte count as a short human-readable string.

    Units are powers of 1024 (B, K, M, G, T, P, E, Z, Y). Values below
    1024 are truncated to an integer and suffixed with ``B``. Larger values
    are scaled to the largest fitting unit and rounded to one decimal place,
    with a trailing ``.0`` dropped.

    Args:
        size_bytes: Size in bytes (int or float).

    Returns:
        The formatted size, e.g. ``"512B"``, ``"1.5K"`` or ``"2G"``.
    """
    units = ("B", "K", "M", "G", "T", "P", "E", "Z", "Y")
    if size_bytes < 1024:
        return f"{int(size_bytes)}B"

    last_unit = len(units) - 1
    scaled = size_bytes
    unit_idx = 0
    # Repeated division avoids the float error of math.log near exact
    # powers of 1024, which could select the wrong unit.
    while scaled >= 1024 and unit_idx < last_unit:
        scaled /= 1024
        unit_idx += 1

    scaled = round(scaled, 1)
    # Rounding may push the value up to 1024 (e.g. 1023.97K); carry it into
    # the next unit so the result reads "1M" rather than "1024K".
    if scaled >= 1024 and unit_idx < last_unit:
        scaled = round(scaled / 1024, 1)
        unit_idx += 1

    if scaled == int(scaled):
        scaled = int(scaled)
    return f"{scaled}{units[unit_idx]}"
def parse_log_line(line):
    """Extract the ``Action`` and ``Size`` fields from one log line.

    The line is split on tab characters and the fifth field is parsed as
    JSON. Only a JSON object is accepted; its ``"Action"`` and ``"Size"``
    entries are returned as-is (either may be ``None`` when absent).

    Args:
        line: One raw line of the log file.

    Returns:
        A ``(action, size)`` tuple, or ``(None, None)`` when the line has
        fewer than five tab-separated fields, the fifth field is not valid
        JSON, or the JSON value is not an object.
    """
    fields = line.split('\t')
    if len(fields) < 5:
        return None, None
    try:
        payload = json.loads(fields[4])
    except json.JSONDecodeError:
        return None, None
    # Valid JSON can still be a list, string or number; those have no
    # .get() and would otherwise raise AttributeError.
    if not isinstance(payload, dict):
        return None, None
    return payload.get("Action"), payload.get("Size")
# --- Core Data Structure ---
class Histogram:
    """Histogram of numeric samples binned to a fixed number of columns.

    Raw samples are kept in a deque (optionally bounded by ``maxlen``) so
    the bins can be rebuilt whenever the column count or the maximum value
    changes. Bin counts are otherwise maintained incrementally by ``add``.
    """

    def __init__(self, maxlen=None):
        """Create an empty histogram.

        Args:
            maxlen: Maximum number of samples retained; ``None`` keeps all.
        """
        self._raw_data = deque(maxlen=maxlen)
        self._binned_data = []
        self._plot_width = 0
        self._max_val = 0
        self.log_base = 10

    def _get_bin_index(self, value, use_log_scale):
        """Return the bin index for ``value``, or -1 when it cannot be binned.

        Indices are clamped to the last bin so that the maximum value lands
        in the final column. Negative results (e.g. values below 1 on a log
        scale) are returned unchanged and are ignored by the callers' range
        checks.
        """
        if self._plot_width < 1:
            return -1
        last_bin = self._plot_width - 1
        if use_log_scale and value > 0:
            max_log = math.log(self._max_val, self.log_base) if self._max_val > 0 else 1
            if max_log <= 0:
                return 0
            log_val = math.log(value, self.log_base)
            return min(int((log_val / max_log) * last_bin), last_bin)
        bin_width = self._max_val / self._plot_width
        if bin_width > 0:
            # value == max_val yields exactly plot_width here; without the
            # clamp the largest sample would fall outside the bins.
            return min(int(value / bin_width), last_bin)
        return -1

    def add(self, value, use_log_scale):
        """Record one sample, evicting the oldest one when the window is full.

        A sample larger than the current maximum triggers a full rebin
        because every bin boundary moves. The maximum is not lowered when
        the evicted sample was the largest; that only happens on the next
        ``rebin_from_raw_data`` call.
        """
        samples = self._raw_data
        if samples.maxlen == 0:
            # A zero-length window retains nothing, so there is nothing to
            # evict or count.
            return
        if samples.maxlen is not None and len(samples) >= samples.maxlen:
            evicted = samples.popleft()
            evicted_idx = self._get_bin_index(evicted, use_log_scale)
            if 0 <= evicted_idx < len(self._binned_data):
                self._binned_data[evicted_idx] = max(0, self._binned_data[evicted_idx] - 1)
        samples.append(value)
        if value > self._max_val:
            self.rebin_from_raw_data(self._plot_width, use_log_scale)
            return
        new_idx = self._get_bin_index(value, use_log_scale)
        if 0 <= new_idx < len(self._binned_data):
            self._binned_data[new_idx] += 1

    def rebin_from_raw_data(self, plot_width, use_log_scale):
        """Rebuild all bins for ``plot_width`` columns from the raw samples."""
        self._plot_width = plot_width
        self._max_val = max(self._raw_data) if self._raw_data else 0
        self._binned_data = [0] * plot_width
        for sample in self._raw_data:
            idx = self._get_bin_index(sample, use_log_scale)
            if 0 <= idx < plot_width:
                self._binned_data[idx] += 1

    def get_binned_data(self):
        """Return the list of per-bin counts."""
        return self._binned_data

    def get_max_val(self):
        """Return the maximum sample value as of the last rebin."""
        return self._max_val

    def get_max_bin_count(self):
        """Return the largest bin count, or 0 when there are no bins."""
        return max(self._binned_data) if self._binned_data else 0

    def __len__(self):
        """Return the number of raw samples currently retained."""
        return len(self._raw_data)
# --- Drawing Function (with the definitive x-label fix) ---
def draw_histogram(win, y_start, x_start, title, binned_data, max_val, max_bin_count, **kwargs):
    """Draw one titled bar chart with Y and X axis labels onto ``win``.

    Layout, relative to ``(y_start, x_start)``: the title occupies the first
    row, the plot area the next ``plot_height`` rows, then the X axis line,
    then one row of X labels. The Y axis takes ``y_axis_width`` columns to
    the left of the plot area.

    Args:
        win: Object providing ``addstr(y, x, text[, attr])``.
        y_start: Row of the title.
        x_start: Leftmost column of the chart (start of the Y labels).
        title: Text drawn in bold above the plot.
        binned_data: Per-column counts; column ``i`` is drawn at plot x ``i``.
        max_val: Largest sample value, used for the X axis labels.
        max_bin_count: Largest bin count, used to scale bars and Y labels.
        **kwargs: Required keys: ``plot_width``, ``plot_height``,
            ``y_axis_width``, ``log_base``, ``num_y_labels``,
            ``num_x_labels`` (``None`` selects an automatic count),
            ``use_log_x_scale``, ``use_log_y_scale`` and ``color_pair``.
    """
    plot_width = kwargs['plot_width']
    plot_height = kwargs['plot_height']
    y_axis_width = kwargs['y_axis_width']
    log_base = kwargs['log_base']
    y_plot_start = y_start + 1
    x_plot_start = x_start + y_axis_width
    bottom_row = y_plot_start + plot_height - 1

    # Title, vertical axis and horizontal axis.
    win.addstr(y_start, x_start, title, curses.A_BOLD)
    for row in range(plot_height):
        win.addstr(y_plot_start + row, x_plot_start - 1, '│')
    win.addstr(y_plot_start + plot_height, x_start + y_axis_width - 1, '└' + '─' * plot_width)

    max_log_y = math.log(max_bin_count, log_base) if max_bin_count > 0 else 1

    # Y axis labels, spread evenly from the bottom row to the top row.
    num_y_labels = kwargs['num_y_labels']
    if num_y_labels is None:
        num_y_labels = plot_height
    if num_y_labels > 0:
        for i in range(num_y_labels):
            ratio = i / (num_y_labels - 1) if num_y_labels > 1 else 1.0
            y_pos = bottom_row - int(ratio * (plot_height - 1))
            if kwargs['use_log_y_scale'] and max_bin_count > 1:
                # round() rather than int(): base ** log(x, base) is often a
                # hair below x (e.g. 999.999... for 1000), and truncation
                # would print a label that is one too small.
                val = round(log_base ** (ratio * max_log_y))
            else:
                val = int(ratio * max_bin_count)
            win.addstr(y_pos, x_start, f"{val:<{y_axis_width - 2}} │")

    # Bars: empty bins draw nothing; any non-empty bin is at least one cell tall.
    for col, count in enumerate(binned_data):
        if count <= 0:
            continue
        if kwargs['use_log_y_scale']:
            bar_height = int((math.log(count, log_base) / max_log_y) * (plot_height - 1)) if max_log_y > 0 else 0
        else:
            bar_height = int((count / max_bin_count) * (plot_height - 1))
        for j in range(max(0, bar_height) + 1):
            win.addstr(bottom_row - j, x_plot_start + col, '█', kwargs['color_pair'])

    # X axis labels: num_x_labels intervals, hence num_x_labels + 1 labels.
    num_x_labels = kwargs['num_x_labels']
    if num_x_labels is None:
        num_x_labels = max(2, plot_width // 12)
    if num_x_labels > 0:
        max_log_x = math.log(max_val, log_base) if max_val > 0 else 1
        label_row = y_plot_start + plot_height + 1
        for i in range(num_x_labels + 1):
            ratio = i / num_x_labels
            x_pos = int(ratio * (plot_width - 1))
            val = log_base ** (ratio * max_log_x) if kwargs['use_log_x_scale'] else ratio * max_val
            text = format_size(val)
            # Centre the label on its column, then clamp it inside the plot area.
            centred = x_plot_start + x_pos - len(text) // 2
            final_x = min(max(x_plot_start, centred), x_plot_start + plot_width - len(text))
            # Labels are deliberately not checked for overlap, so the
            # requested number of labels is always drawn.
            win.addstr(label_row, final_x, text)
# --- Main Application Loop ---
def main(stdscr, args, uploads, downloads):
    """Run the dashboard loop: tail the log file and redraw both histograms.

    Each iteration polls the keyboard ('q' quits, a resize event rebuilds
    the pad and rebins both histograms), reads every line appended to the
    log since the previous pass, and redraws the whole frame into an
    off-screen pad that is then copied to the terminal in one refresh.

    Args:
        stdscr: The curses screen supplied by ``curses.wrapper``.
        args: Parsed command-line arguments (``logfile``, ``lines``,
            ``x_labels``, ``y_labels``, ``log_x_scale``, ``log_y_scale``).
        uploads: Histogram receiving sizes whose action contains ``PUT``.
        downloads: Histogram receiving all other sizes.
    """
    stdscr.nodelay(1)
    curses.curs_set(0)
    curses.start_color()
    curses.init_pair(1, curses.COLOR_CYAN, curses.COLOR_BLACK)
    curses.init_pair(2, curses.COLOR_GREEN, curses.COLOR_BLACK)

    h, w = stdscr.getmaxyx()
    pad = curses.newpad(h, w)
    plot_width = (w - 4) - 9 if w > 14 else 0
    uploads.rebin_from_raw_data(plot_width, args.log_x_scale)
    downloads.rebin_from_raw_data(plot_width, args.log_x_scale)
    try:
        # errors='replace' keeps a stray undecodable byte in the log from
        # terminating the dashboard with UnicodeDecodeError.
        with open(args.logfile, 'r', errors='replace') as logfile:
            logfile.seek(0, 2)  # start tailing from the current end of file
            while True:
                user_input = stdscr.getch()
                if user_input == ord('q'):
                    break
                if user_input == curses.KEY_RESIZE:
                    try:
                        h, w = stdscr.getmaxyx()
                        curses.resize_term(h, w)
                        # Clear the main screen once on resize.
                        stdscr.clear()
                        stdscr.refresh()
                        pad = curses.newpad(h, w)
                        plot_width = (w - 4) - 9 if w > 14 else 0
                        uploads.rebin_from_raw_data(plot_width, args.log_x_scale)
                        downloads.rebin_from_raw_data(plot_width, args.log_x_scale)
                    except curses.error:
                        pass  # Ignore errors from resizing too small

                # Drain everything appended to the log since the last pass.
                while True:
                    line = logfile.readline()
                    if not line:
                        break
                    action, size = parse_log_line(line)
                    if action and size is not None:
                        (uploads if 'PUT' in action else downloads).add(size, args.log_x_scale)

                pad.erase()
                h, w = pad.getmaxyx()
                if h < 20 or w < 50:
                    # addnstr bounds the text to the pad width so a very
                    # narrow terminal cannot overflow the pad.
                    pad.addnstr(0, 0, "Terminal too small.", max(w - 1, 0))
                else:
                    plot_width = (w - 4) - 9
                    # Each chart uses plot_height + 3 rows (title, plot,
                    # axis line, X labels). With h // 2 - 5 the upper
                    # chart's X labels end on row h // 2 - 1, one row above
                    # the lower chart's title at row h // 2.
                    hist_height = h // 2 - 5
                    x_scale = "log" if args.log_x_scale else "linear"
                    y_scale = "log" if args.log_y_scale else "linear"
                    kwargs = {"plot_width": plot_width, "plot_height": hist_height, "y_axis_width": 9, "log_base": 10,
                              "num_y_labels": args.y_labels, "num_x_labels": args.x_labels,
                              "use_log_x_scale": args.log_x_scale, "use_log_y_scale": args.log_y_scale}
                    draw_histogram(pad, 2, 2, f"Uploads (Y: {y_scale}, X: {x_scale})", uploads.get_binned_data(), uploads.get_max_val(), uploads.get_max_bin_count(), color_pair=curses.color_pair(1), **kwargs)
                    draw_histogram(pad, h // 2, 2, f"Downloads (Y: {y_scale}, X: {x_scale})", downloads.get_binned_data(), downloads.get_max_val(), downloads.get_max_bin_count(), color_pair=curses.color_pair(2), **kwargs)
                    status = f"Analyzing last {args.lines if args.lines else 'all'} entries. Uploads: {len(uploads)}, Downloads: {len(downloads)}"
                    header = f"Watching {os.path.basename(args.logfile)} | Press 'q' to quit"
                    # Truncate to w - 3 characters: text running past the
                    # end of the last pad row makes curses raise an error.
                    pad.addnstr(h - 1, 2, status, w - 3)
                    pad.addnstr(0, 2, header, w - 3, curses.A_BOLD)
                pad.refresh(0, 0, 0, 0, h - 1, w - 1)
                time.sleep(0.05)
    except Exception:
        # Restore the terminal before printing so the traceback is readable.
        curses.endwin()
        print("An unexpected error occurred. Please see the details below:")
        traceback.print_exc()
# --- Main Entrypoint ---
if __name__ == '__main__':
    # Command-line interface.
    parser = argparse.ArgumentParser(description="Watch a Storj log file and display real-time histograms of file sizes.", formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument("logfile", help="The path to the log file (e.g., /var/log/storj.log).")
    parser.add_argument("--lines", type=int, default=None, help="Number of recent entries to analyze.\nIf not provided, all new entries will be considered.")
    parser.add_argument("--ingest-all", action="store_true", help="Ingest the entire log file on startup before watching for new lines.")
    parser.add_argument("--x-labels", type=int, default=None, help="Desired number of labels on the X-axis (file size).\nDefault: auto-calculated based on terminal width.")
    parser.add_argument("--y-labels", type=int, default=None, help="Desired number of labels on the Y-axis (piece count).\nDefault: max possible for plot height.")
    parser.add_argument("--log-x-scale", action="store_true", help="Use a logarithmic scale for the X-axis (file size).")
    parser.add_argument("--log-y-scale", action="store_true", help="Use a logarithmic scale for the Y-axis (piece count).")
    args = parser.parse_args()

    uploads_hist = Histogram(maxlen=args.lines)
    downloads_hist = Histogram(maxlen=args.lines)

    if args.ingest_all:
        # Pre-load existing log entries. Samples go straight into the raw
        # deques; main() rebins both histograms once the terminal width is
        # known.
        try:
            total_size = os.path.getsize(args.logfile)
            # errors='replace' keeps an undecodable byte from aborting ingestion.
            with open(args.logfile, 'r', errors='replace') as logfile:
                bytes_read = 0
                print("Starting log file ingestion...")
                while True:
                    lines = logfile.readlines(1024 * 1024)
                    if not lines:
                        break
                    bytes_read += sum(len(line.encode('utf-8')) for line in lines)
                    for line in lines:
                        action, size = parse_log_line(line)
                        if action and size is not None:
                            (uploads_hist if 'PUT' in action else downloads_hist)._raw_data.append(size)
                    # The file may grow after getsize() was taken, so cap
                    # the figure at 100% and guard against a zero size.
                    percent_done = min(100.0, (bytes_read / total_size) * 100) if total_size > 0 else 100.0
                    sys.stdout.write(f"\rIngesting log file... {percent_done:.1f}% done")
                    sys.stdout.flush()
            sys.stdout.write("\nIngestion complete. Starting dashboard...\n")
        except FileNotFoundError:
            print(f"Error: Log file not found at {args.logfile}")
            sys.exit(1)
        except Exception as e:
            print(f"\nAn error occurred during ingestion: {e}")
            sys.exit(1)

    try:
        curses.wrapper(main, args, uploads_hist, downloads_hist)
    except curses.error as e:
        print(f"A terminal error occurred: {e}")
        print("This can happen if the script exits unexpectedly or the terminal is too small.")
    except Exception as e:
        print(f"An unexpected application error occurred: {e}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Example usage: