Skip to content

Instantly share code, notes, and snippets.

@UBarney
Last active July 30, 2025 14:14
Show Gist options
  • Save UBarney/7416e95841fc9c29b114c052ebf56a16 to your computer and use it in GitHub Desktop.
Save UBarney/7416e95841fc9c29b114c052ebf56a16 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# example usage:
# python3 get_mem_usage.py ./target/release/dfbench nlj
# ....
# Query Time (ms) Peak RSS
# --------------------------------
# 1 230 83.3 MB
# 2 310 362.1 MB
# 3 550 1.4 GB
# 4 960 2.8 GB
# 5 580 127.4 MB
# 6 4440 91.9 MB
# 7 610 128.5 MB
# 8 4490 90.1 MB
# 9 710 97.7 MB
# 10 2050 12.4 GB
# python3 get_mem_usage.py ./target/release/dfbench tpch --path /home/lv/code/datafusion/benchmarks/data/tpch_sf10 --prefer_hash_join true --format parquet -o /dev/null
# ....
# Query Time (ms) Peak RSS
# --------------------------------
# 1 1250 809.4 MB
# 2 280 713.6 MB
# 3 590 1.0 GB
# 4 510 2.3 GB
# 5 940 1.3 GB
# 6 200 699.7 MB
# 7 1550 3.7 GB
# 8 960 1.6 GB
# 9 1710 2.6 GB
# 10 740 1.4 GB
# 11 250 537.8 MB
# 12 300 649.1 MB
# 13 630 1.6 GB
# 14 280 1.1 GB
# 15 520 1.2 GB
# 16 200 681.3 MB
# 17 1530 1.4 GB
# 18 2930 3.5 GB
# 19 520 1006.5 MB
# 20 630 1.4 GB
# 21 2000 1.9 GB
# 22 160 518.1 MB
import sys
import subprocess
import argparse
import re
import os
def modify_iterations(args):
"""Modify --iterations parameter to 1 if it exists in the arguments"""
modified_args = []
i = 0
modified=False
while i < len(args):
if args[i] == '--iterations':
modified_args.append(args[i])
if i + 1 < len(args):
modified_args.append('1') # Force iterations to 1
i += 2
else:
modified_args.append('1')
i += 1
modified=True
elif args[i].startswith('--iterations='):
modified_args.append('--iterations=1')
i += 1
modified=True
else:
modified_args.append(args[i])
i += 1
if not modified:
modified_args.append('--iterations=1')
return modified_args
def format_memory(kb):
"""Format memory size in human readable format"""
if kb < 1024:
return f"{kb} KB"
elif kb < 1024 * 1024:
return f"{kb / 1024:.1f} MB"
else:
return f"{kb / 1024 / 1024:.1f} GB"
def run_query_with_memory_tracking(cmd_args, query_num):
"""Run a specific query with memory tracking using /usr/bin/time -v"""
# Add query specification to the command
cmd_with_query = cmd_args + ['-q', str(query_num)]
# Prepare the full command with time
time_cmd = ['/usr/bin/time', '-v'] + cmd_with_query
# Print the command that will be executed
print(f"Executing: {' '.join(time_cmd)}")
try:
# Run the command and capture output
result = subprocess.run(
time_cmd,
capture_output=True,
text=True,
check=False
)
# Check if the query was invalid (can happen even with exit code 0)
output_text = (result.stdout + result.stderr).lower()
if 'invalid query' in output_text:
print(f"Error running query {query_num}: invalid query")
return None
# Check if the command failed with non-zero exit code
if result.returncode != 0:
# Print error output
print(f"Error running query {query_num} (exit code {result.returncode}):")
if result.stdout:
print(f"STDOUT: {result.stdout}")
if result.stderr:
# Filter out time command output to show only the actual error
stderr_lines = result.stderr.split('\n')
time_output_started = False
error_lines = []
for line in stderr_lines:
# Time output typically starts with lines containing metrics
if any(keyword in line for keyword in ['Command being timed:', 'User time', 'System time', 'Percent of CPU']):
time_output_started = True
continue
if not time_output_started and line.strip():
error_lines.append(line)
if error_lines:
print(f"STDERR: {chr(10).join(error_lines)}")
return None # Return None for failed queries
# Extract memory information from time output
time_output = result.stderr
# Parse memory usage and elapsed time from time output
max_memory_kb = None
elapsed_time_seconds = None
for line in time_output.split('\n'):
if 'Maximum resident set size' in line:
# Extract memory in KB
match = re.search(r'Maximum resident set size \(kbytes\): (\d+)', line)
if match:
max_memory_kb = int(match.group(1))
elif 'Elapsed (wall clock) time' in line:
# Extract elapsed time in various formats
match = re.search(r'Elapsed \(wall clock\) time \(h:mm:ss or m:ss\): (.+)', line)
if match:
time_str = match.group(1).strip()
# Parse time format (could be h:mm:ss.ss or m:ss.ss or s.ss)
if ':' in time_str:
time_parts = time_str.split(':')
if len(time_parts) == 3: # h:mm:ss format
hours, minutes, seconds = time_parts
elapsed_time_seconds = int(hours) * 3600 + int(minutes) * 60 + float(seconds)
elif len(time_parts) == 2: # m:ss format
minutes, seconds = time_parts
elapsed_time_seconds = int(minutes) * 60 + float(seconds)
else:
# Just seconds
elapsed_time_seconds = float(time_str)
return {
'query': query_num,
'time_ms': int(elapsed_time_seconds * 1000) if elapsed_time_seconds else 0,
'peak_rss_kb': max_memory_kb if max_memory_kb else 0
}
except subprocess.SubprocessError as e:
print(f"Error running query {query_num}: {e}")
return None
except FileNotFoundError:
print("Error: /usr/bin/time not found. Please ensure it's installed.")
return None
def main():
parser = argparse.ArgumentParser(description='Run dfbench with memory usage tracking')
parser.add_argument('-q', '--query', type=int, help='Specific query number to run')
parser.add_argument('remaining', nargs=argparse.REMAINDER, help='Arguments to pass to dfbench')
args = parser.parse_args()
if not args.remaining:
print("Error: No dfbench command provided")
sys.exit(1)
# Modify iterations parameter
modified_args = modify_iterations(args.remaining)
results = []
if args.query is not None:
# Run specific query (convert from 1-based to 0-based indexing)
actual_query_num = args.query - 1
if actual_query_num < 0:
print("Error: Query number must be >= 1")
sys.exit(1)
result = run_query_with_memory_tracking(modified_args, actual_query_num)
if result is None:
sys.exit(1) # Exit with error code
# Convert back to 1-based for display
result['query'] = args.query
results.append(result)
else:
# Run queries starting from 1 until we get an error or invalid query
query_num = 1
while True:
result = run_query_with_memory_tracking(modified_args, query_num)
if result is None:
break
results.append(result)
query_num += 1
if len(results) == 0:
print("No valid queries found")
sys.exit(1)
# Print results in table format
print(f"{'Query':<8} {'Time (ms)':<12} {'Peak RSS':<10}")
print("-" * 32)
for result in results:
memory_str = format_memory(result['peak_rss_kb'])
print(f"{result['query']:<8} {result['time_ms']:<12} {memory_str}")
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment