Epoch AI implementation of SWE-Bench
""" | |
Based on [1] by @max-kaufmann as a starting point, with substantial modifications; not intended to | |
be a drop-in alternative. | |
[1] https://github.com/UKGovernmentBEIS/inspect_evals/tree/b30a1aab73217e035d5aa22fd0526c70650e4b3e/src/inspect_evals/swe_bench
""" | |
import json | |
import logging | |
import platform | |
from importlib.util import find_spec | |
from pathlib import Path | |
from typing import List, Optional | |
from datasets import Dataset # type: ignore | |
from environs import Env | |
from inspect_ai import task, Task | |
from inspect_ai.dataset import hf_dataset, FieldSpec | |
from inspect_ai.scorer import Scorer | |
from inspect_ai.util import SandboxEnvironmentSpec | |
from platformdirs import user_cache_dir | |
from bench.task.swe_bench.agent import swe_bench_agent, DEFAULT_BUNDLES | |
from bench.task.swe_bench.scorers import swe_bench_scorer | |
from bench.task.swe_bench.sweagent_tools import BundleName, generate_dockerfile_content | |
from bench.task.swe_bench.utils import warm_up_cache | |
COMPOSE_FILES_DIR = Path(user_cache_dir("inspect_swebench_eval")) / "compose_files"
logger = logging.getLogger(__name__) | |
env = Env() | |
env.read_env() | |
@task | |
def swe_bench_verified() -> Task: | |
return swe_bench( | |
dataset="princeton-nlp/SWE-bench_Verified", | |
split="test", | |
) | |
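# Example invocation (illustrative -- the file name/module path is an assumption, not part of this gist):
#   inspect eval bench/task/swe_bench/swe_bench.py@swe_bench_verified --model <provider/model>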
@task | |
def swe_bench( | |
dataset: str = "princeton-nlp/SWE-bench_Verified", | |
split: str = "test", | |
instance_ids: list[str] | None = None, | |
scorer: Scorer | list[Scorer] | None = None, | |
) -> Task: | |
"""Returns a Task, representing an evaluation on SWE-bench. | |
This currently assumes the solver is a SWE-Agent solver (because the bundle names | |
need to be known at build time). However, it's easily extensible. | |
Args:
dataset : str | |
The dataset to use. This should either be the name of a dataset in the HF hub, or a path to a dataset on disk. | |
split : str | |
The split of the dataset to load. | |
instance_ids : list[str] | |
A list of instance_ids to filter the dataset by. If None, all instances are used. | |
scorer : Scorer | list[Scorer] | None | |
The scorer to use when evaluating swe_bench. If None, uses the default scorer. Most commonly, this will be a list of scorers to compare to baselines (see the README for more information).
""" | |
assert find_spec( | |
"swebench" | |
), "To run SWE-bench, please install the optional SWE-bench depency, by running `pip install inspect-ai[swe_bench]`" | |
samples = hf_dataset( | |
dataset, | |
split=split, | |
sample_fields=FieldSpec( | |
input="problem_statement", | |
id="instance_id", | |
metadata=[ | |
"base_commit", | |
"patch", | |
"PASS_TO_PASS", | |
"FAIL_TO_PASS", | |
"test_patch", | |
"version", | |
"repo", | |
"environment_setup_commit", | |
"hints_text", | |
"created_at", | |
], | |
), | |
# Shuffling helps avoid potential bias when running only a subset of the dataset
shuffle=True, | |
seed=12345, | |
) | |
for sample in samples: | |
# Turn the saved strings into list objects | |
sample.metadata = sample.metadata or {} | |
sample.metadata["PASS_TO_PASS"] = json.loads(sample.metadata["PASS_TO_PASS"]) | |
sample.metadata["FAIL_TO_PASS"] = json.loads(sample.metadata["FAIL_TO_PASS"]) | |
if instance_ids is not None: | |
samples = samples.filter(lambda x: x.id in instance_ids) | |
ids_to_docker_image = get_image_names(samples) | |
swe_agent_bundles = DEFAULT_BUNDLES | |
for sample in samples: | |
sample.metadata = sample.metadata or {} | |
sample.sandbox = SandboxEnvironmentSpec( | |
type="docker", | |
config=get_compose_file(ids_to_docker_image[str(sample.id)], swe_agent_bundles), | |
) | |
sample.metadata["image"] = { | |
"name": ids_to_docker_image[str(sample.id)], | |
# I would like to also have the image digest in the metadata, | |
# but this is better handled in Inspect, where the image | |
# is actually pulled. | |
} | |
# TODO: | |
# this currently ignores EVAL_LIMIT and warms up the cache for all images. (We face a similar | |
# problem with all overrides done at Task instantiation time, e.g. `epochs_override`) | |
# This is acceptable for now. | |
# Hack to parallelize image construction (only relevant for non-registry images/layers like | |
# SWE-Agent), see https://github.com/epoch-research/benchmarks/issues/141 for details.
warm_up_cache(list(samples)) | |
return Task( | |
dataset=samples, | |
plan=swe_bench_agent(bundles=swe_agent_bundles), | |
scorer=scorer or swe_bench_scorer(), | |
epochs=1, | |
metadata={"inspect-log-public": True}, | |
) | |
def get_compose_file(image_name: str, swe_agent_bundles: Optional[List[BundleName]] = None) -> str: | |
""" | |
Create a Docker Compose file for the given image. | |
Args: | |
image_name (str): The Docker image name to use. | |
swe_agent_bundles (Optional[List[BundleName]]): List of SWE-Agent bundle names to include. | |
If None, SWE-Agent is not used. | |
""" | |
# Create safe filenames without special characters | |
base_filename = f"{image_name}".replace("/", "_").replace(":", "_") | |
COMPOSE_FILES_DIR.mkdir(parents=True, exist_ok=True) | |
# Create a multistage Dockerfile that clones SWE-Agent and installs necessary tools | |
dockerfile_name = f"{base_filename}.Dockerfile" | |
dockerfile_path = COMPOSE_FILES_DIR / dockerfile_name | |
# If bundles is None, just create a simple Dockerfile with the FROM line | |
if swe_agent_bundles is None: | |
dockerfile_content = f"FROM {image_name}" | |
else: | |
# other bundles require the "registry" bundle to be included | |
registry_bundle: BundleName = "registry" | |
swe_agent_bundles = [registry_bundle] + swe_agent_bundles | |
dockerfile_content = generate_dockerfile_content(image_name, swe_agent_bundles) | |
with dockerfile_path.open(mode="w+") as f: | |
f.write(dockerfile_content) | |
# Create the compose file that references the Dockerfile | |
compose_filename = f"{base_filename}.yaml" | |
image_compose_file = COMPOSE_FILES_DIR / compose_filename | |
with image_compose_file.open(mode="w+") as f: | |
f.write( | |
f"""services: | |
default: | |
build: | |
dockerfile: {dockerfile_name} | |
command: "sleep infinity" | |
working_dir: /testbed | |
network_mode: none | |
init: true""" | |
) | |
return str(image_compose_file) | |
def get_image_names(samples: Dataset) -> dict[str, str]: | |
ids_to_docker_image = {} | |
for instance in samples: | |
if platform.machine() in ["x86_64", "AMD64"]: | |
arch = "x86_64" | |
elif platform.machine() in ["aarch64", "arm64"]: | |
arch = "arm64" | |
logger.warning( | |
"Using arm64 architecture. arm64 support is experimental. Some images may not exist, or may not work." | |
)
else:
raise ValueError(f"Unsupported architecture: {platform.machine()}")
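# Images are prebuilt per instance and published to Epoch's registry; the tag encodes the
# architecture and the SWE-bench instance id, i.e. ghcr.io/epoch-research/swe-bench.eval.<arch>.<instance_id>.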
image_name = f"ghcr.io/epoch-research/swe-bench.eval.{arch}.{instance.id}" | |
ids_to_docker_image[instance.id] = image_name | |
return ids_to_docker_image |
""" | |
SWE-Bench agent implementation using SWE-Agent tools. | |
""" | |
import logging | |
from typing import List, Dict | |
from inspect_ai.model import ChatMessageUser | |
from inspect_ai.solver import solver, TaskState, Generate, Solver | |
from inspect_ai.tool import Tool, bash, ToolDef | |
from inspect_ai.util import store | |
from inspect_ai.tool import tool | |
from inspect_ai.log._transcript import transcript | |
from .sweagent_tools import ( | |
sweagent_tooldefs, | |
setup_sweagent_environment, | |
SandboxStoreState, | |
BundleName, | |
) | |
logger = logging.getLogger(__name__) | |
BASH_TOOL_TIMEOUT = 60 * 5 | |
# List of SWE-Agent bundles to include by default | |
# Note: make sure not to include tools that are redundant with the other tools we | |
# provide in the solver (e.g. submit_answer, bash). | |
# Note: There are three edit tool bundles: edit_replace, edit_linting, edit_rewrite. | |
# Only include one of these at a time. | |
# Defaults are taken from the SWE-Agent default config: | |
# https://github.com/SWE-agent/SWE-agent/blob/ce072d61a5d1cd4fc60eb1f281526fdf0bde1640/config/default.yaml
DEFAULT_BUNDLES: List[BundleName] = [ | |
"defaults", # Core file navigation: goto, open, create, scroll_up, scroll_down | |
"search", # File search tools: find_file, search_dir, search_file | |
"edit_replace", # Search and replace editing with optional replace-all flag | |
] | |
# We skip these bundles: | |
# - diff_state: Shows differences in state variables between commands | |
# - edit_anthropic: Anthropic-specific Claude editing tool (str_replace_editor) | |
# - edit_linting: Line range editing (replaces lines start_line:end_line with new text) | |
# - edit_rewrite: Complete text replacement for currently visible lines | |
# - filemap: Python file summarization, skipping lengthy function definitions | |
# - forfeit: Task-specific for challenges (exit_forfeit command) | |
# - review_on_submit/review_on_submit_m: Multi-stage submission with review steps | |
# - submit: Redundant with our submit_answer tool | |
# The "registry" bundle is required by other bundles and is always included | |
# in the image build process. | |
@solver | |
def swe_bench_agent( | |
token_limit: int = 1_000_000, | |
bundles: List[BundleName] | None = None, | |
window: int = 100, | |
overlap: int = 2, | |
) -> Solver: | |
""" | |
A solver that uses SWE-Agent tools to solve SWE-Bench tasks. | |
Args: | |
token_limit: Maximum number of tokens to use for the conversation. | |
bundles: List of SWE-Agent bundle names to include. | |
If None, uses a default set of bundles. | |
window: Size of the file viewing window (number of lines). | |
overlap: Number of lines to overlap when scrolling. | |
Returns: | |
A solver that uses SWE-Agent tools. | |
""" | |
include_bundles = bundles or DEFAULT_BUNDLES | |
async def solve(state: TaskState, generate: Generate) -> TaskState: | |
# Initialize submitted answer in store | |
store().set("submitted_answer", None) | |
# Set token limit on state, so generate() will respect it | |
state.token_limit = token_limit | |
# Load SWE-Agent tools | |
# All bundles are installed at build time | |
await setup_sweagent_environment(window=window, overlap=overlap) | |
swe_agent_tools = await sweagent_tooldefs( | |
include_bundles=include_bundles, | |
) | |
# Flatten tool list for state.tools | |
all_tools = [] | |
for bundle_tools in swe_agent_tools.values(): | |
# Convert ToolDefs to Tool instances | |
as_tools = [tool_def.as_tool() for tool_def in bundle_tools] | |
all_tools.extend(as_tools) | |
# Always include the bash tool and submit_answer tool | |
state.tools = [bash(timeout=BASH_TOOL_TIMEOUT), submit_answer()] + all_tools | |
state.tool_choice = "auto" | |
# Display initial prompt with instructions | |
state.messages = [ | |
ChatMessageUser( | |
content=initial_prompt( | |
question=state.user_prompt.text, | |
token_limit=token_limit, | |
repo=state.metadata["repo"], | |
swe_agent_tools=swe_agent_tools, | |
window_size=window, | |
overlap_size=overlap, | |
), | |
), | |
] | |
while state.token_usage < token_limit: | |
state = await generate(state, tool_calls="loop") | |
# If an answer was submitted, we're done | |
if store().get("submitted_answer") is not None: | |
break | |
# Otherwise, continue the conversation | |
message = ( | |
f"You have used {state.token_usage:,} tokens (hard limit: {token_limit:,} tokens). " | |
f"Continue working on the problem. Once you've made the necessary changes to the " | |
f"repository, submit your answer using the `submit_answer` tool." | |
) | |
state.messages.append(ChatMessageUser(content=message)) | |
return state | |
return solve | |
@tool | |
def submit_answer() -> Tool: | |
async def execute() -> str: | |
""" | |
Call this tool to signal that you have finished making changes to the repository to solve | |
the task. The tool does not take any arguments. | |
""" | |
store().set("submitted_answer", True) | |
return "Your answer has been recorded. No feedback is provided." | |
return execute | |
def initial_prompt( | |
question: str, | |
token_limit: int, | |
repo: str, | |
swe_agent_tools: Dict[BundleName, List[ToolDef]], | |
window_size: int, | |
overlap_size: int, | |
) -> str: | |
"""Generate the initial prompt with dynamic tool descriptions based on the actual loaded tools.""" | |
# Create tool descriptions section using bundles directly | |
tool_descriptions = "" | |
for bundle, tools in swe_agent_tools.items(): | |
tool_names_str = ", ".join(f"`{tool.name}`" for tool in sorted(tools, key=lambda t: t.name)) | |
tool_descriptions += f"\n - {bundle}: {tool_names_str}" | |
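# Example rendering for the default bundles (illustrative; the actual names come from the loaded ToolDefs):
#  - defaults: `create`, `goto`, `open`, `scroll_down`, `scroll_up`
#  - search: `find_file`, `search_dir`, `search_file`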
return TEMPLATE.format( | |
question=question, | |
token_limit=token_limit, | |
repo=repo, | |
bash_tool_timeout=BASH_TOOL_TIMEOUT, | |
tool_descriptions=tool_descriptions, | |
window_size=window_size, | |
overlap_size=overlap_size, | |
) | |
TEMPLATE = r""" | |
You will be solving an open issue from the {repo} repository. You have access to a Python environment | |
containing the repository, and you will need to modify the code to solve the issue. | |
When you are done, call the `submit_answer` tool to let us know. | |
* Environment: | |
- You have access to a Python environment containing the repository. You interact with the | |
environment by calling various tools. | |
- The environment does not have internet access. | |
* Token limits: | |
- There is a hard limit of {token_limit:,} tokens. If you exceed this, the conversation will end | |
immediately (even if you haven't called the submit tool). The final state of the repository at | |
that point will be used as your answer. | |
- Both input and output tokens count towards the limit. | |
* SWE-Agent Tools: | |
- Available tools by category: {tool_descriptions} | |
- Additional guidance on the file viewing and editing tools: | |
- The file viewing and editing tools let you view {window_size} lines at a time with an overlap of {overlap_size} lines. | |
- You can use the `scroll_up` and `scroll_down` commands to navigate through larger files. | |
- Avoid using the scrolling commands multiple times. Instead, use the `goto` command to jump to a specific line, or | |
open the file at a specific line. | |
* You also have access to the `bash` tool: | |
- You can use the tool to execute arbitrary shell commands in the repository environment. | |
- The tool has a timeout of {bash_tool_timeout} seconds. | |
* The `submit_answer` tool takes no arguments and simply records that you have finished making changes to the repository. | |
Here is the issue you need to solve: | |
<issue> | |
{question} | |
</issue> | |
""" |
import json | |
import logging | |
import os | |
import re | |
import shlex | |
from collections import Counter | |
from pathlib import Path | |
from textwrap import dedent | |
import jsonlines | |
from inspect_ai.log import EvalLog | |
from inspect_ai.scorer import Score, Scorer, Target, mean, scorer, std, INCORRECT, CORRECT | |
from inspect_ai.solver import TaskState | |
from inspect_ai.util import ExecResult, sandbox | |
from inspect_ai.scorer import accuracy, stderr | |
logger = logging.getLogger(__name__) | |
# 30 minutes is hopefully very conservative, but I haven't had time for a thorough investigation | |
# of how long the tests should legitimately take to run, and I really don't want to unduly mark | |
# instances as INCORRECT because of timeouts.
EVAL_SCRIPT_TIMEOUT = 60 * 30 | |
@scorer(metrics=[accuracy(), stderr()]) | |
def swe_bench_scorer() -> Scorer: | |
"""Scores the changes made by a solver when solving a swe-bench instance, by running the tests which check whether than instance is correct.""" | |
async def scorer(state: TaskState, target: Target) -> Score: | |
# Get the changes the model made, for logging purposes | |
await sandbox().exec( | |
[ | |
"bash", | |
"-c", | |
CREATE_MODEL_PATCH.format(base_commit=state.metadata["base_commit"]), | |
] | |
) | |
try: | |
agent_patch = await sandbox().exec(["bash", "-c", GET_AGENT_PATCH]) | |
except UnicodeDecodeError: | |
agent_patch = ExecResult( | |
True, | |
0, | |
"Agent patch could not be decoded due to having a binary input.", | |
"", | |
) | |
# Run the evaluation script | |
eval_script = get_eval_script( | |
test_patch=state.metadata["test_patch"], | |
repo=state.metadata["repo"], | |
version=state.metadata["version"], | |
base_commit=state.metadata["base_commit"], | |
) | |
logger.debug(f"Running eval script") | |
try: | |
eval_script_result = await sandbox().exec( | |
["bash", "-c", eval_script], timeout=EVAL_SCRIPT_TIMEOUT | |
) | |
except TimeoutError as e: | |
explanation = f"The evaluation script timed out after {EVAL_SCRIPT_TIMEOUT} seconds." | |
return Score( | |
value=INCORRECT, | |
explanation=explanation, | |
metadata={ | |
"model_patch": agent_patch.stdout, | |
"eval_script": eval_script, | |
"eval_script_timeout": str(e), | |
}, | |
) | |
if not eval_script_result.success: | |
raise RuntimeError( | |
f"Test run failed. \n\nStderr: \n\n{eval_script_result.stderr}\n\nStdout: \n\n{eval_script_result.stdout}" | |
) | |
value, explanation, pass_to_pass_results, fail_to_pass_results = parse_test_output( | |
eval_script_result.stdout + "\n" + eval_script_result.stderr, state | |
) | |
return Score( | |
value=value, | |
explanation=explanation, | |
metadata={ | |
"model_patch": agent_patch.stdout, | |
"eval_script": eval_script, | |
"test_results": { | |
"pass_to_pass": pass_to_pass_results, | |
"fail_to_pass": fail_to_pass_results, | |
}, | |
"eval_script_result": eval_script_result, | |
}, | |
) | |
return scorer | |
def parse_test_output(test_output: str, state: TaskState) -> tuple[str, str, dict, dict]: | |
# This function looks at the output of running the tests, and returns a score and an explanation of the results | |
# Import these here as swebench is an optional dependency | |
from swebench.harness.constants import ( # type: ignore | |
APPLY_PATCH_FAIL, | |
RESET_FAILED, | |
TESTS_ERROR, | |
TESTS_TIMEOUT, | |
) | |
from swebench.harness.grading import MAP_REPO_TO_PARSER # type: ignore | |
# Search for the error strings defined by the swe-bench authors | |
error_string_search = { | |
x: x in test_output | |
for x in [ | |
APPLY_PATCH_FAIL, | |
RESET_FAILED, | |
TESTS_ERROR, | |
TESTS_TIMEOUT, | |
"Failed to reset task environment", | |
] | |
} | |
if any(error_string_search.values()): | |
empty_results: tuple[dict, dict] = {}, {} | |
return ( | |
INCORRECT, | |
f"The tests did not run correctly. Output from searching for error strings:\n\n{error_string_search}\n\nOutput from tests:\n\n{test_output}", | |
*empty_results, | |
) | |
test_output_parser = MAP_REPO_TO_PARSER[state.metadata["repo"]] | |
test_output_parsed = test_output_parser(test_output) | |
pass_to_pass_results = {k: "FAILED" for k in state.metadata["PASS_TO_PASS"]} | |
fail_to_pass_results = {k: "FAILED" for k in state.metadata["FAIL_TO_PASS"]} | |
for k, v in test_output_parsed.items(): | |
if k in state.metadata["PASS_TO_PASS"]: | |
pass_to_pass_results[k] = v | |
elif k in state.metadata["FAIL_TO_PASS"]: | |
fail_to_pass_results[k] = v | |
passed_all_tests = all(["PASSED" == v for v in pass_to_pass_results.values()]) and all( | |
["PASSED" == v for v in fail_to_pass_results.values()] | |
) | |
value = CORRECT if passed_all_tests else INCORRECT | |
# Sort both so that the failed values are at the top
pass_to_pass_results, fail_to_pass_results = ( | |
dict(sorted(pass_to_pass_results.items(), key=lambda x: x[1] == "PASSED")), | |
dict(sorted(fail_to_pass_results.items(), key=lambda x: x[1] == "PASSED")), | |
) | |
p2p_counts = Counter(pass_to_pass_results.values()) | |
f2p_counts = Counter(fail_to_pass_results.values()) | |
# Create markdown lists for the counts | |
p2p_counts_str = "\n".join([f"* {k}: {v}" for k, v in p2p_counts.items()]) | |
f2p_counts_str = "\n".join([f"* {k}: {v}" for k, v in f2p_counts.items()]) | |
# Create an explanation of the results | |
explanation_lines = [] | |
explanation_lines.append(f"Pass-to-pass tests:\n{p2p_counts_str}") | |
explanation_lines.append("\n") | |
explanation_lines.append(f"Fail-to-pass tests:\n{f2p_counts_str}") | |
explanation = "\n".join(explanation_lines) | |
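# Illustrative explanation for a failing sample (counts are hypothetical):
#   Pass-to-pass tests:
#   * PASSED: 41
#   * FAILED: 1
#
#   Fail-to-pass tests:
#   * FAILED: 2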
return value, explanation, pass_to_pass_results, fail_to_pass_results | |
def swe_bench_baseline_scorer(path_to_baseline: str, name: str | None = None) -> Scorer: | |
"""Given a path to a set of SWE-bench trajectories in the official format (see https://github.com/swe-bench/experiments), returns the performance of those trajectories on the subset of SWE-bench you are evaluating on. This lets you compare to baselines on arbitrary subsets of SWE-bench.""" | |
baseline_name = name if name else Path(path_to_baseline).name | |
results_per_instance_id = get_baseline_results(path_to_baseline) | |
@scorer(metrics=[mean(), std()], name=baseline_name) | |
def _swebench_baseline_scorer() -> Scorer: | |
async def scorer(state: TaskState, target: Target) -> Score: | |
if state.sample_id in results_per_instance_id: | |
results = results_per_instance_id[str(state.sample_id)] | |
return Score( | |
value=results["resolved"], | |
explanation=f"Model Patch:\n\n {results['patch']}", | |
) | |
else: | |
return Score(value="N", explanation="No baseline found for this instance") | |
return scorer | |
return _swebench_baseline_scorer() | |
CREATE_MODEL_PATCH = """cd /testbed | |
git add -A | |
git diff --cached {base_commit} > model.patch""" | |
GET_AGENT_PATCH = """cd /testbed/ | |
cat model.patch""" | |
def get_eval_script(test_patch: str, repo: str, version: str, base_commit: str) -> str: | |
"""Creates a script which runs the tests of all the files in the test_patch.""" | |
# First we fetch the repository-specific 'specification' which SWE-bench provides | |
from swebench.harness.constants import MAP_REPO_VERSION_TO_SPECS | |
from swebench.harness.utils import get_test_directives # type: ignore | |
# Fetch the command which runs the test. Often simply the string 'pytest' | |
test_command = MAP_REPO_VERSION_TO_SPECS[repo][version]["test_cmd"] | |
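# e.g. a pytest invocation such as "pytest -rA ..." for most repos (illustrative; the exact
# command and flags come from the swebench spec table and vary by repo and version)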
# Find all the files which have been modified by the test patch | |
test_patch_files = re.findall(r"--- a/(.*)", test_patch) | |
# Find all the files which contain tests. Ugly interface is due to swebench | |
test_files = get_test_directives({"repo": repo, "test_patch": test_patch}) | |
# Reset test files to the state they should be in before the patch. | |
eval_script = f"""#!/bin/bash | |
set -uo pipefail -x | |
source ~/.bashrc | |
# First we reset all of the files which our test patch touches
git checkout {base_commit} {' '.join(test_patch_files)}
# Then we apply the test patch given to us by SWE-bench, setting up the test we need to run
echo {shlex.quote(test_patch)} > /tmp/test_patch.diff
git apply --check /tmp/test_patch.diff
git apply /tmp/test_patch.diff
# Finally, we run all the tests in the repository.
set +x | |
{test_command} {" ".join(test_files)} || true | |
""" | |
return eval_script | |
def get_baseline_results(path_to_baseline: str) -> dict[str, dict[str, str]]: | |
"""Loads the results of a SWE-bench baseline in the offical format, and returns a dictionary of the results.""" | |
path_to_logs = os.path.join(path_to_baseline, "logs") | |
results_per_instance_id = {} | |
for result in os.listdir(path_to_logs): | |
results_path = os.path.join(path_to_logs, result, "report.json") | |
patch_path = os.path.join(path_to_logs, result, "patch.diff") | |
if os.path.exists(results_path) and os.path.exists(patch_path): | |
# Sometimes there is no result saved, at which point we ignore that entry | |
with open(results_path, "r") as f: | |
result_dict = json.load(f) | |
instance_id, raw_results = next(iter(result_dict.items())) | |
with open(patch_path, "r") as f: | |
results_per_instance_id[instance_id] = { | |
"resolved": raw_results["resolved"], | |
"patch": f.read(), | |
"tests_status": ( | |
raw_results["tests_status"] | |
if "tests_status" in raw_results | |
else "NO STATUS REPORTED" | |
), | |
} | |
return results_per_instance_id | |
def save_outputs_to_swebench_format( | |
eval_logs: list[EvalLog] | EvalLog, | |
output_dir: str | Path, | |
print_instance_ids: bool = True, | |
) -> None: | |
"""Converts the outputs of the swe_bench_scorer into the format expected by the swe_bench_baseline_scorer""" | |
output_dir = Path(output_dir) | |
eval_logs = eval_logs if isinstance(eval_logs, list) else [eval_logs] | |
os.makedirs(output_dir, exist_ok=True) | |
for log in eval_logs: | |
log_name = f"{log.eval.created}+{log.eval.task}_{log.eval.run_id}" | |
log_name = ( | |
log_name.replace("_", "-").replace("/", "-").replace(":", "-") | |
) # Mirrors the naming convention of the Inspect log files
preds = [] | |
if log.samples is None: | |
raise ValueError(f"The eval log {log_name} does not contain any samples.") | |
for sample in log.samples: | |
preds += [ | |
{ | |
"model_name_or_path": log_name, | |
"model_patch": sample.scores["swe_bench_scorer"].metadata[ # type: ignore | |
"model_patch" | |
], | |
"instance_id": sample.id, | |
} | |
] | |
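# Illustrative prediction record (values are hypothetical):
#   {"model_name_or_path": "<log_name>", "model_patch": "<unified diff>", "instance_id": "<instance_id>"}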
output_file = output_dir / f"{log_name}.jsonl" | |
jsonlines.open(output_file, "w").write_all(preds) | |
print( | |
f"""Log saved. Run evaluation with: | |
python -m swebench.harness.run_evaluation \\ | |
--predictions_path {output_file} \\ | |
--dataset princeton-nlp/SWE-bench_Verified \\ | |
--max_workers 8 \\ | |
--run_id check-outputs \\
--instance_ids {' '.join([str(sample.id) for sample in log.samples]) if print_instance_ids else "INSTANCE_IDS"}""" | |
) | |
print(f"Saved the outputs of the scorers to {output_dir}") |
""" | |
Integration of SWE-Agent tools with Inspect's ToolDef framework. | |
This module provides a bridge between SWE-Agent tools and Inspect by: | |
1. Loading SWE-Agent tool bundles | |
2. Adapting Inspect's SandboxEnvironment to SWE-Agent's SWEEnv interface | |
3. Converting SWE-Agent Command objects to Inspect ToolDef objects | |
""" | |
import asyncio | |
import json | |
import os | |
import re | |
import shlex | |
import sys | |
from pathlib import Path | |
from typing import Any, Callable, Dict, List, Optional, Union, TypeVar, Literal | |
from inspect_ai.tool import ToolDef, ToolParams, ToolParam | |
from inspect_ai.util import sandbox, store_as | |
from inspect_ai.util._store_model import StoreModel | |
from pydantic import Field | |
import logging | |
from bench import PROJECT_ROOT | |
from environs import Env | |
env = Env() | |
env.read_env() | |
logger = logging.getLogger(__name__) | |
SWE_AGENT_PATH = Path(env.str("SWE_AGENT_PATH")) | |
# TODO: I couldn't get `mypy` to run on the SWE-Agent imports | |
from sweagent.environment.swe_env import SWEEnv # type: ignore | |
from sweagent.tools.bundle import Bundle # type: ignore | |
from sweagent.tools.commands import Command # type: ignore | |
# Define Literal type for available bundles | |
BundleName = Literal[ | |
"defaults", # Core file navigation | |
"search", # File search tools | |
"edit_replace", # Search and replace editing | |
"edit_linting", # Line range editing | |
"edit_rewrite", # Complete text replacement | |
"edit_anthropic", # Anthropic-specific editing | |
"diff_state", # State variable diff | |
"filemap", # Python file summarization | |
"forfeit", # Task-specific for challenges | |
"registry", # Environment variables infrastructure | |
"review_on_submit", # Multi-stage submission | |
"review_on_submit_m", # Multi-stage submission | |
"submit", # Submission tool | |
] | |
class SandboxStoreState(StoreModel): | |
"""State for a sandbox environment used by SWE-Agent tools.""" | |
current_file: Optional[str] = Field(default=None) | |
first_line: int = Field(default=0) | |
class InspectSandboxEnvAdapter(SWEEnv): | |
"""Adapts Inspect's SandboxEnvironment to SWE-Agent's SWEEnv interface.""" | |
def __init__(self, sandbox_name=None): | |
self._sandbox = sandbox(sandbox_name) | |
self._env_vars = {} | |
def communicate(self, command, timeout=None, check=None): | |
"""Execute a command in the sandbox and return its output.""" | |
result = asyncio.run( | |
self._sandbox.exec(cmd=["bash", "--login", "-c", command], timeout=timeout) | |
) | |
if check == "raise" and not result.success: | |
raise RuntimeError(f"Command failed: {command}") | |
return result.stdout | |
def set_env_variables(self, env_vars): | |
"""Set environment variables for the sandbox.""" | |
self._env_vars.update(env_vars) | |
def read_file(self, path): | |
"""Read a file from the sandbox.""" | |
return asyncio.run(self._sandbox.read_file(path)) | |
def write_file(self, path, content): | |
"""Write a file to the sandbox.""" | |
return asyncio.run(self._sandbox.write_file(path, content)) | |
def command_to_tool_def(command: Command, bundle_path: Path) -> ToolDef: | |
"""Convert a SWE-Agent Command to an Inspect ToolDef. | |
Args: | |
command: The SWE-Agent Command to convert. | |
bundle_path: The path to the bundle containing the command. | |
""" | |
# Convert SWE-Agent arguments to Inspect ToolParams | |
params = ToolParams() | |
for arg in command.arguments: | |
param_type = arg.type | |
# Convert SWE-Agent types to Inspect types | |
if param_type == "integer": | |
param_type = "number" | |
# Keep string, boolean, array, object as-is | |
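# Illustrative mapping (hypothetical argument): {name: "line_number", type: "integer", required: False}
# becomes ToolParam(type="number", description=...) and is left out of params.required.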
# Ensure both type and description are valid to pass validation | |
params.properties[arg.name] = ToolParam( | |
type=param_type or "string", # Default to string if type is None | |
description=arg.description | |
or f"Parameter {arg.name} for {command.name}", # Ensure description is not None | |
) | |
# Add to required list if the argument is required | |
if arg.required: | |
if params.required is None: | |
params.required = [] | |
params.required.append(arg.name) | |
async def execute_tool(**kwargs: Any) -> str: | |
"""Execute the SWE-Agent tool in the sandbox environment.""" | |
logger.debug(f"Tool called: {command.name}") | |
logger.debug(f"Arguments received: {kwargs}") | |
# Get state for this sandbox | |
state = store_as(SandboxStoreState) | |
# Create arguments list in the correct order | |
args_list = [] | |
for arg in command.arguments: | |
if arg.name in kwargs: | |
arg_value = kwargs[arg.name] | |
if isinstance(arg_value, bool): | |
if arg_value: | |
args_list.append(str(arg_value).lower()) | |
else: | |
args_list.append(str(arg_value)) | |
quoted_args = [shlex.quote(str(arg)) for arg in args_list] | |
cmd_str = f"{command.name} {' '.join(quoted_args)}" | |
logger.debug(f"Executing command: {cmd_str}") | |
# Set up tool environment paths | |
setup_cmd = ( | |
f"export PATH=$PATH:/root/tools/{bundle_path.name}/bin:/root/tools/defaults/bin:/root/tools/registry/bin && " | |
f"export PYTHONPATH=$PYTHONPATH:/root/tools/{bundle_path.name}/lib:/root/tools/defaults/lib:/root/tools/registry/lib" | |
) | |
# Execute the command | |
result = await sandbox().exec( | |
cmd=["bash", "--login", "-c", f"{setup_cmd} && {cmd_str}"], | |
timeout=120, | |
) | |
logger.debug(f"Command result: {result}") | |
# Read the registry to update our state | |
registry_content = await sandbox().read_file("/root/.swe-agent-env") | |
updated_registry = json.loads(registry_content) | |
# Update state with changes made by the tool | |
if "CURRENT_FILE" in updated_registry: | |
state.current_file = updated_registry["CURRENT_FILE"] | |
if "FIRST_LINE" in updated_registry: | |
state.first_line = int(updated_registry["FIRST_LINE"]) | |
return result.stdout | |
# Create the ToolDef | |
tool_name = command.name | |
tool_description = command.docstring or f"Run the {command.name} command" | |
return ToolDef( | |
tool=execute_tool, name=tool_name, description=tool_description, parameters=params | |
) | |
async def setup_sweagent_environment(window: int = 100, overlap: int = 2) -> None: | |
"""Set up the SWE-Agent environment in the sandbox. | |
Args: | |
window: Size of the file viewing window (number of lines). | |
overlap: Number of lines to overlap when scrolling. | |
""" | |
# Initialize registry file with default values | |
registry_content = { | |
"WINDOW": str(window), | |
"OVERLAP": str(overlap), | |
} | |
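# With the defaults (window=100, overlap=2), the file written below is: {"WINDOW": "100", "OVERLAP": "2"}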
await sandbox().write_file("/root/.swe-agent-env", json.dumps(registry_content)) | |
def raise_if_bundle_dir_invalid(bundle_dir: Path, bundle_name: str) -> None: | |
if not bundle_dir.is_dir(): | |
raise ValueError(f"Path {bundle_dir} for bundle {bundle_name} is not a directory") | |
if not (bundle_dir / "config.yaml").exists(): | |
raise ValueError( | |
f"Directory {bundle_dir} for bundle {bundle_name} does not contain a config.yaml file" | |
) | |
def generate_dockerfile_content(image_name: str, bundles: List[BundleName]) -> str: | |
"""Generate Dockerfile content for SWE-Agent environment. | |
Args: | |
image_name: The base Docker image name to use. | |
bundles: List of SWE-Agent bundles to include. | |
Returns: | |
String containing the Dockerfile content. | |
""" | |
# Keep SWE_AGENT_BRANCH in sync with the version in benchmarks/Dockerfile | |
SWE_AGENT_URL = "https://github.com/SWE-agent/SWE-agent.git" | |
SWE_AGENT_BRANCH = "v1.0.1" | |
dockerfile_content = f"""# Stage 1: Clone SWE-Agent repository | |
FROM alpine/git as swe-agent-source | |
RUN git clone --depth 1 --branch {SWE_AGENT_BRANCH} {SWE_AGENT_URL} /swe-agent | |
# Stage 2: Build final image | |
FROM {image_name} | |
# Install packages required by the SWE-Agent tools | |
RUN --mount=type=cache,target=/root/.cache/pip \ | |
pip install flake8 | |
# Create directory structure for SWE-Agent tools | |
RUN mkdir -p /root/tools | |
""" | |
# Copy each specified bundle | |
for bundle in bundles: | |
dockerfile_content += f""" | |
# Copy {bundle} bundle | |
COPY --from=swe-agent-source /swe-agent/tools/{bundle} /root/tools/{bundle} | |
""" | |
# Setup registry environment | |
dockerfile_content += """ | |
# Initialize registry file | |
RUN echo "{}" > /root/.swe-agent-env | |
# Make executable files in bin directories executable | |
RUN find /root/tools -path "*/bin/*" -type f -exec chmod +x {} \\; | |
""" | |
return dockerfile_content | |
async def sweagent_tooldefs( | |
include_bundles: List[BundleName], | |
tools_dir=None, | |
) -> Dict[BundleName, List[ToolDef]]: | |
"""Convert SWE-Agent tools to Inspect ToolDefs. | |
Args: | |
include_bundles: List of bundle names to include as ToolDefs. | |
tools_dir: Directory containing SWE-Agent tools bundles. | |
Defaults to SWE-agent/tools. | |
Returns: | |
Dictionary mapping bundle names to lists of ToolDef objects for that bundle. | |
""" | |
if tools_dir is None: | |
tools_dir = SWE_AGENT_PATH / "tools" | |
swe_agent_tools: Dict[BundleName, List[ToolDef]] = {} | |
tool_names = set() | |
# Load SWE-Agent bundles for ToolDefs - all bundles are installed in the Docker image at build time | |
for bundle_name in include_bundles: | |
bundle_dir = tools_dir / bundle_name | |
raise_if_bundle_dir_invalid(bundle_dir, bundle_name) | |
bundle = Bundle(path=bundle_dir) | |
# Initialize list for this bundle | |
swe_agent_tools[bundle_name] = [] | |
# Create tool definitions | |
for command in bundle.commands: | |
tool_def = command_to_tool_def(command, bundle_dir) | |
# Check for duplicate tool names | |
if tool_def.name in tool_names: | |
raise ValueError( | |
f"Duplicate tool name detected: {tool_def.name}. Remove one of the bundles that provides this tool." | |
) | |
tool_names.add(tool_def.name) | |
swe_agent_tools[bundle_name].append(tool_def) | |
logger.debug( | |
f"Loaded tools from {len(swe_agent_tools)} bundles: {list(swe_agent_tools.keys())}" | |
) | |
return swe_agent_tools |