Last active
July 30, 2025 14:14
-
-
Save UBarney/7416e95841fc9c29b114c052ebf56a16 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# Example usage:
# python3 get_mem_usage.py ./target/release/dfbench nlj
# ....
# Query    Time (ms)    Peak RSS
# --------------------------------
# 1        230          83.3 MB
# 2        310          362.1 MB
# 3        550          1.4 GB
# 4        960          2.8 GB
# 5        580          127.4 MB
# 6        4440         91.9 MB
# 7        610          128.5 MB
# 8        4490         90.1 MB
# 9        710          97.7 MB
# 10       2050         12.4 GB
# python3 get_mem_usage.py ./target/release/dfbench tpch --path /home/lv/code/datafusion/benchmarks/data/tpch_sf10 --prefer_hash_join true --format parquet -o /dev/null
# ....
# Query    Time (ms)    Peak RSS
# --------------------------------
# 1        1250         809.4 MB
# 2        280          713.6 MB
# 3        590          1.0 GB
# 4        510          2.3 GB
# 5        940          1.3 GB
# 6        200          699.7 MB
# 7        1550         3.7 GB
# 8        960          1.6 GB
# 9        1710         2.6 GB
# 10       740          1.4 GB
# 11       250          537.8 MB
# 12       300          649.1 MB
# 13       630          1.6 GB
# 14       280          1.1 GB
# 15       520          1.2 GB
# 16       200          681.3 MB
# 17       1530         1.4 GB
# 18       2930         3.5 GB
# 19       520          1006.5 MB
# 20       630          1.4 GB
# 21       2000         1.9 GB
# 22       160          518.1 MB
import argparse
import os
import re
import shlex
import subprocess
import sys
def modify_iterations(args):
    """Return a copy of *args* with the benchmark's iteration count pinned to 1.

    Every ``--iterations N`` pair has its value replaced by ``1``; a trailing
    ``--iterations`` with no value gets ``1`` supplied after it; every
    ``--iterations=N`` token becomes ``--iterations=1``.  If no iterations
    flag appears at all, ``--iterations=1`` is appended.  *args* itself is
    left unmodified.
    """
    rewritten = []
    saw_flag = False
    tokens = iter(args)
    for token in tokens:
        if token == '--iterations':
            # Discard the caller's value (if there is one) and force 1.
            next(tokens, None)
            rewritten += [token, '1']
            saw_flag = True
        elif token.startswith('--iterations='):
            rewritten.append('--iterations=1')
            saw_flag = True
        else:
            rewritten.append(token)
    if not saw_flag:
        rewritten.append('--iterations=1')
    return rewritten
def format_memory(kb):
    """Render a size given in kilobytes as a short human-readable string.

    Values below 1024 are shown unchanged with a ``KB`` suffix; larger values
    are divided by 1024 (``MB``) or 1024*1024 (``GB``) and shown with one
    decimal place.
    """
    one_mb = 1024
    one_gb = one_mb * 1024
    if kb < one_mb:
        return f"{kb} KB"
    if kb < one_gb:
        return f"{kb / one_mb:.1f} MB"
    return f"{kb / one_mb / one_mb:.1f} GB"
# Patterns for the two `/usr/bin/time -v` report lines this script reads;
# compiled once instead of on every line of every run.
_MAX_RSS_RE = re.compile(r'Maximum resident set size \(kbytes\): (\d+)')
_ELAPSED_RE = re.compile(r'Elapsed \(wall clock\) time \(h:mm:ss or m:ss\): (.+)')

# Substrings that mark the start of the `/usr/bin/time -v` report on stderr.
_TIME_REPORT_MARKERS = ('Command being timed:', 'User time', 'System time', 'Percent of CPU')


def _extract_error_lines(stderr):
    """Return the non-blank stderr lines that precede the ``time -v`` report."""
    kept = []
    for line in stderr.split('\n'):
        if any(marker in line for marker in _TIME_REPORT_MARKERS):
            # Everything from here on is the timing report, not the error.
            break
        if line.strip():
            kept.append(line)
    return kept


def _parse_elapsed_seconds(time_str):
    """Convert an elapsed-time field (``h:mm:ss``, ``m:ss`` or plain seconds) to seconds.

    Returns None for a colon-separated value with an unexpected number of fields.
    """
    if ':' not in time_str:
        return float(time_str)
    parts = time_str.split(':')
    if len(parts) == 3:
        hours, minutes, seconds = parts
        return int(hours) * 3600 + int(minutes) * 60 + float(seconds)
    if len(parts) == 2:
        minutes, seconds = parts
        return int(minutes) * 60 + float(seconds)
    return None


def _parse_time_metrics(time_output):
    """Extract ``(max_rss_kb, elapsed_seconds)`` from ``/usr/bin/time -v`` output.

    Either element is None when its line is missing or does not match.
    """
    max_memory_kb = None
    elapsed_seconds = None
    for line in time_output.split('\n'):
        if 'Maximum resident set size' in line:
            match = _MAX_RSS_RE.search(line)
            if match:
                max_memory_kb = int(match.group(1))
        elif 'Elapsed (wall clock) time' in line:
            match = _ELAPSED_RE.search(line)
            if match:
                elapsed_seconds = _parse_elapsed_seconds(match.group(1).strip())
    return max_memory_kb, elapsed_seconds


def run_query_with_memory_tracking(cmd_args, query_num):
    """Run one query under ``/usr/bin/time -v`` and report its wall time and peak RSS.

    Args:
        cmd_args: benchmark command line (binary followed by its arguments);
            ``-q <query_num>`` is appended to it.
        query_num: query number passed verbatim to the benchmark's ``-q`` flag.

    Returns:
        ``{'query': query_num, 'time_ms': int, 'peak_rss_kb': int}`` on
        success (a metric that could not be parsed is reported as 0), or None
        when the output mentions an invalid query, the command exits non-zero,
        or ``/usr/bin/time`` cannot be launched.
    """
    time_cmd = ['/usr/bin/time', '-v'] + cmd_args + ['-q', str(query_num)]
    # Quote each argument so the echoed command can be pasted back into a shell.
    print(f"Executing: {' '.join(shlex.quote(arg) for arg in time_cmd)}")

    try:
        result = subprocess.run(
            time_cmd,
            capture_output=True,
            text=True,
            # Query output is not guaranteed to decode cleanly in the current
            # locale; replace bad bytes instead of raising UnicodeDecodeError
            # and aborting the whole sweep.
            errors='replace',
            check=False,
        )
    except FileNotFoundError:
        print("Error: /usr/bin/time not found. Please ensure it's installed.")
        return None
    except subprocess.SubprocessError as e:
        print(f"Error running query {query_num}: {e}")
        return None

    # An invalid query can be reported even when the exit code is 0.
    output_text = (result.stdout + result.stderr).lower()
    if 'invalid query' in output_text:
        print(f"Error running query {query_num}: invalid query")
        return None

    if result.returncode != 0:
        print(f"Error running query {query_num} (exit code {result.returncode}):")
        if result.stdout:
            print(f"STDOUT: {result.stdout}")
        if result.stderr:
            error_lines = _extract_error_lines(result.stderr)
            if error_lines:
                joined = '\n'.join(error_lines)
                print(f"STDERR: {joined}")
        return None

    # `/usr/bin/time` writes its report to stderr.
    max_memory_kb, elapsed_seconds = _parse_time_metrics(result.stderr)
    return {
        'query': query_num,
        # round() rather than int(): float error such as 1149.999... must not
        # lose a millisecond to truncation.
        'time_ms': round(elapsed_seconds * 1000) if elapsed_seconds else 0,
        'peak_rss_kb': max_memory_kb or 0,
    }
def main():
    """Command-line entry point: run benchmark queries under memory tracking and print a table.

    Everything after the optional ``-q/--query`` flag is the benchmark command
    line; its iteration count is forced to 1 via ``modify_iterations``.  With
    ``-q N`` only query N is run (exit status 1 if it fails); otherwise queries
    1, 2, 3, ... are run until the first failure or invalid query.  Exits with
    status 1 when no command is given or no query succeeds.
    """
    parser = argparse.ArgumentParser(description='Run dfbench with memory usage tracking')
    parser.add_argument('-q', '--query', type=int, help='Specific query number to run')
    parser.add_argument('remaining', nargs=argparse.REMAINDER, help='Arguments to pass to dfbench')
    args = parser.parse_args()

    if not args.remaining:
        print("Error: No dfbench command provided")
        sys.exit(1)

    # Pin iterations to 1 so each timed process executes the query once.
    modified_args = modify_iterations(args.remaining)

    results = []
    if args.query is not None:
        if args.query < 1:
            print("Error: Query number must be >= 1")
            sys.exit(1)
        # Query numbers are 1-based and passed through unchanged -- the same
        # numbering the sweep below uses.  (Previously this subtracted 1, so
        # `-q N` ran query N-1 while labelling it N, and `-q 1` sent `-q 0`.)
        result = run_query_with_memory_tracking(modified_args, args.query)
        if result is None:
            sys.exit(1)
        results.append(result)
    else:
        # Sweep queries from 1 upward until one fails or is reported invalid.
        query_num = 1
        while True:
            result = run_query_with_memory_tracking(modified_args, query_num)
            if result is None:
                break
            results.append(result)
            query_num += 1

    if not results:
        print("No valid queries found")
        sys.exit(1)

    print(f"{'Query':<8} {'Time (ms)':<12} {'Peak RSS':<10}")
    print("-" * 32)
    for result in results:
        memory_str = format_memory(result['peak_rss_kb'])
        print(f"{result['query']:<8} {result['time_ms']:<12} {memory_str}")
if __name__ == "__main__":
    # Run the CLI only when executed as a script, not when imported.
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment