Gathers per-column-family compaction stats from RocksDB LOG files and prints them as JSON.
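Usage sketch (the script file name is whatever you saved the gist as, shown here only as an example): point it at a single RocksDB LOG file or at a directory tree containing the database; in directory mode every file whose name starts with LOG is picked up. The JSON result is written to stdout while progress messages and warnings go to stderr, so something like "python gather_compaction_stats.py /path/to/db_dir > compaction_stats.json" captures just the data. The output maps each LOG file path to a list of per-column-family objects with keys such as "cf", "level_stats", "priority_stats", "blob_file_stats", "cumulative_compaction", "interval_compaction", "pending_compaction_bytes" and "write_stalls".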
"""Gathers per-column-family compaction statistics from RocksDB LOG files and
prints them to stdout as JSON."""

import re
import json
import sys
import os
from collections import defaultdict
import logging

logging.basicConfig(level=logging.WARNING, format='%(levelname)s:%(message)s')


def parse_size_unit(value_str):
    """Parses a string like '138.78 MB', '8.82 GB', '2.93KB', '503.6' into GB."""
    value_str = str(value_str).strip()
    if value_str in ['', '-']: return 0.0
    m = re.match(r'([\d.]+)\s*(KB|MB|GB|TB|B)\b', value_str, re.IGNORECASE)
    if m:
        value = float(m.group(1))
        unit = m.group(2).upper()
        if unit == 'KB':
            return value / (1024**2)
        elif unit == 'MB':
            return value / 1024
        elif unit == 'GB':
            return value
        elif unit == 'TB':
            return value * 1024
        elif unit == 'B':
            return value / (1024**3)
    else:
        try:
            val = float(value_str)
            return val
        except ValueError:
            if re.match(r'^\d+(\.\d+)?[KMGT]$', value_str, re.IGNORECASE):
                return 0.0
            else:
                logging.warning(f"Could not parse size unit from unrecognized value: '{value_str}'")
                return 0.0
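# A few example conversions for reference; every recognised input is normalised to GB:
#   parse_size_unit("512 MB")  -> 0.5
#   parse_size_unit("8.82 GB") -> 8.82
#   parse_size_unit("2 TB")    -> 2048.0
#   parse_size_unit("503.6")   -> 503.6   (a bare number is taken to already be in GB)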
def parse_human_readable_count(value_str):
    """Parses a string like '412K', '961M', '10G', '138', '9326K' into an integer."""
    value_str = str(value_str).strip().upper()
    if not value_str or value_str == '-' or value_str == 'N/A':
        return 0
    m = re.match(r'([\d.]+)([KMGT]?)$', value_str)
    if not m:
        try:
            return int(float(value_str))
        except ValueError:
            logging.warning(f"Could not parse count from non-numeric value: '{value_str}'")
            return 0
    value = float(m.group(1))
    suffix = m.group(2)
    if suffix == 'K':
        value *= 1000
    elif suffix == 'M':
        value *= 1000**2
    elif suffix == 'G':
        value *= 1000**3
    elif suffix == 'T':
        value *= 1000**4
    return int(value)
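# Example conversions (the K/M/G/T suffixes are decimal, not binary):
#   parse_human_readable_count("412K") -> 412000
#   parse_human_readable_count("961M") -> 961000000
#   parse_human_readable_count("138")  -> 138
#   parse_human_readable_count("-")    -> 0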
def try_float(value):
    """Attempt to convert value to float, return 0.0 on failure."""
    try:
        val_str = str(value).strip()
        if val_str == '-': return 0.0
        return float(val_str)
    except (ValueError, TypeError):
        return 0.0


def try_int(value):
    """Attempt to convert value to int, return 0 on failure."""
    try:
        val_str = str(value).strip()
        if val_str == '-': return 0
        return int(float(val_str))
    except (ValueError, TypeError):
        return 0


def merge_dicts(dicts):
    """Merges a list of dicts into one; later keys overwrite earlier ones."""
    result = {}
    for d in dicts:
        result.update(d)
    return result
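# merge_dicts example: later blocks for the same CF win on key conflicts, e.g.
#   merge_dicts([{"cf": "default", "pending_compaction_bytes": 10},
#                {"cf": "default", "pending_compaction_bytes": 25}])
#   -> {"cf": "default", "pending_compaction_bytes": 25}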
def parse_compaction_stats(lines):
    cf_name = None
    current = {}
    cfs_blocks = []
    parsing_table_type = None
    table_headers = []
    raw_header_line = ""

    # Column layout of the per-level compaction table, in the order RocksDB prints it.
    LEVEL_HEADERS_DEF = [
        ('level', (r'Level',)),
        ('files', (r'Files',)),
        ('size_gb', (r'Size',)),
        ('score', (r'Score',)),
        ('read_gb', (r'Read\(GB\)',)),
        ('read_next_gb', (r'Rn\(GB\)',)),
        ('read_next_p1_gb', (r'Rnp1\(GB\)',)),
        ('write_gb', (r'Write\(GB\)',)),
        ('write_new_gb', (r'Wnew\(GB\)',)),
        ('moved_gb', (r'Moved\(GB\)',)),
        ('write_amp', (r'W-Amp',)),
        ('read_mbps', (r'Rd\(MB/s\)',)),
        ('write_mbps', (r'Wr\(MB/s\)',)),
        ('comp_sec', (r'Comp\(sec\)', r'Comp\(s\s*ec\)')),
        ('comp_cpu_sec', (r'CompMergeCPU\(sec\)',)),
        ('comp_cnt', (r'Comp\(cnt\)',)),
        ('avg_sec', (r'Avg\(sec\)',)),
        ('key_in_count', (r'KeyIn',)),
        ('key_drop_count', (r'KeyDrop',)),
        ('read_blob_gb', (r'Rblob\(GB\)',)),
        ('write_blob_gb', (r'Wblob\(GB\)',)),
    ]
    # Same layout for the per-priority table (Low/High); only the first column differs.
    PRIORITY_HEADERS_DEF = [
        ('priority', (r'Priority',)),
        ('files', (r'Files',)),
        ('size_gb', (r'Size',)),
        ('score', (r'Score',)),
        ('read_gb', (r'Read\(GB\)',)),
        ('read_next_gb', (r'Rn\(GB\)',)),
        ('read_next_p1_gb', (r'Rnp1\(GB\)',)),
        ('write_gb', (r'Write\(GB\)',)),
        ('write_new_gb', (r'Wnew\(GB\)',)),
        ('moved_gb', (r'Moved\(GB\)',)),
        ('write_amp', (r'W-Amp',)),
        ('read_mbps', (r'Rd\(MB/s\)',)),
        ('write_mbps', (r'Wr\(MB/s\)',)),
        ('comp_sec', (r'Comp\(sec\)', r'Comp\(s\s*ec\)')),
        ('comp_cpu_sec', (r'CompMergeCPU\(sec\)',)),
        ('comp_cnt', (r'Comp\(cnt\)',)),
        ('avg_sec', (r'Avg\(sec\)',)),
        ('key_in_count', (r'KeyIn',)),
        ('key_drop_count', (r'KeyDrop',)),
        ('read_blob_gb', (r'Rblob\(GB\)',)),
        ('write_blob_gb', (r'Wblob\(GB\)',)),
    ]

    # Markers for the lines that make up a compaction stats block in the LOG.
    cf_re = re.compile(r'\*\* Compaction Stats \[(.*?)\] \*\*')
    separator_re = re.compile(r'^-+$')
    level_header_start_re = re.compile(r'^\s*Level\s+Files\s+Size.*')
    priority_header_start_re = re.compile(r'^\s*Priority\s+Files\s+Size.*')
    blob_stats_re = re.compile(
        r'Blob file count: (\d+), total size: ([\d.]+) GB, garbage size: ([\d.]+) GB, space amp: ([\d.]+)'
    )
    cumulative_re = re.compile(
        r'Cumulative compaction: ([\d.]+) GB write, ([\d.]+) MB/s write, ([\d.]+) GB read, ([\d.]+) MB/s read, ([\d.]+) seconds'
    )
    interval_re = re.compile(
        r'Interval compaction: ([\d.]+) GB write, ([\d.]+) MB/s write, ([\d.]+) GB read, ([\d.]+) MB/s read, ([\d.]+) seconds'
    )
    pending_compaction_re = re.compile(r'Estimated pending compaction bytes: (\d+)')
    write_stall_re = re.compile(r'Write Stall \(count\): (.*)')
    data_row_identifier_re = re.compile(r'^\s*(L\d+|Sum|Int|Low|High)\s+')

    # Walk the log line by line, tracking the CF block and the table currently being parsed.
    for line_num, line in enumerate(lines):
        line = line.strip()
        if not line:
            continue

        cf_match = cf_re.search(line)
        separator_match = separator_re.match(line)
        level_header_match = level_header_start_re.match(line)
        priority_header_match = priority_header_start_re.match(line)
        data_row_match = data_row_identifier_re.match(line)

        if cf_match:
            if current: cfs_blocks.append(current)
            cf_name = cf_match.group(1)
            current = {"cf": cf_name}
            parsing_table_type = None
            table_headers = []
            raw_header_line = ""
            logging.debug(f"Line {line_num+1}: Starting new CF block for '{cf_name}'")
            continue

        if level_header_match and not parsing_table_type and current:
            parsing_table_type = 'level'
            table_headers = LEVEL_HEADERS_DEF
            raw_header_line = line
            logging.debug(f"Line {line_num+1}: Detected Level header for CF '{cf_name}'")
            continue
        elif priority_header_match and not parsing_table_type and current:
            parsing_table_type = 'priority'
            table_headers = PRIORITY_HEADERS_DEF
            raw_header_line = line
            logging.debug(f"Line {line_num+1}: Detected Priority header for CF '{cf_name}'")
            continue

        if separator_match and parsing_table_type and table_headers and current:
            logging.debug(f"Line {line_num+1}: Detected separator for {parsing_table_type} table.")
            continue

        if parsing_table_type and table_headers and data_row_match and current:
            # Data row of the active table: map whitespace-separated columns onto the header definition.
            parts = line.split()
            row_data = {}
            row_id_part = parts[0]
            try:
                for i, (key, header_patterns) in enumerate(table_headers):
                    if i < len(parts):
                        value = parts[i]
                        if key == 'size_gb':
                            row_data[key] = parse_size_unit(value)
                        elif key.endswith('_gb'):
                            row_data[key] = parse_size_unit(value)
                        elif key.endswith('_mbps') or key.endswith('_sec') or key == 'score' or key == 'write_amp':
                            row_data[key] = try_float(value)
                        elif key.endswith('_count'):
                            row_data[key] = parse_human_readable_count(value)
                        elif key == 'files':
                            row_data[key] = value
                        elif key == 'level' or key == 'priority':
                            row_data[key] = value
                        else:
                            row_data[key] = try_float(value) if re.match(r'^[\d.-]+$', value) else value
                    else:
                        logging.debug(f"Line {line_num+1}: Missing data for column '{key}' (index {i}) in row '{row_id_part}' for CF '{cf_name}'. Assigning default.")
                        if key.endswith('_gb') or key.endswith('_mbps') or key.endswith('_sec') or key == 'score' or key == 'write_amp':
                            row_data[key] = 0.0
                        elif key.endswith('_count'):
                            row_data[key] = 0
                        elif key == 'files':
                            row_data[key] = "0/0"
                        else:
                            row_data[key] = None

                table_key = f"{parsing_table_type}_stats"
                if table_key not in current: current[table_key] = {}
                row_identifier = row_data.get(parsing_table_type)
                if row_identifier:
                    current[table_key][row_identifier] = row_data
                    logging.debug(f"Line {line_num+1}: Parsed {parsing_table_type} row '{row_identifier}' for CF '{cf_name}' (found {len(parts)} parts, expected <= {len(table_headers)})")
                else:
                    logging.warning(f"Line {line_num+1}: Could not determine row identifier ('{parsing_table_type}') for parsed data row in CF '{cf_name}'. Row data: {row_data}")
            except Exception as e:
                logging.error(f"Line {line_num+1}: Failed during data row processing for CF '{cf_name}'. Error: {type(e).__name__} - {e}. Line: '{line}'. Parts: {parts}. Headers: {table_headers}")
            continue
        elif parsing_table_type and current:
            # Anything that is neither a data row, a known summary line, nor a new CF header
            # ends the current table.
            is_known_summary = (blob_stats_re.search(line) or
                                cumulative_re.search(line) or
                                interval_re.search(line) or
                                pending_compaction_re.search(line) or
                                write_stall_re.search(line))
            is_new_cf = cf_re.search(line)
            if not data_row_match and not is_known_summary and not is_new_cf:
                logging.debug(f"Line {line_num+1}: Line does not look like data, known summary, or new CF start. Ending {parsing_table_type} table parse for CF '{cf_name}'. Line: '{line}'")
                parsing_table_type = None
                table_headers = []
                raw_header_line = ""

        # Summary lines attached to the current CF block.
        if current is not None:
            blob_m = blob_stats_re.search(line)
            if blob_m:
                blob_file_stats = {
                    "blob_file_count": try_int(blob_m.group(1)),
                    "total_size_gb": try_float(blob_m.group(2)),
                    "garbage_size_gb": try_float(blob_m.group(3)),
                    "space_amp": try_float(blob_m.group(4)),
                }
                current.update({"blob_file_stats": blob_file_stats})
                logging.debug(f"Line {line_num+1}: Parsed blob stats for CF '{cf_name}'")
                continue

            m = cumulative_re.search(line)
            if m:
                cumulative = {
                    "write_gb": try_float(m.group(1)),
                    "write_mb_s": try_float(m.group(2)),
                    "read_gb": try_float(m.group(3)),
                    "read_mb_s": try_float(m.group(4)),
                    "seconds": try_float(m.group(5)),
                }
                current.update({"cumulative_compaction": cumulative})
                logging.debug(f"Line {line_num+1}: Parsed cumulative compaction for CF '{cf_name}'")
                continue

            m = interval_re.search(line)
            if m:
                interval = {
                    "write_gb": try_float(m.group(1)),
                    "write_mb_s": try_float(m.group(2)),
                    "read_gb": try_float(m.group(3)),
                    "read_mb_s": try_float(m.group(4)),
                    "seconds": try_float(m.group(5)),
                }
                current.update({"interval_compaction": interval})
                logging.debug(f"Line {line_num+1}: Parsed interval compaction for CF '{cf_name}'")
                continue

            m = pending_compaction_re.search(line)
            if m:
                current["pending_compaction_bytes"] = try_int(m.group(1))
                logging.debug(f"Line {line_num+1}: Parsed pending compaction for CF '{cf_name}'")
                continue

            m = write_stall_re.search(line)
            if m:
                stalls = {}
                stall_str = m.group(1).strip()
                pairs = [p for p in stall_str.split(',') if ':' in p]
                for pair in pairs:
                    try:
                        k, v = pair.strip().split(':', 1)
                        stalls[k.strip()] = try_int(v.strip())
                    except ValueError:
                        logging.warning(f"Line {line_num+1}: Could not parse write stall pair: '{pair}' in line: '{line}'")
                current["write_stalls"] = stalls
                logging.debug(f"Line {line_num+1}: Parsed write stalls for CF '{cf_name}'")
                continue

    if current: cfs_blocks.append(current)

    # Merge all blocks seen for the same CF; later occurrences overwrite earlier ones.
    by_cf = defaultdict(list)
    cf_none_list = []
    for block in cfs_blocks:
        cf = block.get("cf")
        if cf is not None: by_cf[cf].append(block)
        else: cf_none_list.append(block)

    merged = []
    for cf, dicts in by_cf.items():
        merged.append(merge_dicts(dicts))
    merged.extend(cf_none_list)
    return merged
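# Minimal sketch of using the parser directly (the LOG path is just an example):
#   with open("/path/to/db/LOG", errors="replace") as f:
#       blocks = parse_compaction_stats(f.readlines())
#   for b in blocks:
#       cum = b.get("cumulative_compaction", {})
#       print(b.get("cf"), cum.get("write_gb"), b.get("pending_compaction_bytes"))
# Each merged block carries the per-level table under "level_stats" (keys like "L0",
# "L6", "Sum", "Int") and the per-priority table under "priority_stats" ("Low", "High").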
def find_log_files(root):
    """Finds files starting with 'LOG' in the given directory and subdirectories."""
    logs = []
    for dirpath, dirnames, filenames in os.walk(root):
        for name in filenames:
            if name.startswith("LOG"):
                logs.append(os.path.join(dirpath, name))
    return logs


def main():
    if len(sys.argv) != 2:
        print(f"Usage: {sys.argv[0]} <path_to_log_file_or_directory>", file=sys.stderr)
        sys.exit(1)

    path_arg = sys.argv[1]
    all_stats = {}
    log_files_to_process = []

    if os.path.isfile(path_arg):
        if os.path.basename(path_arg).startswith("LOG"):
            log_files_to_process.append(path_arg)
        else:
            print(f"Error: Specified file '{path_arg}' does not look like a RocksDB LOG file (name must start with LOG).", file=sys.stderr)
            sys.exit(1)
    elif os.path.isdir(path_arg):
        log_files_to_process = find_log_files(path_arg)
        if not log_files_to_process:
            print(f"Warning: No LOG files found in directory '{path_arg}'.", file=sys.stderr)
    else:
        print(f"Error: Path '{path_arg}' is not a valid file or directory.", file=sys.stderr)
        sys.exit(1)

    if log_files_to_process:
        print(f"Found {len(log_files_to_process)} log files to process.", file=sys.stderr)
    else:
        print("No log files found matching criteria.", file=sys.stderr)

    for logf in log_files_to_process:
        print(f"Processing: {logf}", file=sys.stderr)
        try:
            with open(logf, 'r', errors='replace') as f:
                lines = f.readlines()
            stats_list = parse_compaction_stats(lines)
            if stats_list:
                all_stats[logf] = stats_list
            else:
                print(f"Info: No compaction stats blocks found or parsed in {logf}", file=sys.stderr)
        except FileNotFoundError:
            print(f"Error: File not found {logf}", file=sys.stderr)
            continue
        except Exception as e:
            print(f"Error processing {logf}: {type(e).__name__} - {e}", file=sys.stderr)
            continue

    # JSON goes to stdout; progress and errors above went to stderr.
    try:
        print(json.dumps(all_stats, indent=2))
    except Exception as e:
        print(f"\nError converting results to JSON: {e}", file=sys.stderr)
        print("Dumping raw structure (might be large) to stderr:", file=sys.stderr)
        import pprint
        pprint.pprint(all_stats, stream=sys.stderr)


if __name__ == "__main__":
    main()