@jmcarp
Created November 21, 2025 15:34
Vibe-coded zonestat to VictoriaMetrics script
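
The script reads zonestat's colon-delimited output, one record per line. As a rough illustration only (the zone name and every numeric value below are invented; only the field positions follow the format comments inside the parser), the input it expects looks like:

    interval:header:since-last-interval:20251117T200010Z:1:10
    interval:summary:global:0.50:58.09%:-:-:1061141504K:25.00%:-:2122283008K:12.00%:-

Each header line carries the interval timestamp; each summary line carries per-zone CPU and memory fields that are turned into the zonestat_* metrics defined in the script below.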
#!/usr/bin/env python3
"""
Parse illumos zonestat output and convert to VictoriaMetrics JSON format.

Usage:
    python parse_zonestat.py <input_file> [output_file]

If output_file is not specified, outputs to stdout.
"""
import sys
import re
import json
import argparse
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List
from collections import defaultdict


def parse_timestamp(ts: str) -> int:
    """Convert ISO8601 timestamp to Unix milliseconds."""
    # Format: 20251117T200010Z (UTC)
    dt = datetime.strptime(ts, "%Y%m%dT%H%M%SZ")
    # The timestamp is UTC but strptime treats it as naive/local.
    # We need to make it timezone-aware and convert to UTC timestamp.
    dt = dt.replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)
def parse_memory_value(mem_str: str) -> int:
    """Parse memory value like '1061141504K' to bytes."""
    if mem_str == "-":
        return 0
    # Map suffix to multiplier
    suffixes = {"K": 1024, "M": 1024**2, "G": 1024**3}
    for suffix, multiplier in suffixes.items():
        if mem_str.endswith(suffix):
            return int(mem_str[:-1]) * multiplier
    return int(mem_str)


def parse_percent(percent_str: str) -> float:
    """Parse percentage like '58.09%' to float."""
    if percent_str == "-":
        return 0.0
    return float(percent_str.rstrip("%"))


def sanitize_label_value(value: str) -> str:
    """Sanitize label value to only include alphanumeric, hyphens, and underscores."""
    if not value:
        return ""
    sanitized = re.sub(r'[^a-zA-Z0-9_-]', '_', value)
    return sanitized.strip('_')
def parse_zonestat_file(file_path: str, sled_name: str = None) -> List[Dict]:
    """Parse a zonestat file and return a list of metric records.

    Args:
        file_path: Path to the zonestat file
        sled_name: Optional sled name to add as a label
    """
    records = []
    current_timestamp = None
    with open(file_path, 'r') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            parts = line.split(":")
            if len(parts) < 3:
                continue
            record_type = parts[1]
            if record_type == "header":
                # Format: interval:header:since-last-interval:TIMESTAMP:interval_num:total_seconds
                if len(parts) >= 4:
                    current_timestamp = parse_timestamp(parts[3])
            elif record_type == "summary" and current_timestamp:
                # Format: interval:summary:ZONE_NAME:fields...
                if len(parts) < 4:
                    continue
                zone_name = parts[2]
                # Skip resource lines - these just describe total resources available
                if zone_name == "[resource]":
                    continue
                fields = parts[3:]
                # Format: cpu_usage:cpu_percent:-:-:physical_mem_used:physical_mem_percent:-:virtual_mem_used:virtual_mem_percent:-
                if len(fields) >= 10:
                    # Parse field values
                    cpu_usage = float(fields[0]) if fields[0] != "-" else 0.0
                    cpu_percent = parse_percent(fields[1])
                    physical_mem_bytes = parse_memory_value(fields[4])
                    physical_mem_percent = parse_percent(fields[5])
                    virtual_mem_bytes = parse_memory_value(fields[7])
                    virtual_mem_percent = parse_percent(fields[8])
                    zone = sanitize_label_value(zone_name)
                    sled = sanitize_label_value(sled_name)

                    # Helper to create a metric record
                    def add_metric(metric_name: str, value: float):
                        records.append({
                            "timestamp": current_timestamp,
                            "metric_name": metric_name,
                            "zone": zone,
                            "sled": sled,
                            "value": value
                        })

                    # Add all metrics
                    add_metric("zonestat_cpu_usage", cpu_usage)
                    add_metric("zonestat_cpu_percent", cpu_percent)
                    add_metric("zonestat_physical_memory_bytes", physical_mem_bytes)
                    add_metric("zonestat_physical_memory_percent", physical_mem_percent)
                    add_metric("zonestat_virtual_memory_bytes", virtual_mem_bytes)
                    add_metric("zonestat_virtual_memory_percent", virtual_mem_percent)
    return records
def write_json(records: List[Dict], output_file=None):
    """Write records to JSON file in VictoriaMetrics-compatible format.

    Groups records by time series (metric_name + labels) and outputs in the format:
        {
            "metric": {"__name__": "metric_name", "label1": "value1", ...},
            "values": [v1, v2, ...],
            "timestamps": [t1, t2, ...]
        }

    Splits large time series into chunks to avoid exceeding the VictoriaMetrics
    line length limit.
    """
    # Group records by time series (metric_name, zone, sled)
    time_series = defaultdict(lambda: {"timestamps": [], "values": []})
    for record in records:
        # Create a unique key for this time series
        metric_name = record["metric_name"]
        zone = record["zone"]
        sled = record["sled"]
        key = (metric_name, zone, sled)
        # Add timestamp and value to this time series
        time_series[key]["timestamps"].append(record["timestamp"])
        time_series[key]["values"].append(record["value"])

    # Convert to VictoriaMetrics JSON format, splitting into chunks if needed
    # VictoriaMetrics has a 10MB line limit, so we chunk to ~1000 points per line
    CHUNK_SIZE = 1000

    def write_line(item):
        line = json.dumps(item) + '\n'
        if output_file:
            f.write(line)
        else:
            sys.stdout.write(line)

    if output_file:
        f = open(output_file, 'w')
    try:
        for (metric_name, zone, sled), data in time_series.items():
            metric_obj = {
                "__name__": metric_name,
                "zone": zone
            }
            # Only add sled label if it's not empty
            if sled:
                metric_obj["sled"] = sled
            timestamps = data["timestamps"]
            values = data["values"]
            # Split into chunks
            for i in range(0, len(timestamps), CHUNK_SIZE):
                chunk_timestamps = timestamps[i:i + CHUNK_SIZE]
                chunk_values = values[i:i + CHUNK_SIZE]
                write_line({
                    "metric": metric_obj,
                    "values": chunk_values,
                    "timestamps": chunk_timestamps
                })
    finally:
        if output_file:
            f.close()
def main():
    parser = argparse.ArgumentParser(
        description='Parse illumos zonestat output and convert to VictoriaMetrics JSON format.',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Output format:
  VictoriaMetrics JSON import format with grouped time series:
    {
      "metric": {"__name__": "metric_name", "zone": "...", "sled": "..."},
      "values": [v1, v2, ...],
      "timestamps": [t1, t2, ...]
    }

Examples:
  %(prog)s zonestat/sled-10 output.json
  %(prog)s zonestat/sled-10 --sled-name my-sled > output.json

Import to VictoriaMetrics:
  # From file
  curl -X POST 'http://localhost:8428/api/v1/import' --data-binary @output.json
  # Or pipe directly (recommended)
  %(prog)s zonestat/sled-10 | curl -X POST 'http://localhost:8428/api/v1/import' --data-binary @-
'''
    )
    parser.add_argument('input_file',
                        help='Path to zonestat output file')
    parser.add_argument('output_file',
                        nargs='?',
                        default=None,
                        help='Path to output JSON file (default: stdout)')
    parser.add_argument('--sled-name',
                        default=None,
                        help='Add sled name as a label (default: auto-detect from filename)')
    args = parser.parse_args()

    # Validate input file exists
    if not Path(args.input_file).exists():
        print(f"Error: Input file '{args.input_file}' not found", file=sys.stderr)
        sys.exit(1)

    # Auto-detect sled name from filename if not specified
    sled_name = args.sled_name
    if sled_name is None:
        filename = Path(args.input_file).stem
        if filename.startswith("sled-") or filename.startswith("sled_"):
            sled_name = filename

    # Parse the file
    records = parse_zonestat_file(args.input_file, sled_name)

    # Write to JSON
    write_json(records, args.output_file)

    if args.output_file:
        print(f"Successfully parsed {len(records)} records to {args.output_file}", file=sys.stderr)
    else:
        print(f"Successfully parsed {len(records)} records", file=sys.stderr)


if __name__ == "__main__":
    main()
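
The conversion helpers above are easy to sanity-check in isolation. A minimal sketch, assuming the file is saved as parse_zonestat.py (the name used in the module docstring) and imported from the same directory:

    from parse_zonestat import parse_memory_value, parse_percent, sanitize_label_value

    # 'K', 'M', and 'G' suffixes expand to bytes; "-" maps to 0
    parse_memory_value("1061141504K")   # 1061141504 * 1024 == 1086608900096
    # trailing '%' is stripped; "-" maps to 0.0
    parse_percent("58.09%")             # 58.09
    # characters outside [a-zA-Z0-9_-] become '_', then edge underscores are stripped
    sanitize_label_value("test.zone")   # 'test_zone'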