@0xdeafbeef
Last active May 1, 2025 09:35
Gathers compaction stats per CF from RocksDB LOG files.
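The parser can also be driven directly from Python for quick checks. A minimal sketch, assuming the gist is saved as compaction_stats.py (hypothetical filename); the two sample lines simply follow the patterns matched by cf_re and cumulative_re in the script below, while real LOG files also carry the full level/priority tables:

# Minimal sketch; 'compaction_stats' is a hypothetical module name for this gist.
import json
from compaction_stats import parse_compaction_stats

sample_lines = [
    "** Compaction Stats [default] **",
    "Cumulative compaction: 12.50 GB write, 35.10 MB/s write, 10.25 GB read, 28.80 MB/s read, 365.0 seconds",
]
print(json.dumps(parse_compaction_stats(sample_lines), indent=2))

Repeated blocks for the same CF are merged, with later values overwriting earlier ones, so a long-running LOG yields one entry per CF rather than one per stats dump.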
import re
import json
import sys
import os
from collections import defaultdict
import logging
logging.basicConfig(level=logging.WARNING, format='%(levelname)s:%(message)s')

def parse_size_unit(value_str):
    """Parses a string like '138.78 MB', '8.82 GB', '2.93KB', '503.6' into GB."""
    value_str = str(value_str).strip()
    if value_str in ['', '-']: return 0.0
    m = re.match(r'([\d.]+)\s*(KB|MB|GB|TB|B)\b', value_str, re.IGNORECASE)
    if m:
        value = float(m.group(1))
        unit = m.group(2).upper()
        if unit == 'KB':
            return value / (1024**2)
        elif unit == 'MB':
            return value / 1024
        elif unit == 'GB':
            return value
        elif unit == 'TB':
            return value * 1024
        elif unit == 'B':
            return value / (1024**3)
    else:
        try:
            val = float(value_str)
            return val
        except ValueError:
            if re.match(r'^\d+(\.\d+)?[KMGT]$', value_str, re.IGNORECASE):
                return 0.0
            else:
                logging.warning(f"Could not parse size unit from unrecognized value: '{value_str}'")
                return 0.0

def parse_human_readable_count(value_str):
    """Parses a string like '412K', '961M', '10G', '138', '9326K' into an integer."""
    value_str = str(value_str).strip().upper()
    if not value_str or value_str == '-' or value_str == 'N/A':
        return 0
    m = re.match(r'([\d.]+)([KMGT]?)$', value_str)
    if not m:
        try:
            return int(float(value_str))
        except ValueError:
            logging.warning(f"Could not parse count from non-numeric value: '{value_str}'")
            return 0
    value = float(m.group(1))
    suffix = m.group(2)
    if suffix == 'K':
        value *= 1000
    elif suffix == 'M':
        value *= 1000**2
    elif suffix == 'G':
        value *= 1000**3
    elif suffix == 'T':
        value *= 1000**4
    return int(value)

def try_float(value):
    """Attempt to convert value to float, return 0.0 on failure."""
    try:
        val_str = str(value).strip()
        if val_str == '-': return 0.0
        return float(val_str)
    except (ValueError, TypeError):
        return 0.0

def try_int(value):
    """Attempt to convert value to int, return 0 on failure."""
    try:
        val_str = str(value).strip()
        if val_str == '-': return 0
        return int(float(val_str))
    except (ValueError, TypeError):
        return 0

def merge_dicts(dicts):
    """Merges a list of dicts into one, later keys overwrite earlier ones."""
    result = {}
    for d in dicts:
        result.update(d)
    return result
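
# Illustrative expectations for the helpers above (approximate values, not
# executed anywhere; sizes use 1024-based conversion, humanized counts use
# 1000-based suffixes):
#   parse_size_unit("138.78 MB")        -> ~0.1355 (GB)
#   parse_size_unit("8.82 GB")          -> 8.82
#   parse_human_readable_count("412K")  -> 412000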

def parse_compaction_stats(lines):
    """Extracts per-CF compaction stat blocks from RocksDB LOG lines and merges repeated blocks per CF."""
    cf_name = None
    current = {}
    cfs_blocks = []
    parsing_table_type = None
    table_headers = []
    raw_header_line = ""
    # Column definitions for the per-level and per-priority tables. The regexes
    # are kept for reference only; values are mapped to keys positionally. The
    # two tables share the same columns apart from the first one.
    LEVEL_HEADERS_DEF = [
        ('level', (r'Level',)),
        ('files', (r'Files',)),
        ('size_gb', (r'Size',)),
        ('score', (r'Score',)),
        ('read_gb', (r'Read\(GB\)',)),
        ('read_next_gb', (r'Rn\(GB\)',)),
        ('read_next_p1_gb', (r'Rnp1\(GB\)',)),
        ('write_gb', (r'Write\(GB\)',)),
        ('write_new_gb', (r'Wnew\(GB\)',)),
        ('moved_gb', (r'Moved\(GB\)',)),
        ('write_amp', (r'W-Amp',)),
        ('read_mbps', (r'Rd\(MB/s\)',)),
        ('write_mbps', (r'Wr\(MB/s\)',)),
        ('comp_sec', (r'Comp\(sec\)', r'Comp\(s\s*ec\)')),
        ('comp_cpu_sec', (r'CompMergeCPU\(sec\)',)),
        ('comp_cnt', (r'Comp\(cnt\)',)),
        ('avg_sec', (r'Avg\(sec\)',)),
        ('key_in_count', (r'KeyIn',)),
        ('key_drop_count', (r'KeyDrop',)),
        ('read_blob_gb', (r'Rblob\(GB\)',)),
        ('write_blob_gb', (r'Wblob\(GB\)',)),
    ]
    PRIORITY_HEADERS_DEF = [
        ('priority', (r'Priority',)),
        ('files', (r'Files',)),
        ('size_gb', (r'Size',)),
        ('score', (r'Score',)),
        ('read_gb', (r'Read\(GB\)',)),
        ('read_next_gb', (r'Rn\(GB\)',)),
        ('read_next_p1_gb', (r'Rnp1\(GB\)',)),
        ('write_gb', (r'Write\(GB\)',)),
        ('write_new_gb', (r'Wnew\(GB\)',)),
        ('moved_gb', (r'Moved\(GB\)',)),
        ('write_amp', (r'W-Amp',)),
        ('read_mbps', (r'Rd\(MB/s\)',)),
        ('write_mbps', (r'Wr\(MB/s\)',)),
        ('comp_sec', (r'Comp\(sec\)', r'Comp\(s\s*ec\)')),
        ('comp_cpu_sec', (r'CompMergeCPU\(sec\)',)),
        ('comp_cnt', (r'Comp\(cnt\)',)),
        ('avg_sec', (r'Avg\(sec\)',)),
        ('key_in_count', (r'KeyIn',)),
        ('key_drop_count', (r'KeyDrop',)),
        ('read_blob_gb', (r'Rblob\(GB\)',)),
        ('write_blob_gb', (r'Wblob\(GB\)',)),
    ]
    cf_re = re.compile(r'\*\* Compaction Stats \[(.*?)\] \*\*')
    separator_re = re.compile(r'^-+$')
    level_header_start_re = re.compile(r'^\s*Level\s+Files\s+Size.*')
    priority_header_start_re = re.compile(r'^\s*Priority\s+Files\s+Size.*')
    blob_stats_re = re.compile(
        r'Blob file count: (\d+), total size: ([\d.]+) GB, garbage size: ([\d.]+) GB, space amp: ([\d.]+)'
    )
    cumulative_re = re.compile(
        r'Cumulative compaction: ([\d.]+) GB write, ([\d.]+) MB/s write, ([\d.]+) GB read, ([\d.]+) MB/s read, ([\d.]+) seconds'
    )
    interval_re = re.compile(
        r'Interval compaction: ([\d.]+) GB write, ([\d.]+) MB/s write, ([\d.]+) GB read, ([\d.]+) MB/s read, ([\d.]+) seconds'
    )
    pending_compaction_re = re.compile(r'Estimated pending compaction bytes: (\d+)')
    write_stall_re = re.compile(r'Write Stall \(count\): (.*)')
    data_row_identifier_re = re.compile(r'^\s*(L\d+|Sum|Int|Low|High)\s+')
    for line_num, line in enumerate(lines):
        line = line.strip()
        if not line:
            continue
        cf_match = cf_re.search(line)
        separator_match = separator_re.match(line)
        level_header_match = level_header_start_re.match(line)
        priority_header_match = priority_header_start_re.match(line)
        data_row_match = data_row_identifier_re.match(line)
        if cf_match:
            if current: cfs_blocks.append(current)
            cf_name = cf_match.group(1)
            current = {"cf": cf_name}
            parsing_table_type = None
            table_headers = []
            raw_header_line = ""
            logging.debug(f"Line {line_num+1}: Starting new CF block for '{cf_name}'")
            continue
        if level_header_match and not parsing_table_type and current:
            parsing_table_type = 'level'
            table_headers = LEVEL_HEADERS_DEF
            raw_header_line = line
            logging.debug(f"Line {line_num+1}: Detected Level header for CF '{cf_name}'")
            continue
        elif priority_header_match and not parsing_table_type and current:
            parsing_table_type = 'priority'
            table_headers = PRIORITY_HEADERS_DEF
            raw_header_line = line
            logging.debug(f"Line {line_num+1}: Detected Priority header for CF '{cf_name}'")
            continue
        if separator_match and parsing_table_type and table_headers and current:
            logging.debug(f"Line {line_num+1}: Detected separator for {parsing_table_type} table.")
            continue
        if parsing_table_type and table_headers and data_row_match and current:
            parts = line.split()
            row_data = {}
            row_id_part = parts[0]
            try:
                for i, (key, header_patterns) in enumerate(table_headers):
                    if i < len(parts):
                        value = parts[i]
                        if key == 'size_gb':
                            row_data[key] = parse_size_unit(value)
                        elif key.endswith('_gb'):
                            row_data[key] = parse_size_unit(value)
                        elif key.endswith('_mbps') or key.endswith('_sec') or key == 'score' or key == 'write_amp':
                            row_data[key] = try_float(value)
                        elif key.endswith('_count'):
                            row_data[key] = parse_human_readable_count(value)
                        elif key == 'files':
                            row_data[key] = value
                        elif key == 'level' or key == 'priority':
                            row_data[key] = value
                        else:
                            row_data[key] = try_float(value) if re.match(r'^[\d.-]+$', value) else value
                    else:
                        logging.debug(f"Line {line_num+1}: Missing data for column '{key}' (index {i}) in row '{row_id_part}' for CF '{cf_name}'. Assigning default.")
                        if key.endswith('_gb') or key.endswith('_mbps') or key.endswith('_sec') or key == 'score' or key == 'write_amp':
                            row_data[key] = 0.0
                        elif key.endswith('_count'):
                            row_data[key] = 0
                        elif key == 'files':
                            row_data[key] = "0/0"
                        else:
                            row_data[key] = None
                table_key = f"{parsing_table_type}_stats"
                if table_key not in current: current[table_key] = {}
                row_identifier = row_data.get(parsing_table_type)
                if row_identifier:
                    current[table_key][row_identifier] = row_data
                    logging.debug(f"Line {line_num+1}: Parsed {parsing_table_type} row '{row_identifier}' for CF '{cf_name}' (found {len(parts)} parts, expected <= {len(table_headers)})")
                else:
                    logging.warning(f"Line {line_num+1}: Could not determine row identifier ('{parsing_table_type}') for parsed data row in CF '{cf_name}'. Row data: {row_data}")
            except Exception as e:
                logging.error(f"Line {line_num+1}: Failed during data row processing for CF '{cf_name}'. Error: {type(e).__name__} - {e}. Line: '{line}'. Parts: {parts}. Headers: {table_headers}")
            continue
        elif parsing_table_type and current:
            is_known_summary = (blob_stats_re.search(line) or
                                cumulative_re.search(line) or
                                interval_re.search(line) or
                                pending_compaction_re.search(line) or
                                write_stall_re.search(line))
            is_new_cf = cf_re.search(line)
            if not data_row_match and not is_known_summary and not is_new_cf:
                logging.debug(f"Line {line_num+1}: Line does not look like data, known summary, or new CF start. Ending {parsing_table_type} table parse for CF '{cf_name}'. Line: '{line}'")
                parsing_table_type = None
                table_headers = []
                raw_header_line = ""
        if current is not None:
            blob_m = blob_stats_re.search(line)
            if blob_m:
                blob_file_stats = {
                    "blob_file_count": try_int(blob_m.group(1)),
                    "total_size_gb": try_float(blob_m.group(2)),
                    "garbage_size_gb": try_float(blob_m.group(3)),
                    "space_amp": try_float(blob_m.group(4)),
                }
                current.update({"blob_file_stats": blob_file_stats})
                logging.debug(f"Line {line_num+1}: Parsed blob stats for CF '{cf_name}'")
                continue
            m = cumulative_re.search(line)
            if m:
                cumulative = {
                    "write_gb": try_float(m.group(1)),
                    "write_mb_s": try_float(m.group(2)),
                    "read_gb": try_float(m.group(3)),
                    "read_mb_s": try_float(m.group(4)),
                    "seconds": try_float(m.group(5)),
                }
                current.update({"cumulative_compaction": cumulative})
                logging.debug(f"Line {line_num+1}: Parsed cumulative compaction for CF '{cf_name}'")
                continue
            m = interval_re.search(line)
            if m:
                interval = {
                    "write_gb": try_float(m.group(1)),
                    "write_mb_s": try_float(m.group(2)),
                    "read_gb": try_float(m.group(3)),
                    "read_mb_s": try_float(m.group(4)),
                    "seconds": try_float(m.group(5)),
                }
                current.update({"interval_compaction": interval})
                logging.debug(f"Line {line_num+1}: Parsed interval compaction for CF '{cf_name}'")
                continue
            m = pending_compaction_re.search(line)
            if m:
                current["pending_compaction_bytes"] = try_int(m.group(1))
                logging.debug(f"Line {line_num+1}: Parsed pending compaction for CF '{cf_name}'")
                continue
            m = write_stall_re.search(line)
            if m:
                stalls = {}
                stall_str = m.group(1).strip()
                pairs = [p for p in stall_str.split(',') if ':' in p]
                for pair in pairs:
                    try:
                        k, v = pair.strip().split(':', 1)
                        stalls[k.strip()] = try_int(v.strip())
                    except ValueError:
                        logging.warning(f"Line {line_num+1}: Could not parse write stall pair: '{pair}' in line: '{line}'")
                current["write_stalls"] = stalls
                logging.debug(f"Line {line_num+1}: Parsed write stalls for CF '{cf_name}'")
                continue
    if current: cfs_blocks.append(current)
    by_cf = defaultdict(list)
    cf_none_list = []
    for block in cfs_blocks:
        cf = block.get("cf")
        if cf is not None: by_cf[cf].append(block)
        else: cf_none_list.append(block)
    merged = []
    for cf, dicts in by_cf.items():
        merged.append(merge_dicts(dicts))
    merged.extend(cf_none_list)
    return merged

def find_log_files(root):
    """Finds files starting with 'LOG' in the given directory and subdirectories."""
    logs = []
    for dirpath, dirnames, filenames in os.walk(root):
        for name in filenames:
            if name.startswith("LOG"):
                logs.append(os.path.join(dirpath, name))
    return logs

def main():
    if len(sys.argv) != 2:
        print(f"Usage: {sys.argv[0]} <path_to_log_file_or_directory>", file=sys.stderr)
        sys.exit(1)
    path_arg = sys.argv[1]
    all_stats = {}
    log_files_to_process = []
    if os.path.isfile(path_arg):
        if os.path.basename(path_arg).startswith("LOG"):
            log_files_to_process.append(path_arg)
        else:
            print(f"Error: Specified file '{path_arg}' does not look like a RocksDB LOG file (name must start with LOG).", file=sys.stderr)
            sys.exit(1)
    elif os.path.isdir(path_arg):
        log_files_to_process = find_log_files(path_arg)
        if not log_files_to_process:
            print(f"Warning: No LOG files found in directory '{path_arg}'.", file=sys.stderr)
    else:
        print(f"Error: Path '{path_arg}' is not a valid file or directory.", file=sys.stderr)
        sys.exit(1)
    if log_files_to_process:
        print(f"Found {len(log_files_to_process)} log files to process.", file=sys.stderr)
    else:
        print("No log files found matching criteria.", file=sys.stderr)
    for logf in log_files_to_process:
        print(f"Processing: {logf}", file=sys.stderr)
        try:
            with open(logf, 'r', errors='replace') as f:
                lines = f.readlines()
            stats_list = parse_compaction_stats(lines)
            if stats_list:
                all_stats[logf] = stats_list
            else:
                print(f"Info: No compaction stats blocks found or parsed in {logf}", file=sys.stderr)
        except FileNotFoundError:
            print(f"Error: File not found {logf}", file=sys.stderr)
            continue
        except Exception as e:
            print(f"Error processing {logf}: {type(e).__name__} - {e}", file=sys.stderr)
            continue
    try:
        print(json.dumps(all_stats, indent=2))
    except Exception as e:
        print(f"\nError converting results to JSON: {e}", file=sys.stderr)
        print("Dumping raw structure (might be large) to stderr:", file=sys.stderr)
        import pprint
        pprint.pprint(all_stats, stream=sys.stderr)


if __name__ == "__main__":
    main()
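
Typical invocation (the script filename is whatever the gist was saved as, e.g. compaction_stats.py): python3 compaction_stats.py /path/to/rocksdb/data > stats.json. Progress and warnings go to stderr; stdout gets a JSON object mapping each LOG file path to its list of per-CF stat dicts. A small post-processing sketch for that output (stats.json is an assumed filename; the field names follow the keys this script produces, and the Sum row exists only when a level table was parsed):

import json

with open("stats.json") as f:
    stats = json.load(f)

# Print the Sum-row write amplification per CF, per LOG file.
for log_path, cf_blocks in stats.items():
    for block in cf_blocks:
        sum_row = block.get("level_stats", {}).get("Sum", {})
        if sum_row:
            print(f"{log_path} [{block.get('cf')}]: W-Amp={sum_row.get('write_amp')}")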