Skip to content

Instantly share code, notes, and snippets.

@vitorcalvi
Last active April 7, 2025 07:27
Show Gist options
  • Save vitorcalvi/7bd9c37411c0a5ae5464cc2bd2cfd289 to your computer and use it in GitHub Desktop.
Save vitorcalvi/7bd9c37411c0a5ae5464cc2bd2cfd289 to your computer and use it in GitHub Desktop.
EVO-X1 34GB Machine Learning Device Setup

#!/usr/bin/env bash
# EVO-X1 machine-learning setup: ROCm, Docker, system tuning and ROCm containers.
# Reconstructed from a whitespace-collapsed paste: one command per line, with
# explicit line continuations for multi-line commands.

# --- Build prerequisites ---
sudo apt update && sudo apt install -y \
  build-essential \
  dkms

# --- AMD GPU installer package ---
wget https://repo.radeon.com/amdgpu-install/6.3.3/ubuntu/noble/amdgpu-install_6.3.60303-1_all.deb
sudo apt install -y ./amdgpu-install_6.3.60303-1_all.deb

# Install with gfx1150 support
sudo amdgpu-install -y --usecase=rocm,mlsdk \
  --no-dkms \
  --opencl=rocr \
  --allow-unauthenticated

# --- Docker and docker-compose ---
sudo apt install -y docker.io
sudo systemctl enable --now docker
sudo systemctl restart docker
# NOTE(review): the pasted URL pointed at a mirror host (github.com);
# restored to github.com so the binary installed as root comes from upstream.
sudo curl -L "https://github.com/docker/compose/releases/latest/download/docker-compose-$(uname -s)-$(uname -m)" \
  -o /usr/local/bin/docker-compose

sudo chmod +x /usr/local/bin/docker-compose
docker-compose --version

echo "📦 Installing dependencies..."
sudo apt update && sudo apt install -y \
  cpufrequtils \
  rocm-smi \
  lm-sensors \
  cpuset \
  amdgpu-dkms

echo "🚀 Starting Ryzen AI Ultimate Optimization (v1.9)"
# Inner quotes are escaped so the hint is printed instead of being executed.
echo "⚠️ Monitor with: watch -n1 \"sensors amdgpu-* && rocm-smi -c && sudo cpupower monitor\""
sleep 2

# --- CPU Optimization ---

echo "⚙️ Configuring CPU..."
echo "active" | sudo tee /sys/devices/system/cpu/amd_pstate/status >/dev/null
sudo cpupower frequency-set --governor performance

echo " Isolating performance cores..."
sudo cset shield --cpu=0-7,16-23 --kthread=on # Updated for Zen5 hybrid

# --- Memory Tuning ---

echo "🧠 Memory Optimization:"
sudo sysctl -w vm.swappiness=1
sudo sysctl -w vm.dirty_ratio=60
sudo sysctl -w vm.dirty_background_ratio=3
sudo sysctl -w vm.nr_hugepages=8192 # Aligned with system setup

# --- GPU Configuration ---

echo "🎮 GPU Optimization:"
GPU_SYSFS="/sys/class/drm/card0/device"
echo " Using GPU path: $GPU_SYSFS"

echo " Setting GPU control to manual..."
echo "manual" | sudo tee "$GPU_SYSFS/power_dpm_force_performance_level"

if [ -f "$GPU_SYSFS/pp_od_clk_voltage" ]; then
  echo " Applying GPU OC Profile..."
  sudo bash -c "echo 'r' > $GPU_SYSFS/pp_od_clk_voltage" # Reset
  sudo bash -c "echo 's 7 3000' > $GPU_SYSFS/pp_od_clk_voltage"
  sudo bash -c "echo 'm 3 2250' > $GPU_SYSFS/pp_od_clk_voltage"
  sudo bash -c "echo 'c' > $GPU_SYSFS/pp_od_clk_voltage"
fi

# -mindepth 1 keeps find from returning the "hwmon/" start directory itself,
# which also matches the "hwmon*" name pattern.
HWMON_PATH=$(find "$GPU_SYSFS/hwmon/" -mindepth 1 -maxdepth 1 -name "hwmon*" -type d | head -1)
if [ -n "$HWMON_PATH" ]; then
  echo " Adjusting power limits to 135W @ $HWMON_PATH..."
  sudo bash -c "echo 135000 > $HWMON_PATH/power1_cap"
else
  echo " No hwmon directory found under $GPU_SYSFS; skipping power limit."
fi

# --- System Services ---

echo "🔋 Power Management:"
sudo systemctl stop power-profiles-daemon.service 2>/dev/null
sudo systemctl mask power-profiles-daemon.service 2>/dev/null

# --- Kernel Tuning ---

echo "🐧 Kernel Optimization:"
sudo sysctl -w kernel.watchdog=0
sudo sysctl -w kernel.numa_balancing=0

echo "✅ Optimization Complete! Reboot recommended."

# --- ROCm containers ---

sudo docker run -itd --name pytorch-rocm \
  --device=/dev/kfd \
  --device=/dev/dri \
  --group-add video \
  --ipc=host \
  --shm-size 32G \
  -p 8888:8888 \
  -v "$(pwd)":/workspace \
  -e HSA_OVERRIDE_GFX_VERSION=11.0.0 \
  -e HSA_ENABLE_SDMA=1 \
  rocm/pytorch:rocm6.3.3_ubuntu24.04_py3.12_pytorch_release_2.4.0

sudo docker run -itd --name tensorflow-rocm \
  --device=/dev/kfd \
  --device=/dev/dri \
  --group-add video \
  --ipc=host \
  --shm-size 32G \
  -e HSA_OVERRIDE_GFX_VERSION=11.0.0 \
  -e TF_ROCM_FUSION_ENABLE=1 \
  rocm/tensorflow:rocm6.3.3-py3.12-tf2.17-dev

@vitorcalvi
Copy link
Author

#!/usr/bin/env python3
import json
import subprocess
import sys
import time
from itertools import product
from pathlib import Path
from typing import Dict, List
from tqdm import tqdm
from colorama import Fore, Style, init

# --- Configuration ---
# Settings consumed by BenchmarkRunner below.
CONFIG = {
    # Executable launched for every benchmark run; must exist as a file.
    "llama_bench_path": "./bin/llama-bench",
    # Model file passed to llama-bench with -m; must exist as a file.
    "model_path": "/home/vi/models/DeepSeek-R1-Distill-Qwen-14B-IQ4_NL.gguf",
    # Results are loaded from and rewritten to this JSON file.
    "output_json": "/home/vi/llama_bench_results_deepseek.json",
    # Extra attempts made after llama-bench exits with a non-zero status.
    "max_retries": 1,
    # Passed as subprocess.run(timeout=...) for a single llama-bench call.
    "timeout": 300,
    # The next seven values are forwarded unchanged as llama-bench options:
    # --numa, -C, -mg, -ub, -mmp, -r and --poll respectively.
    "numa_strategy": "isolate",
    "cpu_mask": "0x55555555",
    "main_gpu": 0,
    "ubatch_size": 512,
    "mmap": 1,
    "repetitions": 5,
    "poll": 50,
    # How many of the best stepwise values per parameter enter the
    # combination phase.
    "combination_top_n": 2,
    "deep_search_params": {
        # Step size of the deep search as a fraction of the best value.
        "step_percentage": 0.25,
        # Number of steps tried below and above the best value.
        "steps_around": 2,
        # The sorted deep-search value list is cut to this many entries.
        "min_values": 3
    },
    "temperature": {
        # hwmon "name" (and `sensors` chip argument) used for the CPU reading.
        "cpu_sensor": "k10temp",
        # NOTE(review): not read by BenchmarkRunner in this script; the GPU
        # hwmon path is hard-coded in get_gpu_temperature.
        "gpu_hwmon_pattern": "/sys/class/drm/card0/device/hwmon/hwmon*/temp1_input",
        # Samples taken per monitoring pass, and the sleep between them
        # (passed to time.sleep).
        "max_temp_samples": 5,
        "sample_interval": 0.2
    },
    # Candidate values per tunable parameter; the first value of each list is
    # the starting point when no saved results exist.
    "parameters": {
        "threads": [4, 8, 12, 16, 24],
        "batch_size": [512, 1024, 2048, 4096],
        "gpu_layers": [99, 80, 64, 32],
        "flash_attn": [0, 1],
        "split_mode": ["layer", "row"],
        "cache_type": ["f16", "q4_0"],
        "tensor_split": ["0", "0/0"]
    },
    # Order in which parameters are tuned one at a time, and the key order
    # used when building combination candidates.
    "optimization_order": [
        "threads",
        "batch_size",
        "gpu_layers",
        "flash_attn",
        "split_mode",
        "cache_type",
        "tensor_split"
    ]
}

# Initialise colorama with autoreset enabled.
init(autoreset=True)

class BenchmarkRunner:
    """Tune llama-bench parameters for maximum token-generation speed.

    The runner launches ``llama-bench`` with different parameter combinations,
    records throughput plus CPU/GPU temperatures for every run, persists all
    runs to ``config['output_json']`` and keeps the fastest configuration in
    ``best_params``.

    Attributes:
        config: The configuration dictionary passed to the constructor.
        results: Every recorded run (parameters merged with measurements).
        best_params: Fastest configuration seen so far, including its metrics.
        param_results: Per parameter, ``(value, tg_speed)`` pairs collected
            during the stepwise phase.
    """

    def __init__(self, config: Dict):
        """Validate the configured paths and load previously saved results.

        Args:
            config: Settings dictionary (paths, fixed llama-bench options,
                parameter lists and search settings).
        """
        self.config = config
        self.results = []
        self.best_params = {"tg_speed": 0.0}
        self.param_results = {param: [] for param in config['parameters']}
        self.validate_paths()
        self.load_existing_results()

    def validate_paths(self):
        """Exit if the benchmark binary or model is missing; create the output directory."""
        if not Path(self.config['llama_bench_path']).is_file():
            self.error_exit(f"llama-bench not found at {self.config['llama_bench_path']}")
        if not Path(self.config['model_path']).is_file():
            self.error_exit(f"Model file not found at {self.config['model_path']}")
        Path(self.config['output_json']).parent.mkdir(parents=True, exist_ok=True)

    def error_exit(self, message: str):
        """Print *message* in red and terminate the process with exit status 1."""
        print(f"{Fore.RED}ERROR: {message}{Style.RESET_ALL}")
        sys.exit(1)

    def load_existing_results(self):
        """Load saved runs and seed ``best_params``.

        ``best_params`` always starts from the first configured value of every
        parameter so it carries a complete parameter set even when the results
        file is missing or holds no usable entry. Entries that recorded an
        error or lack a numeric ``tg_speed`` are never chosen as the best.
        Exits the process when the file exists but cannot be read as a JSON
        list.
        """
        defaults = {
            param: self.config['parameters'][param][0]
            for param in self.config['optimization_order']
        }
        defaults['tg_speed'] = 0.0
        self.best_params = defaults

        output_path = Path(self.config['output_json'])
        if not output_path.exists():
            return
        try:
            with open(output_path, 'r') as f:
                loaded = json.load(f)
            if not isinstance(loaded, list):
                raise ValueError("expected a JSON list of result entries")
        except Exception as e:
            self.error_exit(f"Failed to load results: {str(e)}")
            return

        self.results = loaded
        scored = [
            entry for entry in loaded
            if isinstance(entry, dict)
            and 'error' not in entry
            and isinstance(entry.get('tg_speed'), (int, float))
        ]
        if scored:
            fastest = max(scored, key=lambda entry: entry['tg_speed'])
            # Merge over the defaults so parameters missing from an old entry
            # still have a value.
            self.best_params = {**defaults, **fastest}

    def get_cpu_temperature(self) -> float:
        """Return the CPU temperature in degrees Celsius, or 0.0 if unavailable.

        Reads ``temp1_input`` of the hwmon device whose name equals
        ``config['temperature']['cpu_sensor']``; falls back to the ``Tctl``
        line printed by the ``sensors`` command.
        """
        try:
            for hwmon in Path('/sys/class/hwmon').glob('hwmon*'):
                if (hwmon / 'name').read_text().strip() == self.config['temperature']['cpu_sensor']:
                    # hwmon reports millidegrees.
                    return int((hwmon / 'temp1_input').read_text().strip()) / 1000
            result = subprocess.run(['sensors', self.config['temperature']['cpu_sensor']],
                                    capture_output=True, text=True)
            for line in result.stdout.split('\n'):
                if 'Tctl' in line:
                    return float(line.split('+')[1].split('°')[0])
            return 0.0
        except Exception as e:
            print(f"{Fore.YELLOW}CPU temp error: {e}{Style.RESET_ALL}")
            return 0.0

    def get_gpu_temperature(self):
        """Return the GPU temperature in degrees Celsius, or 0.0 if unavailable.

        Reads the first ``temp1_input`` under card0's hwmon directory; falls
        back to the ``edge:`` line of ``sensors amdgpu-pci-6500``.
        """
        try:
            # Direct hwmon read
            for hwmon_path in Path('/sys/class/drm/card0/device/hwmon').glob('hwmon*'):
                temp_path = hwmon_path / 'temp1_input'
                if temp_path.exists():
                    return int(temp_path.read_text().strip()) / 1000.0
            # Fallback to the sensors command
            result = subprocess.run(['sensors', 'amdgpu-pci-6500'], capture_output=True, text=True)
            for line in result.stdout.split('\n'):
                if 'edge:' in line.lower():
                    return float(line.split()[-1].strip('+°C'))
        except Exception as e:
            print(f"{Fore.YELLOW}GPU temp error: {e}{Style.RESET_ALL}")
        return 0.0

    def monitor_temperatures(self) -> Dict[str, List[float]]:
        """Sample CPU (and, when readable, GPU) temperatures.

        Returns:
            ``{'cpu': [...], 'gpu': [...]}``; the ``'gpu'`` key is absent when
            the warm-up GPU read returned 0.0. Lists may be shorter than
            ``max_temp_samples`` (even empty) if sampling failed part-way.
        """
        temps = {'cpu': [], 'gpu': []}
        try:
            # Warmup read
            self.get_cpu_temperature()
            gpu_temp = self.get_gpu_temperature()
            if gpu_temp == 0.0:
                print(f"{Fore.YELLOW}Skipping GPU monitoring (sensor unavailable){Style.RESET_ALL}")
                temps.pop('gpu')

            # Actual monitoring
            for _ in range(self.config['temperature']['max_temp_samples']):
                temps['cpu'].append(self.get_cpu_temperature())
                if 'gpu' in temps:
                    temps['gpu'].append(self.get_gpu_temperature())
                time.sleep(self.config['temperature']['sample_interval'])
        except Exception as e:
            print(f"{Fore.YELLOW}Temp monitoring failed: {e}{Style.RESET_ALL}")

        return temps

    @staticmethod
    def _peak(*series) -> float:
        """Return the largest value across all *series*, or 0.0 when all are empty."""
        merged = [value for values in series for value in values]
        return max(merged) if merged else 0.0

    def run_benchmark(self, params: Dict) -> Dict:
        """Run one benchmark and attach temperature readings to its result.

        Args:
            params: Value for every tunable parameter of this run.

        Returns:
            The dictionary from ``_run_bench_command`` extended with
            ``cpu_temp_pre/post/max`` and ``gpu_temp_pre/post/max``. Missing
            or empty temperature series are reported as 0.0 instead of
            raising.
        """
        pre_temps = self.monitor_temperatures()
        result = self._run_bench_command(params)
        post_temps = self.monitor_temperatures()

        pre_cpu = pre_temps.get('cpu', [])
        post_cpu = post_temps.get('cpu', [])
        pre_gpu = pre_temps.get('gpu', [])
        post_gpu = post_temps.get('gpu', [])

        result.update({
            'cpu_temp_pre': self._peak(pre_cpu),
            'gpu_temp_pre': self._peak(pre_gpu),
            'cpu_temp_post': self._peak(post_cpu),
            'gpu_temp_post': self._peak(post_gpu),
            'cpu_temp_max': self._peak(pre_cpu, post_cpu),
            'gpu_temp_max': self._peak(pre_gpu, post_gpu),
        })
        return result

    def _run_bench_command(self, params: Dict) -> Dict:
        """Invoke llama-bench for *params* and return the parsed result.

        A non-zero exit status is retried up to ``config['max_retries']``
        times with exponential back-off; a timeout or a failure to launch the
        binary is reported immediately.

        Returns:
            The parsed metrics, or ``{'error': ...}`` on failure.
        """
        command = [
            self.config['llama_bench_path'],
            '-m', self.config['model_path'],
            '-t', str(params['threads']),
            '-b', str(params['batch_size']),
            '-ub', str(self.config['ubatch_size']),
            '-ngl', str(params['gpu_layers']),
            '-sm', params['split_mode'],
            '-fa', str(params['flash_attn']),
            '-mmp', str(self.config['mmap']),
            '-ctk', params['cache_type'],
            '-ctv', params['cache_type'],
            '-r', str(self.config['repetitions']),
            '--poll', str(self.config['poll']),
            '-ts', params['tensor_split'],
            '--numa', self.config['numa_strategy'],
            '-C', self.config['cpu_mask'],
            '-mg', str(self.config['main_gpu']),
            '-o', 'json'
        ]

        for attempt in range(self.config['max_retries'] + 1):
            try:
                result = subprocess.run(
                    command,
                    check=True,
                    timeout=self.config['timeout'],
                    capture_output=True,
                    text=True
                )
                return self.parse_json_output(result.stdout)
            except subprocess.CalledProcessError as e:
                if attempt == self.config['max_retries']:
                    return {"error": str(e)}
                time.sleep(2 ** attempt)
            except subprocess.TimeoutExpired:
                print(f"{Fore.YELLOW}Timeout for: {params}{Style.RESET_ALL}")
                return {"error": "Timeout"}
            except OSError as e:
                # The binary could not be started at all; retrying cannot help.
                return {"error": str(e)}
        return {"error": "Max retries exceeded"}

    @staticmethod
    def _generation_rate(records: List[Dict]) -> float:
        """Find a token-generation rate when the first record has no ``tg_speed``.

        Prefers an explicit ``tg_speed`` in any record. Otherwise uses
        ``avg_ts`` of the first record with ``n_gen > 0``.
        NOTE(review): the ``n_gen``/``avg_ts`` field names are taken from
        llama-bench's documented JSON output - confirm against the installed
        build.
        """
        for record in records:
            if 'tg_speed' in record:
                return record['tg_speed']
        for record in records:
            if record.get('n_gen', 0) > 0 and 'avg_ts' in record:
                return record['avg_ts']
        return 0.0

    def parse_json_output(self, output: str) -> Dict:
        """Parse llama-bench JSON output into a metrics dictionary.

        Accepts either a single JSON object or a list of objects.

        Args:
            output: Raw stdout of llama-bench.

        Returns:
            ``{'tg_speed', 'memory_usage', 'latency', 'raw_output'}`` on
            success, or ``{'error': ...}`` when the text cannot be parsed.
        """
        try:
            data = json.loads(output)

            if isinstance(data, dict):
                records = [data]
            elif isinstance(data, list):
                records = [item for item in data if isinstance(item, dict)]
            else:
                records = []
            primary = records[0] if records else {}

            tg_speed = primary.get('tg_speed')
            if tg_speed is None:
                tg_speed = self._generation_rate(records)

            return {
                'tg_speed': float(tg_speed),
                'memory_usage': primary.get('memory_usage', 0),
                'latency': primary.get('latency', 0.0),
                'raw_output': data  # Preserve original data for debugging
            }

        except json.JSONDecodeError as json_err:
            print(f"{Fore.YELLOW}JSON decode error: {json_err}{Style.RESET_ALL}")
            return {'error': f'JSON decode error: {str(json_err)}'}

        except AttributeError as attr_err:
            print(f"{Fore.YELLOW}Attribute error: {attr_err}{Style.RESET_ALL}")
            return {'error': f'Attribute error: {str(attr_err)}'}

        except Exception as e:
            print(f"{Fore.YELLOW}Unexpected parsing error: {e}{Style.RESET_ALL}")
            return {'error': f'Unexpected error: {str(e)}'}

    def save_results(self):
        """Write all recorded runs to ``config['output_json']``.

        The data is written to a sibling ``.tmp`` file first and then moved
        into place, so an interrupted run does not leave a truncated results
        file behind. Exits the process if saving fails.
        """
        target = Path(self.config['output_json'])
        scratch = target.with_name(target.name + '.tmp')
        try:
            with open(scratch, 'w') as f:
                json.dump(self.results, f, indent=2)
            scratch.replace(target)
        except Exception as e:
            self.error_exit(f"Save failed: {str(e)}")

    def print_summary(self):
        """Print the best configuration and its metrics."""
        print(f"\n{Fore.GREEN}=== Benchmark Complete ===")
        print("Best Configuration:")
        print("-------------------")
        for key, value in self.best_params.items():
            if key == 'tg_speed':
                print(f"{Fore.CYAN}Token Speed: {value:.2f} t/s")
            elif key.startswith('cpu_temp'):
                print(f"CPU Temp ({key.split('_')[-1]}): {Fore.BLUE}{value:.1f}°C")
            elif key.startswith('gpu_temp'):
                print(f"GPU Temp ({key.split('_')[-1]}): {Fore.BLUE}{value:.1f}°C")
            elif key in ['memory_usage', 'latency']:
                print(f"{key.title().replace('_', ' ')}: {Fore.BLUE}{value}")
            elif key not in ['raw_output', 'error']:
                print(f"{key.title().replace('_', ' '):<20}: {Fore.BLUE}{value}")

    def _find_existing(self, candidate: Dict):
        """Return the first recorded run whose parameters equal *candidate*, else None."""
        return next(
            (entry for entry in self.results
             if all(entry.get(key) == value for key, value in candidate.items())),
            None
        )

    def _evaluate(self, candidate: Dict) -> Dict:
        """Return the result for *candidate*, benchmarking only when it is not cached.

        New runs are appended to ``results`` and saved immediately. Cached
        runs, including failed ones, are returned as stored.
        """
        cached = self._find_existing(candidate)
        if cached is not None:
            return cached
        result = self.run_benchmark(candidate)
        self.results.append({**candidate, **result})
        self.save_results()
        return result

    def _consider(self, candidate: Dict, result: Dict, label: str) -> bool:
        """Promote *candidate* to ``best_params`` if it ran cleanly and is faster.

        Returns True when ``best_params`` was replaced.
        """
        if 'error' in result:
            return False
        speed = result.get('tg_speed', 0.0)
        if speed <= self.best_params.get('tg_speed', 0.0):
            return False
        self.best_params = {**candidate, **result}
        print(f"\n{Fore.GREEN}{label}: {speed:.2f} t/s")
        return True

    def _deep_search_values(self, param: str, best_value) -> List:
        """Return the values to try for *param* in the deep-search phase.

        Numeric ``threads``/``batch_size``/``gpu_layers`` get neighbours of
        *best_value* spaced by ``step_percentage`` of it, limited to the
        configured min/max of that parameter. Any other parameter keeps only
        *best_value*.
        """
        if not (isinstance(best_value, (int, float))
                and param in ['threads', 'batch_size', 'gpu_layers']):
            return [best_value]

        settings = self.config['deep_search_params']
        lower = min(self.config['parameters'][param])
        upper = max(self.config['parameters'][param])
        step = max(1, int(best_value * settings['step_percentage']))

        values = {best_value}
        for offset in range(1, settings['steps_around'] + 1):
            below = best_value - offset * step
            above = best_value + offset * step
            if below >= lower:
                values.add(below)
            if above <= upper:
                values.add(above)
        # NOTE(review): this keeps the *smallest* `min_values` entries, as the
        # original did; confirm that truncating from the top is intended.
        return sorted(values)[:settings['min_values']]

    def _search_grid(self, value_lists: List[List], desc: str, label: str):
        """Evaluate every combination of *value_lists* (one list per ordered parameter)."""
        order = self.config['optimization_order']
        combos = list(product(*value_lists))
        with tqdm(total=len(combos), desc=desc) as pbar:
            for combo in combos:
                candidate = dict(zip(order, combo))
                self._consider(candidate, self._evaluate(candidate), label)
                pbar.update(1)

    def run(self):
        """Run the three search phases and print the best configuration.

        Phase 1 varies one parameter at a time around the current best,
        phase 2 tests combinations of the top values per parameter, and
        phase 3 probes numeric neighbours of the best values.
        """
        order = self.config['optimization_order']
        space = self.config['parameters']

        # Phase 1: Stepwise optimization
        total_stepwise = sum(len(space[param]) for param in order)
        print(f"{Fore.GREEN}=== Stepwise Optimization ===")
        with tqdm(total=total_stepwise, desc="Stepwise") as pbar:
            for param in order:
                # Only parameter keys go into a candidate; copying metric keys
                # from best_params would break cache lookups and leak stale
                # speeds into failed runs.
                base = {key: self.best_params[key] for key in order}
                for value in space[param]:
                    candidate = {**base, param: value}
                    result = self._evaluate(candidate)
                    if 'error' not in result:
                        self._consider(candidate, result, f"New best {param}={value}")
                        self.param_results[param].append(
                            (value, result.get('tg_speed', 0.0))
                        )
                    pbar.update(1)

        # Phase 2: Combination testing
        top_n = self.config['combination_top_n']
        shortlist = []
        for param in order:
            ranked = sorted(self.param_results[param], key=lambda pair: -pair[1])
            # Fall back to the current best value when every run of this
            # parameter failed; an empty list would empty the whole product.
            shortlist.append(
                [value for value, _ in ranked[:top_n]] or [self.best_params[param]]
            )
        print(f"\n{Fore.GREEN}=== Combination Testing ===")
        self._search_grid(shortlist, "Combinations", "New best combination")

        # Phase 3: Deep parameter search
        print(f"\n{Fore.GREEN}=== Deep Parameter Search ===")
        deep_values = [
            self._deep_search_values(param, self.best_params[param])
            for param in order
        ]
        self._search_grid(deep_values, "Deep Search", "New deep best")

        self.print_summary()

if __name__ == "__main__":
    # Script entry point: build the runner from CONFIG and start the search.
    BenchmarkRunner(CONFIG).run()
 
 

@vitorcalvi
Copy link
Author

GEMINI

#!/usr/bin/env python3
import json
import subprocess
import sys
import time
from itertools import product
from pathlib import Path
from typing import Dict, List, Any # Added Any
from tqdm import tqdm
from colorama import Fore, Style, init

# --- Configuration ---
# Settings consumed by BenchmarkRunner below.
CONFIG = {
    # Executable launched for every benchmark run; must exist as a file.
    "llama_bench_path": "./bin/llama-bench",
    # Model file passed to llama-bench with -m; must exist as a file.
    "model_path": "/home/vi/models/DeepSeek-R1-Distill-Qwen-14B-IQ4_NL.gguf",
    # Results are loaded from this JSON file; its parent directory is created
    # if missing.
    "output_json": "/home/vi/llama_bench_results_deepseek.json",
    # Number of additional llama-bench attempts (the loop runs max_retries + 1 times).
    "max_retries": 1,
    # Passed as subprocess.run(timeout=...) for a single llama-bench call.
    "timeout": 300,
    # The next seven values are forwarded unchanged as llama-bench options:
    # --numa, -C, -mg, -ub, -mmp, -r and --poll respectively.
    "numa_strategy": "isolate",
    "cpu_mask": "0x55555555",
    "main_gpu": 0,
    "ubatch_size": 512,
    "mmap": 1,
    "repetitions": 5,
    "poll": 50,
    # NOTE(review): combination_top_n and deep_search_params are not read in
    # the visible part of this script - presumably used by the search phases;
    # confirm against run().
    "combination_top_n": 2,
    "deep_search_params": {
        "step_percentage": 0.25,
        "steps_around": 2,
        "min_values": 3
    },
    "temperature": {
        # hwmon "name" (and `sensors` chip argument) used for the CPU reading.
        "cpu_sensor": "k10temp",
        "gpu_hwmon_pattern": "/sys/class/drm/card0/device/hwmon/hwmon*/temp1_input", # Note: This pattern isn't used in the current get_gpu_temperature logic
        # Samples taken per monitoring pass, and the sleep between them
        # (passed to time.sleep).
        "max_temp_samples": 5,
        "sample_interval": 0.2
    },
    # Candidate values per tunable parameter; the first value of each list is
    # the initial best_params value.
    "parameters": {
        "threads": [4, 8, 12, 16, 24],
        "batch_size": [512, 1024, 2048, 4096],
        "gpu_layers": [99, 80, 64, 32],
        "flash_attn": [0, 1],
        "split_mode": ["layer", "row"],
        "cache_type": ["f16", "q4_0"],
        "tensor_split": ["0", "0/0"]
    },
    # Parameters that receive an initial best value, in this order.
    "optimization_order": [
        "threads",
        "batch_size",
        "gpu_layers",
        "flash_attn",
        "split_mode",
        "cache_type",
        "tensor_split"
    ]
}

# Initialise colorama with autoreset enabled.
init(autoreset=True)

class BenchmarkRunner:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.results: List[Dict[str, Any]] = []
        self.best_params: Dict[str, Any] = {"tg_speed": 0.0}
        self.param_results: Dict[str, List[tuple[Any, float]]] = {param: [] for param in config['parameters']}
        self.validate_paths()
        self.load_existing_results()

    def validate_paths(self):
        if not Path(self.config['llama_bench_path']).is_file():
            self.error_exit(f"llama-bench not found at {self.config['llama_bench_path']}")
        if not Path(self.config['model_path']).is_file():
            self.error_exit(f"Model file not found at {self.config['model_path']}")
        Path(self.config['output_json']).parent.mkdir(parents=True, exist_ok=True)

    def error_exit(self, message: str):
        """Print *message* as a red error line and exit with status 1."""
        banner = f"{Fore.RED}ERROR: {message}{Style.RESET_ALL}"
        print(banner)
        raise SystemExit(1)

    def load_existing_results(self):
        output_path = Path(self.config['output_json'])
        if output_path.exists() and output_path.stat().st_size > 0: # Check if file exists and is not empty
            try:
                with open(output_path, 'r') as f:
                    self.results = json.load(f)
                    if isinstance(self.results, list) and self.results: # Ensure it's a non-empty list
                        # Filter out entries that might lack 'tg_speed' before finding max
                        valid_results = [r for r in self.results if isinstance(r, dict) and 'tg_speed' in r and isinstance(r['tg_speed'], (int, float))]
                        if valid_results:
                            self.best_params = max(valid_results, key=lambda x: x.get('tg_speed', 0.0))
                        else:
                            print(f"{Fore.YELLOW}Warning: Existing results file contains no valid entries with 'tg_speed'. Starting fresh search.{Style.RESET_ALL}")
                            self._initialize_best_params()
                    else:
                        # Handle cases where the file might contain non-list JSON or was empty after loading
                        print(f"{Fore.YELLOW}Warning: Existing results file is empty or not a list. Starting fresh search.{Style.RESET_ALL}")
                        self.results = []
                        self._initialize_best_params()

            except json.JSONDecodeError as e:
                print(f"{Fore.YELLOW}Warning: Failed to decode JSON from {self.config['output_json']}. File might be corrupted. Starting fresh search. Error: {e}{Style.RESET_ALL}")
                self.results = []
                self._initialize_best_params()
            except Exception as e:
                 # Catch other potential file reading errors
                print(f"{Fore.YELLOW}Warning: Failed to load or process results from {self.config['output_json']}. Starting fresh search. Error: {e}{Style.RESET_ALL}")
                self.results = []
                self._initialize_best_params()
        else:
            # File doesn't exist or is empty
            self._initialize_best_params()

    def _initialize_best_params(self):
         """Sets initial best_params based on the first value of each parameter."""
         self.best_params = {
             param: self.config['parameters'][param][0]
             for param in self.config['optimization_order']
         }
         self.best_params['tg_speed'] = 0.0 # Explicitly set initial speed


    def get_cpu_temperature(self) -> float:
        """Return the CPU temperature in degrees Celsius, or 0.0 when it cannot be read.

        First looks for a hwmon device whose ``name`` equals the configured
        CPU sensor and reads its ``temp1_input``; otherwise parses the output
        of the ``sensors`` command.
        """
        try:
            sensor = self.config['temperature']['cpu_sensor']

            # sysfs first: hwmon exposes the reading in millidegrees.
            for node in Path('/sys/class/hwmon').glob('hwmon*'):
                try:
                    label_file = node / 'name'
                    if not label_file.exists():
                        continue
                    if label_file.read_text().strip() != sensor:
                        continue
                    reading_file = node / 'temp1_input'
                    if reading_file.exists():
                        return int(reading_file.read_text().strip()) / 1000.0
                except (IOError, ValueError):
                    # An unreadable hwmon entry is skipped, not fatal.
                    continue

            # Fallback: parse the text output of `sensors <chip>`.
            probe = subprocess.run(['sensors', sensor], capture_output=True, text=True, check=False)
            if probe.returncode == 0:
                markers = ['Tctl:', 'Tdie:', 'Core 0:', 'Package id 0:']
                for line in probe.stdout.split('\n'):
                    if not any(marker in line for marker in markers):
                        continue
                    for token in line.split():
                        if token.startswith('+') and token.endswith('°C'):
                            return float(token.strip('+°C'))
            print(f"{Fore.YELLOW}Could not determine CPU temperature using sysfs or 'sensors' for {sensor}.{Style.RESET_ALL}")
            return 0.0

        except FileNotFoundError:
            print(f"{Fore.YELLOW}'sensors' command not found. Cannot get CPU temperature.{Style.RESET_ALL}")
            return 0.0
        except Exception as e:
            print(f"{Fore.YELLOW}CPU temp error: {e}{Style.RESET_ALL}")
            return 0.0

    def get_gpu_temperature(self) -> float:
        """Return the GPU temperature in degrees Celsius, or 0.0 when it cannot be read.

        Sources are tried in order: card0's own hwmon node, any hwmon device
        registered by a GPU driver, ``nvidia-smi``, and finally the ``edge``
        line of ``sensors``. The global hwmon scan only accepts devices whose
        ``name`` is a GPU driver, so a CPU or NVMe sensor is never reported
        as the GPU temperature.
        """
        # hwmon "name" values accepted as GPU sensors in the global scan.
        # NOTE(review): list assumed from the upstream kernel drivers - extend if needed.
        gpu_driver_names = ('amdgpu', 'radeon', 'nouveau')
        try:
            # card0's own hwmon node needs no name check: it belongs to the GPU.
            hwmon_paths = list(Path('/sys/class/drm/card0/device/hwmon').glob('hwmon*/temp1_input'))
            if not hwmon_paths:
                # The GPU may not be card0: scan every hwmon device, but keep
                # only those registered by a GPU driver.
                for temp_path in Path('/sys/class/hwmon').glob('hwmon*/temp1_input'):
                    try:
                        driver = (temp_path.parent / 'name').read_text().strip()
                    except (IOError, ValueError):
                        continue
                    if driver in gpu_driver_names:
                        hwmon_paths.append(temp_path)

            for temp_path in hwmon_paths:
                try:
                    if temp_path.exists():
                        # hwmon reports millidegrees.
                        return int(temp_path.read_text().strip()) / 1000.0
                except (IOError, ValueError):
                    continue  # Ignore errors with specific hwmon paths

            # Fallback to nvidia-smi if hwmon fails
            try:
                result_nvidia = subprocess.run(['nvidia-smi', '--query-gpu=temperature.gpu', '--format=csv,noheader'], capture_output=True, text=True, check=True)
                return float(result_nvidia.stdout.strip())
            except (FileNotFoundError, subprocess.CalledProcessError, ValueError):
                pass  # nvidia-smi not found or failed, proceed to amdgpu attempt

            # Fallback to sensors command for amdgpu if others fail
            try:
                result_amd = subprocess.run(['sensors', 'amdgpu-pci*'], capture_output=True, text=True, check=False)  # Adjust pattern if needed
                if result_amd.returncode == 0:
                    for line in result_amd.stdout.split('\n'):
                        if 'edge:' in line.lower():  # Common identifier for AMD GPU temp
                            for part in line.split():
                                if part.startswith('+') and part.endswith('°C'):
                                    return float(part.strip('+°C'))
            except FileNotFoundError:
                pass  # sensors command not found
            except Exception as e:
                print(f"{Fore.YELLOW}Error during 'sensors amdgpu' call: {e}{Style.RESET_ALL}")

            print(f"{Fore.YELLOW}Could not determine GPU temperature via sysfs, nvidia-smi, or sensors.{Style.RESET_ALL}")
        except Exception as e:
            print(f"{Fore.YELLOW}GPU temp error: {e}{Style.RESET_ALL}")
        return 0.0

    def monitor_temperatures(self) -> Dict[str, List[float]]:
        temps: Dict[str, List[float]] = {'cpu': []}
        gpu_available = False
        try:
            # Warmup read and check availability
            self.get_cpu_temperature() # Initial read for CPU
            gpu_temp = self.get_gpu_temperature()
            if gpu_temp > 0.0: # Use > 0 as indicator that sensor works
                temps['gpu'] = []
                gpu_available = True
            else:
                print(f"{Fore.YELLOW}Skipping GPU temperature monitoring (sensor unavailable or read 0.0).{Style.RESET_ALL}")

            # Actual monitoring loop
            for _ in range(self.config['temperature']['max_temp_samples']):
                temps['cpu'].append(self.get_cpu_temperature())
                if gpu_available:
                    temps['gpu'].append(self.get_gpu_temperature())
                time.sleep(self.config['temperature']['sample_interval'])
        except Exception as e:
            print(f"{Fore.YELLOW}Temperature monitoring failed during loop: {e}{Style.RESET_ALL}")

        return temps


    def run_benchmark(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Run one benchmark configuration and attach temperature statistics.

        Temperatures are sampled through ``monitor_temperatures`` once before
        and once after the benchmark command; nothing is sampled while the
        command itself is executing.

        Args:
            params: Parameter set forwarded unchanged to ``_run_bench_command``.

        Returns:
            The dict returned by ``_run_bench_command`` extended with the
            ``cpu_temp_*``/``gpu_temp_*`` keys ``pre_max``, ``post_max`` and
            ``run_max``. A sensor without readings is reported as ``0.0``.
        """
        before = self.monitor_temperatures()
        outcome = self._run_bench_command(params)
        after = self.monitor_temperatures()

        def _peak(samples):
            # Missing key, None and empty list all collapse to 0.0.
            return max(samples) if samples else 0.0

        def _overall_peak(sensor):
            # Only concatenate when at least one side actually has readings.
            if not (before.get(sensor) or after.get(sensor)):
                return 0.0
            return _peak(before.get(sensor, []) + after.get(sensor, []))

        # Benchmark fields come first so the saved key order stays stable.
        summary = dict(outcome)
        summary['cpu_temp_pre_max'] = _peak(before.get('cpu'))
        summary['gpu_temp_pre_max'] = _peak(before.get('gpu'))
        summary['cpu_temp_post_max'] = _peak(after.get('cpu'))
        summary['gpu_temp_post_max'] = _peak(after.get('gpu'))
        summary['cpu_temp_run_max'] = _overall_peak('cpu')
        summary['gpu_temp_run_max'] = _overall_peak('gpu')
        return summary

    def _run_bench_command(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Invoke llama-bench for ``params`` and return its parsed result.

        The command line is assembled from ``self.config`` and ``params`` and
        executed up to ``max_retries + 1`` times.

        * Non-zero exit status or an unexpected exception: retried after an
          exponential back-off (``2 ** attempt`` seconds); the final failure
          is returned as an ``{"error": ...}`` dict.
        * Timeout: returned immediately as ``{"error": "Timeout"}``, no retry.
        * Missing executable: reported through ``self.error_exit``
          (presumably terminates the process - defined outside this block).

        Args:
            params: Must provide ``threads``, ``batch_size``, ``gpu_layers``,
                ``split_mode``, ``flash_attn``, ``cache_type`` and
                ``tensor_split``.

        Returns:
            The dict produced by ``parse_json_output`` on success, otherwise a
            dict containing an ``error`` key.
        """
        cfg = self.config

        # Executable first, then (flag, value) pairs in command-line order.
        command = [cfg['llama_bench_path']]
        flag_values = [
            ('-m', cfg['model_path']),
            ('-t', str(params['threads'])),
            ('-b', str(params['batch_size'])),
            ('-ub', str(cfg['ubatch_size'])),
            ('-ngl', str(params['gpu_layers'])),
            ('-sm', str(params['split_mode'])),
            ('-fa', str(params['flash_attn'])),
            ('-mmp', str(cfg['mmap'])),
            ('-ctk', str(params['cache_type'])),
            ('-ctv', str(params['cache_type'])),
            ('-r', str(cfg['repetitions'])),
            ('--poll', str(cfg['poll'])),
            ('-ts', str(params['tensor_split'])),
            ('--numa', cfg['numa_strategy']),
            ('-C', cfg['cpu_mask']),
            ('-mg', str(cfg['main_gpu'])),
            ('-o', 'json'),  # machine-readable output for parse_json_output
        ]
        for flag, flag_value in flag_values:
            command.extend((flag, flag_value))

        retry_limit = cfg['max_retries']
        for attempt in range(retry_limit + 1):
            final_attempt = attempt == retry_limit
            try:
                # Log the exact command line before launching it.
                print(f"{Style.DIM}Running command: {' '.join(command)}{Style.RESET_ALL}")
                completed = subprocess.run(
                    command,
                    check=True,  # non-zero exit -> CalledProcessError
                    timeout=cfg['timeout'],
                    capture_output=True,
                    text=True,
                    encoding='utf-8',
                )
                stdout_text = completed.stdout
                if stdout_text.strip():
                    return self.parse_json_output(stdout_text)

                # Process succeeded but printed nothing usable.
                print(f"{Fore.YELLOW}Warning: llama-bench produced empty output for params: {params}{Style.RESET_ALL}")
                return {"error": "Empty output from llama-bench", "tg_speed": 0.0, "memory_usage": 0, "latency": 0.0}

            except subprocess.CalledProcessError as e:
                print(f"{Fore.RED}Command failed: {' '.join(command)}{Style.RESET_ALL}")
                print(f"{Fore.RED}Return code: {e.returncode}{Style.RESET_ALL}")
                print(f"{Fore.RED}Stderr: {e.stderr}{Style.RESET_ALL}")
                if final_attempt:
                    return {"error": f"Command failed after retries: {e}", "stderr": e.stderr}
                print(f"{Fore.YELLOW}Retrying ({attempt + 1}/{retry_limit})...{Style.RESET_ALL}")
                time.sleep(2 ** attempt)  # exponential back-off

            except subprocess.TimeoutExpired:
                # A timed-out configuration is not retried.
                print(f"{Fore.YELLOW}Timeout expired ({cfg['timeout']}s) for command: {' '.join(command)}{Style.RESET_ALL}")
                return {"error": "Timeout"}

            except FileNotFoundError:
                self.error_exit(f"llama-bench command not found at '{cfg['llama_bench_path']}'. Please check the path.")

            except Exception as e:
                # Anything else raised while launching or reading the process.
                print(f"{Fore.RED}An unexpected error occurred running the benchmark: {e}{Style.RESET_ALL}")
                if final_attempt:
                    return {"error": f"Unexpected error after retries: {e}"}
                time.sleep(2 ** attempt)

        # Reached only when the loop ends without any branch returning.
        return {"error": "Max retries exceeded without success"}

    # *** CORRECTED INDENTATION ***
    def parse_json_output(self, output: str) -> Dict[str, Any]:
        """
        Parse llama-bench JSON output into normalized performance metrics.

        Accepts either a JSON list (one object per test) or a single JSON
        object. Text before the first ``{``/``[`` and anything following the
        JSON document is ignored.

        For list output the first entry whose ``n_gen`` is positive (a
        token-generation test) is used; when no entry carries ``n_gen`` the
        first entry is used. Token speed is read from ``tokens_per_second``,
        then ``tg_speed``, then ``avg_ts`` (the throughput field llama-bench
        emits).

        Args:
            output: Raw stdout captured from llama-bench.

        Returns:
            Dict with ``tg_speed`` (float), ``memory_usage`` (int), ``latency``
            (float) and ``raw_output`` (the decoded JSON). On failure the
            numeric fields are zero, ``raw_output`` is absent and an ``error``
            key describes the problem. This method never raises.
        """
        zeroed = {'tg_speed': 0.0, 'memory_usage': 0, 'latency': 0.0}

        def _coerce(value, caster, default, field_name):
            # Normalize one metric; None or unconvertible values become the default.
            if value is None:
                return default
            try:
                return caster(value)
            except (ValueError, TypeError, OverflowError):
                print(f"{Fore.YELLOW}Warning: Could not convert {field_name} '{value}' to {caster.__name__}. Using {default}.{Style.RESET_ALL}")
                return default

        def _is_generation_test(item):
            # llama-bench marks text-generation tests with n_gen > 0.
            n_gen = item.get('n_gen')
            return isinstance(n_gen, (int, float)) and n_gen > 0

        try:
            # Log lines may precede the document: start at the first '{' or '['.
            starts = [pos for pos in (output.find('{'), output.find('[')) if pos != -1]
            if not starts:
                print(f"{Fore.YELLOW}Could not find start of JSON ('{{' or '[') in output:\n---\n{output}\n---{Style.RESET_ALL}")
                return {'error': 'No JSON object/array found in output', **zeroed}

            # raw_decode stops at the end of the first complete JSON value, so
            # trailing text after the document no longer breaks parsing.
            data, _ = json.JSONDecoder().raw_decode(output, min(starts))

            record = None
            if isinstance(data, list):
                # Prefer the generation test over the prompt-processing test
                # that llama-bench lists first by default.
                record = next(
                    (item for item in data if isinstance(item, dict) and _is_generation_test(item)),
                    None,
                )
                if record is None and data and isinstance(data[0], dict):
                    record = data[0]
                if record is None:
                    print(f"{Fore.YELLOW}Warning: JSON output is a list, but first item is not a dictionary or list is empty.{Style.RESET_ALL}")
            elif isinstance(data, dict):
                record = data
            else:
                print(f"{Fore.YELLOW}Warning: Parsed JSON is neither a list nor a dictionary: {type(data)}{Style.RESET_ALL}")

            tg_speed = 0.0
            memory_usage = 0
            latency = 0.0
            if record is not None:
                tg_speed = record.get(
                    'tokens_per_second',
                    record.get('tg_speed', record.get('avg_ts', 0.0)),
                )
                memory_usage = record.get('memory_usage', 0)
                latency = record.get('latency', 0.0)

            return {
                'tg_speed': _coerce(tg_speed, float, 0.0, 'tg_speed'),
                'memory_usage': _coerce(memory_usage, int, 0, 'memory_usage'),
                'latency': _coerce(latency, float, 0.0, 'latency'),
                'raw_output': data,  # keep the decoded document for later inspection
            }

        except json.JSONDecodeError as json_err:
            print(f"{Fore.YELLOW}JSON decode error: {json_err} for output:\n---\n{output}\n---{Style.RESET_ALL}")
            # Zeroed metrics keep downstream comparisons working.
            return {'error': f'JSON decode error: {str(json_err)}', **zeroed}

        except Exception as e:
            # Deliberate catch-all: a malformed report must not abort the sweep.
            print(f"{Fore.YELLOW}Unexpected parsing error: {e} for output:\n---\n{output}\n---{Style.RESET_ALL}")
            return {'error': f'Unexpected parsing error: {str(e)}', **zeroed}


    def save_results(self):
        """Write ``self.results`` to ``self.config['output_json']`` as JSON.

        The list is serialized in memory first, written to a sibling
        ``<output_json>.tmp`` file and then moved over the target with
        ``os.replace``. A serialization error or a failed write therefore
        leaves the previously saved results file untouched instead of
        truncating it.

        Failures are reported on stdout and swallowed so that a save problem
        does not abort the benchmark run.
        """
        import os  # local import keeps this method self-contained

        try:
            target = self.config['output_json']
            scratch = f"{target}.tmp"
            # Serialize before touching the disk: a non-serializable result
            # must not destroy the existing file.
            payload = json.dumps(self.results, indent=2)
            with open(scratch, 'w', encoding='utf-8') as f:
                f.write(payload)
            os.replace(scratch, target)
        except IOError as e:
            print(f"{Fore.RED}Save failed: Could not write to {self.config['output_json']}. Error: {e}{Style.RESET_ALL}")
        except Exception as e:  # e.g. serialization problems
            print(f"{Fore.RED}Save failed: An unexpected error occurred. Error: {e}{Style.RESET_ALL}")


    def print_summary(self):
        """Print the best configuration found, its metrics and temperatures.

        When ``self.best_params`` is empty or its ``tg_speed`` is zero, only a
        notice is printed. Otherwise the output lists, in order: token speed,
        latency and memory usage (when present), every remaining non-metric
        entry of ``best_params`` as a parameter, all ``cpu_temp*``/``gpu_temp*``
        entries, and finally the path of the results file.
        """
        best = self.best_params
        print(f"\n{Fore.GREEN}=== Benchmark Complete ===")

        # Guard clause: nothing meaningful to report.
        if not best or best.get('tg_speed', 0.0) == 0.0:
            print(f"{Fore.YELLOW}No successful runs recorded or best speed is 0. Cannot show best configuration.{Style.RESET_ALL}")
            return

        divider = "---------------------------"
        temp_prefixes = ('cpu_temp', 'gpu_temp')
        metric_keys = ('tg_speed', 'memory_usage', 'latency', 'raw_output', 'error')

        print(f"{Fore.CYAN}Best Configuration Found:")
        print(divider)

        # Headline metrics first.
        speed = best.get('tg_speed', 0.0)
        print(f"{'Token Speed (tg_speed)':<25}: {Fore.GREEN}{speed:.2f} t/s{Style.RESET_ALL}")
        if 'latency' in best:
            latency = best['latency']
            print(f"{'Latency':<25}: {Fore.BLUE}{latency:.2f} ms{Style.RESET_ALL}")  # unit assumed to be ms
        if 'memory_usage' in best:
            mem_usage = best['memory_usage']
            positive_number = isinstance(mem_usage, (int, float)) and mem_usage > 0
            if positive_number:
                # Value is treated as a byte count and also shown in GiB.
                mem_gb = mem_usage / (1024**3)
                print(f"{'Memory Usage':<25}: {Fore.BLUE}{mem_gb:.2f} GB ({mem_usage} bytes){Style.RESET_ALL}")
            else:
                print(f"{'Memory Usage':<25}: {Fore.BLUE}{mem_usage}{Style.RESET_ALL}")

        # Everything that is neither a metric nor a temperature is a parameter.
        print("\nParameters:")
        for key, value in best.items():
            is_reported_elsewhere = key in metric_keys or key.startswith(temp_prefixes)
            if not is_reported_elsewhere:
                print(f"  {key:<23}: {Fore.BLUE}{value}{Style.RESET_ALL}")

        print("\nTemperatures During Best Run:")
        temp_keys = sorted(k for k in best if k.startswith(temp_prefixes))
        if not temp_keys:
            print(f"  {Fore.YELLOW}Temperature data not available for the best run.{Style.RESET_ALL}")
        else:
            # Turn e.g. 'cpu_temp_pre_max' into 'Cpu Temp Pre (Max)'.
            substitutions = (
                ('_', ' '),
                (' temp ', ' Temp '),
                ('pre max', 'Pre (Max)'),
                ('post max', 'Post (Max)'),
                ('run max', 'Run (Max)'),
            )
            for key in temp_keys:
                temp_value = best[key]
                label = key
                for old, new in substitutions:
                    label = label.replace(old, new)
                label = label.title()
                print(f"  {label:<23}: {Fore.BLUE}{temp_value:.1f}°C{Style.RESET_ALL}")

        print(divider)
        print(f"Full results saved to: {self.config['output_json']}")


    def run(self):
        """Execute the full benchmarking process and print the summary.

        Phase 1 (stepwise): for each parameter in
        ``config['optimization_order']`` every configured value is tried on
        top of the best configuration found so far.

        Phase 2 (combination): the ``combination_top_n`` fastest values per
        parameter from phase 1 are combined exhaustively. Skipped when fewer
        than two parameters produced phase-1 results.

        Phase 3 (deep search): ``threads``, ``batch_size`` and ``gpu_layers``
        are varied in small steps around the current best values while all
        other parameters stay at their best value.

        Every executed benchmark is appended to ``self.results`` and saved
        immediately; previously stored error-free results are reused instead
        of rerunning. ``self.best_params`` always holds the fastest
        error-free configuration seen.
        """
        # --- Phase 1: Stepwise Optimization ---
        print(f"{Fore.GREEN}=== Phase 1: Stepwise Optimization ===")
        order = self.config['optimization_order']
        parameters = self.config['parameters']
        total_stepwise_steps = sum(len(parameters[param]) for param in order)

        # Baseline that later parameters build upon; starts from the loaded/initial best.
        current_best_for_stepwise = self.best_params.copy()

        with tqdm(total=total_stepwise_steps, desc="Stepwise", unit="run") as pbar:
            for param_name in order:
                best_value_for_this_param = current_best_for_stepwise[param_name]

                for value in parameters[param_name]:
                    candidate_params = current_best_for_stepwise.copy()
                    candidate_params[param_name] = value

                    # Cache identity is defined by the optimized parameters only.
                    lookup_key = {p: candidate_params[p] for p in order if p in candidate_params}
                    result_data = self._obtain_result(
                        candidate_params,
                        lookup_key,
                        cached_msg=f"{Style.DIM}Using cached result for {param_name}={value}{Style.RESET_ALL}",
                        rerun_msg=f"{Style.DIM}Cached result for {param_name}={value} has error, rerunning...{Style.RESET_ALL}",
                        run_msg=f"Running test for {param_name}={value}",
                    )
                    current_speed = result_data.get('tg_speed', 0.0)

                    if self._promote_if_best(candidate_params, result_data):
                        print(f"\n{Fore.GREEN}New Overall Best: {param_name}={value}, Speed: {current_speed:.2f} t/s{Style.RESET_ALL}")
                        # The new best also becomes the baseline for the remaining values/parameters.
                        current_best_for_stepwise = self.best_params.copy()
                        best_value_for_this_param = value

                    # Per-value speeds feed the combination phase; failures count as 0.0.
                    if 'error' not in result_data:
                        self.param_results[param_name].append((value, current_speed))
                    else:
                        self.param_results[param_name].append((value, 0.0))
                        print(f"{Fore.YELLOW}Error during run for {param_name}={value}: {result_data.get('error')}{Style.RESET_ALL}")

                    pbar.update(1)

                # Pin this parameter to its best value before moving to the next one.
                current_best_for_stepwise[param_name] = best_value_for_this_param

        # --- Phase 2: Combination Testing ---
        top_n = self.config['combination_top_n']
        print(f"\n{Fore.GREEN}=== Phase 2: Combination Testing (Top {top_n}) ===")
        combination_candidates = []
        valid_param_results_count = 0
        for param_name in order:
            # Failed values (speed 0.0) are kept so that top_n candidates exist where possible.
            ranked = sorted(self.param_results[param_name], key=lambda item: item[1], reverse=True)
            top_n_values = [item[0] for item in ranked[:top_n]]
            if top_n_values:
                combination_candidates.append(top_n_values)
                valid_param_results_count += 1
            else:
                print(f"{Fore.YELLOW}Warning: No successful results found for parameter '{param_name}' during Stepwise phase. Using its best known value for combinations.{Style.RESET_ALL}")
                # Look up the configured default lazily: the value list may be
                # empty, which is exactly when this branch is reached.
                if param_name in self.best_params:
                    fallback = self.best_params[param_name]
                else:
                    fallback = parameters[param_name][0]
                combination_candidates.append([fallback])

        if valid_param_results_count < 2:
            print(f"{Fore.YELLOW}Skipping Combination phase: Not enough parameters ({valid_param_results_count}) yielded successful results in Phase 1.{Style.RESET_ALL}")
        else:
            all_combinations = list(product(*combination_candidates))
            print(f"Testing {len(all_combinations)} combinations...")

            with tqdm(total=len(all_combinations), desc="Combinations", unit="run") as pbar:
                for number, combo_values in enumerate(all_combinations, start=1):
                    # NOTE(review): candidates here contain only the parameters in
                    # optimization_order - confirm that covers every key
                    # _run_bench_command reads from params.
                    candidate_params = dict(zip(order, combo_values))
                    result_data = self._obtain_result(
                        candidate_params,
                        candidate_params,
                        cached_msg=f"{Style.DIM}Using cached result for combination {number}{Style.RESET_ALL}",
                        rerun_msg=f"{Style.DIM}Cached combination {number} has error, rerunning...{Style.RESET_ALL}",
                        run_msg=f"Running combination {number}/{len(all_combinations)}: {candidate_params}",
                    )

                    if self._promote_if_best(candidate_params, result_data):
                        current_speed = result_data.get('tg_speed', 0.0)
                        print(f"\n{Fore.GREEN}New Overall Best (Combination): Speed: {current_speed:.2f} t/s{Style.RESET_ALL}")
                        print(f"{Fore.GREEN} -> Params: {candidate_params}{Style.RESET_ALL}")

                    pbar.update(1)

        # --- Phase 3: Deep Parameter Search ---
        print(f"\n{Fore.GREEN}=== Phase 3: Deep Parameter Search (Around Best) ===")
        deep_search_candidates_map = {}
        params_for_deep_search = ['threads', 'batch_size', 'gpu_layers']  # numeric parameters only

        for param_name in order:
            best_value = self.best_params.get(param_name)
            if param_name in params_for_deep_search and isinstance(best_value, (int, float)):
                deep_search_candidates_map[param_name] = self._deep_search_values(param_name, best_value)
                print(f"Deep search values for {param_name}: {deep_search_candidates_map[param_name]}")
            else:
                # Non-numeric or non-searched parameters stay at their best known value.
                deep_search_candidates_map[param_name] = [best_value] if best_value is not None else [parameters[param_name][0]]

        deep_search_combinations = list(product(*deep_search_candidates_map.values()))
        print(f"Testing {len(deep_search_combinations)} deep search combinations...")

        # Insertion order of the map matches the order used by product() above.
        param_order = list(deep_search_candidates_map)
        integer_params = ('threads', 'batch_size', 'gpu_layers', 'flash_attn', 'mmap', 'repetitions', 'poll', 'main_gpu')

        with tqdm(total=len(deep_search_combinations), desc="Deep Search", unit="run") as pbar:
            for number, combo_values in enumerate(deep_search_combinations, start=1):
                candidate_params = {}
                for param_name, val in zip(param_order, combo_values):
                    if param_name in integer_params:
                        candidate_params[param_name] = int(val) if val is not None else 0
                    else:
                        candidate_params[param_name] = val

                result_data = self._obtain_result(
                    candidate_params,
                    candidate_params,
                    cached_msg=f"{Style.DIM}Using cached result for deep search combination {number}{Style.RESET_ALL}",
                    rerun_msg=f"{Style.DIM}Cached deep search combination {number} has error, rerunning...{Style.RESET_ALL}",
                    run_msg=f"Running deep search {number}/{len(deep_search_combinations)}: {candidate_params}",
                )

                if self._promote_if_best(candidate_params, result_data):
                    current_speed = result_data.get('tg_speed', 0.0)
                    print(f"\n{Fore.GREEN}New Overall Best (Deep Search): Speed: {current_speed:.2f} t/s{Style.RESET_ALL}")
                    print(f"{Fore.GREEN} -> Params: {candidate_params}{Style.RESET_ALL}")

                pbar.update(1)

        # --- Final Summary ---
        self.print_summary()

    def _lookup_cached_entry(self, lookup_key: Dict[str, Any]):
        """Return the first stored result matching every pair in ``lookup_key``, else None.

        A stored entry only matches when it contains each key with an equal
        value; an entry that lacks a parameter is never treated as a match.
        """
        for entry in self.results:
            if all(key in entry and entry[key] == wanted for key, wanted in lookup_key.items()):
                return entry
        return None

    def _obtain_result(self, candidate_params: Dict[str, Any], lookup_key: Dict[str, Any],
                       cached_msg: str, rerun_msg: str, run_msg: str) -> Dict[str, Any]:
        """Return the result for ``candidate_params``, benchmarking only when needed.

        An error-free stored entry is returned as is. Otherwise the benchmark
        is executed, the outcome is appended to ``self.results`` and the
        results file is saved. Exactly one of the three messages is printed,
        depending on which path was taken.
        """
        cached = self._lookup_cached_entry(lookup_key)
        if cached and 'error' not in cached:
            print(cached_msg)
            return cached

        print(rerun_msg if cached else run_msg)
        result_data = self.run_benchmark(candidate_params)

        # The stepwise baseline is copied from best_params and therefore carries
        # the metrics of an earlier run. Drop them so a failed run is not saved
        # with the previous best's speed/raw output attached.
        result_fields = ('tg_speed', 'memory_usage', 'latency', 'raw_output', 'error', 'stderr')
        params_only = {
            key: val for key, val in candidate_params.items()
            if key not in result_fields and not key.startswith(('cpu_temp', 'gpu_temp'))
        }
        self.results.append({**params_only, **result_data})
        self.save_results()  # persist after every executed benchmark
        return result_data

    def _promote_if_best(self, candidate_params: Dict[str, Any], result_data: Dict[str, Any]) -> bool:
        """Store the candidate in ``self.best_params`` if it is error-free and faster; report whether it was."""
        if 'error' not in result_data and result_data.get('tg_speed', 0.0) > self.best_params.get('tg_speed', 0.0):
            self.best_params = {**candidate_params, **result_data}
            return True
        return False

    def _deep_search_values(self, param_name: str, best_value):
        """Return the sorted deep-search values around ``best_value`` for ``param_name``.

        Steps of ``best_value * step_percentage`` (at least 1 for integers)
        are taken ``steps_around`` times in each direction. Values below
        ``max(1, min(configured values))`` or above ``max(configured values)``
        end the walk in that direction. ``best_value`` itself is always kept,
        and no cap on the number of values is applied.
        """
        original_values = self.config['parameters'][param_name]
        lower_bound = max(1, min(original_values) if original_values else 0)
        upper_bound = max(original_values) if original_values else best_value

        step_config = self.config['deep_search_params']
        is_integral = isinstance(best_value, int)
        step = best_value * step_config['step_percentage']
        if is_integral:
            step = max(1, int(step))

        search_values = {best_value}
        for direction in (-1, 1):  # walk below, then above the best value
            current = best_value
            for _ in range(step_config['steps_around']):
                current += direction * step
                candidate = int(round(current)) if is_integral else float(current)
                in_bounds = candidate >= lower_bound if direction < 0 else candidate <= upper_bound
                if not in_bounds:
                    break
                search_values.add(candidate)
        return sorted(search_values)

# --- Main Execution ---
if __name__ == "__main__":
    # Script entry point: build the runner from the module-level CONFIG and
    # execute it. Any failure ends the process with exit status 1.
    try:
        runner = BenchmarkRunner(CONFIG)
        runner.run()
    except KeyboardInterrupt:
        # Ctrl-C: report and exit; no extra save is attempted here.
        print("\nBenchmark interrupted by user.")
        sys.exit(1)
    except Exception as fatal_error:
        print(f"{Fore.RED}An unexpected error occurred in the main execution: {fatal_error}{Style.RESET_ALL}")
        import traceback  # only needed on this failure path
        traceback.print_exc()
        sys.exit(1)
         
         

@vitorcalvi
Copy link
Author

Best Model for Financial Decision-Making: mlx-community/Qwen2.5-Coder-14B-Instruct-4bit

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment