Epoch AI implementation of SWE-Bench
"""
Based on [1] by @max-kaufmann as a starting point, with substantial modifications; not intended to
be a drop-in alternative.
[1] https://github.com/UKGovernmentBEIS/inspect_evals/tree/b30a1aab73217e035d5aa22fd0526c70650e4b3e/src/inspect_evals/swe_bench
"""
import json
import logging
import platform
from importlib.util import find_spec
from pathlib import Path
from typing import List, Optional
from datasets import Dataset # type: ignore
from environs import Env
from inspect_ai import task, Task
from inspect_ai.dataset import hf_dataset, FieldSpec
from inspect_ai.scorer import Scorer
from inspect_ai.util import SandboxEnvironmentSpec
from platformdirs import user_cache_dir
from bench.task.swe_bench.agent import swe_bench_agent, DEFAULT_BUNDLES
from bench.task.swe_bench.scorers import swe_bench_scorer
from bench.task.swe_bench.sweagent_tools import BundleName, generate_dockerfile_content
from bench.task.swe_bench.utils import warm_up_cache
COMPOSE_FILES_DIR = Path(user_cache_dir("inspect_swebench_eval")) / "compose_files"
logger = logging.getLogger(__name__)
env = Env()
env.read_env()
@task
def swe_bench_verified() -> Task:
return swe_bench(
dataset="princeton-nlp/SWE-bench_Verified",
split="test",
)
@task
def swe_bench(
dataset: str = "princeton-nlp/SWE-bench_Verified",
split: str = "test",
instance_ids: list[str] | None = None,
scorer: Scorer | list[Scorer] | None = None,
) -> Task:
"""Returns a Task, representing an evaluation on SWE-bench.
This currently assumes the solver is a SWE-Agent solver (because the bundle names
need to be known at build time). However, it's easily extensible.
Args:
dataset : str
The dataset to use. This should either be the name of a dataset in the HF hub, or a path to a dataset on disk.
split : str
The split of the dataset to load.
instance_ids : list[str]
A list of instance_ids to filter the dataset by. If None, all instances are used.
scorer : Scorer | list[Scorer] | None
The scorer to use when evaluating swe_bench. If None, uses the default scorer. Most commonly, this will be a list of scorers to compare to baselines (see the README for more information).
"""
assert find_spec(
"swebench"
), "To run SWE-bench, please install the optional SWE-bench depency, by running `pip install inspect-ai[swe_bench]`"
samples = hf_dataset(
dataset,
split=split,
sample_fields=FieldSpec(
input="problem_statement",
id="instance_id",
metadata=[
"base_commit",
"patch",
"PASS_TO_PASS",
"FAIL_TO_PASS",
"test_patch",
"version",
"repo",
"environment_setup_commit",
"hints_text",
"created_at",
],
),
# Shuffling helps avoid potential bias when running a subset of the dataset
shuffle=True,
seed=12345,
)
for sample in samples:
# Turn the saved strings into list objects
sample.metadata = sample.metadata or {}
sample.metadata["PASS_TO_PASS"] = json.loads(sample.metadata["PASS_TO_PASS"])
sample.metadata["FAIL_TO_PASS"] = json.loads(sample.metadata["FAIL_TO_PASS"])
if instance_ids is not None:
samples = samples.filter(lambda x: x.id in instance_ids)
ids_to_docker_image = get_image_names(samples)
swe_agent_bundles = DEFAULT_BUNDLES
for sample in samples:
sample.metadata = sample.metadata or {}
sample.sandbox = SandboxEnvironmentSpec(
type="docker",
config=get_compose_file(ids_to_docker_image[str(sample.id)], swe_agent_bundles),
)
sample.metadata["image"] = {
"name": ids_to_docker_image[str(sample.id)],
# I would like to also have the image digest in the metadata,
# but this is better handled in Inspect, where the image
# is actually pulled.
}
# TODO:
# this currently ignores EVAL_LIMIT and warms up the cache for all images. (We face a similar
# problem with all overrides done at Task instantiation time, e.g. `epochs_override`)
# This is acceptable for now.
# Hack to parallelize image construction (only relevant for non-registry images/layers like
# SWE-Agent), see https://github.com/epoch-research/benchmarks/issues/141 for details.
warm_up_cache(list(samples))
return Task(
dataset=samples,
plan=swe_bench_agent(bundles=swe_agent_bundles),
scorer=scorer or swe_bench_scorer(),
epochs=1,
metadata={"inspect-log-public": True},
)
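
# A minimal usage sketch (not part of the original gist): the task above is normally run
# through Inspect's eval entry point. The model name and limit are illustrative assumptions.
def _example_run_swe_bench_verified() -> None:
    from inspect_ai import eval as inspect_eval

    inspect_eval(
        swe_bench_verified(),
        model="openai/gpt-4o",  # hypothetical model choice
        limit=5,  # evaluate only a handful of instances
    )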
def get_compose_file(image_name: str, swe_agent_bundles: Optional[List[BundleName]] = None) -> str:
"""
Create a Docker Compose file for the given image.
Args:
image_name (str): The Docker image name to use.
swe_agent_bundles (Optional[List[BundleName]]): List of SWE-Agent bundle names to include.
If None, SWE-Agent is not used.
"""
# Create safe filenames without special characters
base_filename = f"{image_name}".replace("/", "_").replace(":", "_")
COMPOSE_FILES_DIR.mkdir(parents=True, exist_ok=True)
# Create a multistage Dockerfile that clones SWE-Agent and installs necessary tools
dockerfile_name = f"{base_filename}.Dockerfile"
dockerfile_path = COMPOSE_FILES_DIR / dockerfile_name
# If bundles is None, just create a simple Dockerfile with the FROM line
if swe_agent_bundles is None:
dockerfile_content = f"FROM {image_name}"
else:
# other bundles require the "registry" bundle to be included
registry_bundle: BundleName = "registry"
swe_agent_bundles = [registry_bundle] + swe_agent_bundles
dockerfile_content = generate_dockerfile_content(image_name, swe_agent_bundles)
with dockerfile_path.open(mode="w+") as f:
f.write(dockerfile_content)
# Create the compose file that references the Dockerfile
compose_filename = f"{base_filename}.yaml"
image_compose_file = COMPOSE_FILES_DIR / compose_filename
with image_compose_file.open(mode="w+") as f:
f.write(
f"""services:
default:
build:
dockerfile: {dockerfile_name}
command: "sleep infinity"
working_dir: /testbed
network_mode: none
init: true"""
)
return str(image_compose_file)
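
# Sketch (not in the original gist) of calling get_compose_file directly: it writes a
# Dockerfile and compose file under COMPOSE_FILES_DIR and returns the compose file path.
# The image name follows the ghcr.io pattern used below; the instance id is illustrative.
def _example_compose_file() -> str:
    image = "ghcr.io/epoch-research/swe-bench.eval.x86_64.astropy__astropy-12907"
    return get_compose_file(image, swe_agent_bundles=["defaults", "search", "edit_replace"])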
def get_image_names(samples: Dataset) -> dict[str, str]:
    # Determine the image architecture once for all samples; raise on unsupported platforms.
    if platform.machine() in ["x86_64", "AMD64"]:
        arch = "x86_64"
    elif platform.machine() in ["aarch64", "arm64"]:
        arch = "arm64"
        logger.warning(
            "Using arm64 architecture. arm64 support is experimental. Some images may not exist, or may not work."
        )
    else:
        raise ValueError(f"Unsupported architecture: {platform.machine()}")
    ids_to_docker_image = {}
    for instance in samples:
        image_name = f"ghcr.io/epoch-research/swe-bench.eval.{arch}.{instance.id}"
        ids_to_docker_image[instance.id] = image_name
    return ids_to_docker_image
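
# Sketch (not in the original gist): what the instance-id -> image mapping looks like for
# the first few samples. Purely illustrative.
def _example_print_image_names(samples: Dataset) -> None:
    for instance_id, image in list(get_image_names(samples).items())[:3]:
        print(f"{instance_id} -> {image}")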
"""
SWE-Bench agent implementation using SWE-Agent tools.
"""
import logging
from typing import List, Dict
from inspect_ai.model import ChatMessageUser
from inspect_ai.solver import solver, TaskState, Generate, Solver
from inspect_ai.tool import Tool, bash, ToolDef
from inspect_ai.util import store
from inspect_ai.tool import tool
from inspect_ai.log._transcript import transcript
from .sweagent_tools import (
sweagent_tooldefs,
setup_sweagent_environment,
SandboxStoreState,
BundleName,
)
logger = logging.getLogger(__name__)
BASH_TOOL_TIMEOUT = 60 * 5
# List of SWE-Agent bundles to include by default
# Note: make sure not to include tools that are redundant with the other tools we
# provide in the solver (e.g. submit_answer, bash).
# Note: There are three edit tool bundles: edit_replace, edit_linting, edit_rewrite.
# Only include one of these at a time.
# Defaults are taken from the SWE-Agent default config:
# https://github.com/SWE-agent/SWE-agent/blob/ce072d61a5d1cd4fc60eb1f281526fdf0bde1640/config/default.yaml
DEFAULT_BUNDLES: List[BundleName] = [
"defaults", # Core file navigation: goto, open, create, scroll_up, scroll_down
"search", # File search tools: find_file, search_dir, search_file
"edit_replace", # Search and replace editing with optional replace-all flag
]
# We skip these bundles:
# - diff_state: Shows differences in state variables between commands
# - edit_anthropic: Anthropic-specific Claude editing tool (str_replace_editor)
# - edit_linting: Line range editing (replaces lines start_line:end_line with new text)
# - edit_rewrite: Complete text replacement for currently visible lines
# - filemap: Python file summarization, skipping lengthy function definitions
# - forfeit: Task-specific for challenges (exit_forfeit command)
# - review_on_submit/review_on_submit_m: Multi-stage submission with review steps
# - submit: Redundant with our submit_answer tool
# The "registry" bundle is required by other bundles and is always included
# in the image build process.
@solver
def swe_bench_agent(
token_limit: int = 1_000_000,
bundles: List[BundleName] | None = None,
window: int = 100,
overlap: int = 2,
) -> Solver:
"""
A solver that uses SWE-Agent tools to solve SWE-Bench tasks.
Args:
token_limit: Maximum number of tokens to use for the conversation.
bundles: List of SWE-Agent bundle names to include.
If None, uses a default set of bundles.
window: Size of the file viewing window (number of lines).
overlap: Number of lines to overlap when scrolling.
Returns:
A solver that uses SWE-Agent tools.
"""
include_bundles = bundles or DEFAULT_BUNDLES
async def solve(state: TaskState, generate: Generate) -> TaskState:
# Initialize submitted answer in store
store().set("submitted_answer", None)
# Set token limit on state, so generate() will respect it
state.token_limit = token_limit
# Load SWE-Agent tools
# All bundles are installed at build time
await setup_sweagent_environment(window=window, overlap=overlap)
swe_agent_tools = await sweagent_tooldefs(
include_bundles=include_bundles,
)
# Flatten tool list for state.tools
all_tools = []
for bundle_tools in swe_agent_tools.values():
# Convert ToolDefs to Tool instances
as_tools = [tool_def.as_tool() for tool_def in bundle_tools]
all_tools.extend(as_tools)
# Always include the bash tool and submit_answer tool
state.tools = [bash(timeout=BASH_TOOL_TIMEOUT), submit_answer()] + all_tools
state.tool_choice = "auto"
# Display initial prompt with instructions
state.messages = [
ChatMessageUser(
content=initial_prompt(
question=state.user_prompt.text,
token_limit=token_limit,
repo=state.metadata["repo"],
swe_agent_tools=swe_agent_tools,
window_size=window,
overlap_size=overlap,
),
),
]
while state.token_usage < token_limit:
state = await generate(state, tool_calls="loop")
# If an answer was submitted, we're done
if store().get("submitted_answer") is not None:
break
# Otherwise, continue the conversation
message = (
f"You have used {state.token_usage:,} tokens (hard limit: {token_limit:,} tokens). "
f"Continue working on the problem. Once you've made the necessary changes to the "
f"repository, submit your answer using the `submit_answer` tool."
)
state.messages.append(ChatMessageUser(content=message))
return state
return solve
@tool
def submit_answer() -> Tool:
async def execute() -> str:
"""
Call this tool to signal that you have finished making changes to the repository to solve
the task. The tool does not take any arguments.
"""
store().set("submitted_answer", True)
return "Your answer has been recorded. No feedback is provided."
return execute
def initial_prompt(
question: str,
token_limit: int,
repo: str,
swe_agent_tools: Dict[BundleName, List[ToolDef]],
window_size: int,
overlap_size: int,
) -> str:
"""Generate the initial prompt with dynamic tool descriptions based on the actual loaded tools."""
# Create tool descriptions section using bundles directly
tool_descriptions = ""
for bundle, tools in swe_agent_tools.items():
tool_names_str = ", ".join(f"`{tool.name}`" for tool in sorted(tools, key=lambda t: t.name))
tool_descriptions += f"\n - {bundle}: {tool_names_str}"
return TEMPLATE.format(
question=question,
token_limit=token_limit,
repo=repo,
bash_tool_timeout=BASH_TOOL_TIMEOUT,
tool_descriptions=tool_descriptions,
window_size=window_size,
overlap_size=overlap_size,
)
TEMPLATE = r"""
You will be solving an open issue from the {repo} repository. You have access to a Python environment
containing the repository, and you will need to modify the code to solve the issue.
When you are done, call the `submit_answer` tool to let us know.
* Environment:
- You have access to a Python environment containing the repository. You interact with the
environment by calling various tools.
- The environment does not have internet access.
* Token limits:
- There is a hard limit of {token_limit:,} tokens. If you exceed this, the conversation will end
immediately (even if you haven't called the submit tool). The final state of the repository at
that point will be used as your answer.
- Both input and output tokens count towards the limit.
* SWE-Agent Tools:
- Available tools by category: {tool_descriptions}
- Additional guidance on the file viewing and editing tools:
- The file viewing and editing tools let you view {window_size} lines at a time with an overlap of {overlap_size} lines.
- You can use the `scroll_up` and `scroll_down` commands to navigate through larger files.
- Avoid using the scrolling commands multiple times. Instead, use the `goto` command to jump to a specific line, or
open the file at a specific line.
* You also have access to the `bash` tool:
- You can use the tool to execute arbitrary shell commands in the repository environment.
- The tool has a timeout of {bash_tool_timeout} seconds.
* The `submit_answer` tool takes no arguments and simply records that you have finished making changes to the repository.
Here is the issue you need to solve:
<issue>
{question}
</issue>
"""
import json
import logging
import os
import re
import shlex
from collections import Counter
from pathlib import Path
from textwrap import dedent
import jsonlines
from inspect_ai.log import EvalLog
from inspect_ai.scorer import Score, Scorer, Target, mean, scorer, std, INCORRECT, CORRECT
from inspect_ai.solver import TaskState
from inspect_ai.util import ExecResult, sandbox
from inspect_ai.scorer import accuracy, stderr
logger = logging.getLogger(__name__)
# 30 minutes is hopefully very conservative, but I haven't had time for a thorough investigation
# of how long the tests should legitimately take to run, and I really don't want to unduly mark
# as INCORRECT because of timeouts.
EVAL_SCRIPT_TIMEOUT = 60 * 30
@scorer(metrics=[accuracy(), stderr()])
def swe_bench_scorer() -> Scorer:
"""Scores the changes made by a solver when solving a swe-bench instance, by running the tests which check whether than instance is correct."""
async def scorer(state: TaskState, target: Target) -> Score:
# Get the changes the model made, for logging purposes
await sandbox().exec(
[
"bash",
"-c",
CREATE_MODEL_PATCH.format(base_commit=state.metadata["base_commit"]),
]
)
try:
agent_patch = await sandbox().exec(["bash", "-c", GET_AGENT_PATCH])
except UnicodeDecodeError:
agent_patch = ExecResult(
True,
0,
"Agent patch could not be decoded due to having a binary input.",
"",
)
# Run the evaluation script
eval_script = get_eval_script(
test_patch=state.metadata["test_patch"],
repo=state.metadata["repo"],
version=state.metadata["version"],
base_commit=state.metadata["base_commit"],
)
logger.debug(f"Running eval script")
try:
eval_script_result = await sandbox().exec(
["bash", "-c", eval_script], timeout=EVAL_SCRIPT_TIMEOUT
)
except TimeoutError as e:
explanation = f"The evaluation script timed out after {EVAL_SCRIPT_TIMEOUT} seconds."
return Score(
value=INCORRECT,
explanation=explanation,
metadata={
"model_patch": agent_patch.stdout,
"eval_script": eval_script,
"eval_script_timeout": str(e),
},
)
if not eval_script_result.success:
raise RuntimeError(
f"Test run failed. \n\nStderr: \n\n{eval_script_result.stderr}\n\nStdout: \n\n{eval_script_result.stdout}"
)
value, explanation, pass_to_pass_results, fail_to_pass_results = parse_test_output(
eval_script_result.stdout + "\n" + eval_script_result.stderr, state
)
return Score(
value=value,
explanation=explanation,
metadata={
"model_patch": agent_patch.stdout,
"eval_script": eval_script,
"test_results": {
"pass_to_pass": pass_to_pass_results,
"fail_to_pass": fail_to_pass_results,
},
"eval_script_result": eval_script_result,
},
)
return scorer
def parse_test_output(test_output: str, state: TaskState) -> tuple[str, str, dict, dict]:
# This function looks at the output of running the tests, and returns a score and an explanation of the results
# Import these here as swebench is an optional dependency
from swebench.harness.constants import ( # type: ignore
APPLY_PATCH_FAIL,
RESET_FAILED,
TESTS_ERROR,
TESTS_TIMEOUT,
)
from swebench.harness.grading import MAP_REPO_TO_PARSER # type: ignore
# Search for the error strings defined by the swe-bench authors
error_string_search = {
x: x in test_output
for x in [
APPLY_PATCH_FAIL,
RESET_FAILED,
TESTS_ERROR,
TESTS_TIMEOUT,
"Failed to reset task environment",
]
}
if any(error_string_search.values()):
empty_results: tuple[dict, dict] = {}, {}
return (
INCORRECT,
f"The tests did not run correctly. Output from searching for error strings:\n\n{error_string_search}\n\nOutput from tests:\n\n{test_output}",
*empty_results,
)
test_output_parser = MAP_REPO_TO_PARSER[state.metadata["repo"]]
test_output_parsed = test_output_parser(test_output)
pass_to_pass_results = {k: "FAILED" for k in state.metadata["PASS_TO_PASS"]}
fail_to_pass_results = {k: "FAILED" for k in state.metadata["FAIL_TO_PASS"]}
for k, v in test_output_parsed.items():
if k in state.metadata["PASS_TO_PASS"]:
pass_to_pass_results[k] = v
elif k in state.metadata["FAIL_TO_PASS"]:
fail_to_pass_results[k] = v
passed_all_tests = all(["PASSED" == v for v in pass_to_pass_results.values()]) and all(
["PASSED" == v for v in fail_to_pass_results.values()]
)
value = CORRECT if passed_all_tests else INCORRECT
# Sort both so that the non-passing results are at the top
pass_to_pass_results, fail_to_pass_results = (
dict(sorted(pass_to_pass_results.items(), key=lambda x: x[1] == "PASSED")),
dict(sorted(fail_to_pass_results.items(), key=lambda x: x[1] == "PASSED")),
)
p2p_counts = Counter(pass_to_pass_results.values())
f2p_counts = Counter(fail_to_pass_results.values())
# Create markdown lists for the counts
p2p_counts_str = "\n".join([f"* {k}: {v}" for k, v in p2p_counts.items()])
f2p_counts_str = "\n".join([f"* {k}: {v}" for k, v in f2p_counts.items()])
# Create an explanation of the results
explanation_lines = []
explanation_lines.append(f"Pass-to-pass tests:\n{p2p_counts_str}")
explanation_lines.append("\n")
explanation_lines.append(f"Fail-to-pass tests:\n{f2p_counts_str}")
explanation = "\n".join(explanation_lines)
return value, explanation, pass_to_pass_results, fail_to_pass_results
def swe_bench_baseline_scorer(path_to_baseline: str, name: str | None = None) -> Scorer:
"""Given a path to a set of SWE-bench trajectories in the official format (see https://github.com/swe-bench/experiments), returns the performance of those trajectories on the subset of SWE-bench you are evaluating on. This lets you compare to baselines on arbitrary subsets of SWE-bench."""
baseline_name = name if name else Path(path_to_baseline).name
results_per_instance_id = get_baseline_results(path_to_baseline)
@scorer(metrics=[mean(), std()], name=baseline_name)
def _swebench_baseline_scorer() -> Scorer:
async def scorer(state: TaskState, target: Target) -> Score:
if state.sample_id in results_per_instance_id:
results = results_per_instance_id[str(state.sample_id)]
return Score(
value=results["resolved"],
explanation=f"Model Patch:\n\n {results['patch']}",
)
else:
return Score(value="N", explanation="No baseline found for this instance")
return scorer
return _swebench_baseline_scorer()
CREATE_MODEL_PATCH = """cd /testbed
git add -A
git diff --cached {base_commit} > model.patch"""
GET_AGENT_PATCH = """cd /testbed/
cat model.patch"""
def get_eval_script(test_patch: str, repo: str, version: str, base_commit: str) -> str:
"""Creates a script which runs the tests of all the files in the test_patch."""
# First we fetch the repository-specific 'specification' which SWE-bench provides
from swebench.harness.constants import MAP_REPO_VERSION_TO_SPECS
from swebench.harness.utils import get_test_directives # type: ignore
# Fetch the command which runs the test. Often simply the string 'pytest'
test_command = MAP_REPO_VERSION_TO_SPECS[repo][version]["test_cmd"]
# Find all the files which have been modified by the test patch
test_patch_files = re.findall(r"--- a/(.*)", test_patch)
# Find all the files which contain tests. Ugly interface is due to swebench
test_files = get_test_directives({"repo": repo, "test_patch": test_patch})
# Reset test files to the state they should be in before the patch.
eval_script = f"""#!/bin/bash
set -uo pipefail -x
source ~/.bashrc
# First we reset all of the files which our test patch touches
git checkout {base_commit} {' '.join(test_patch_files)}
# Then we apply the test patch given to us by SWE-bench, setting up the test we need to run
echo {shlex.quote(test_patch)} > /tmp/test_patch.diff
git apply --check /tmp/test_patch.diff
git apply /tmp/test_patch.diff
# Then we run all the tests in the repository.
set +x
{test_command} {" ".join(test_files)} || true
"""
return eval_script
def get_baseline_results(path_to_baseline: str) -> dict[str, dict[str, str]]:
"""Loads the results of a SWE-bench baseline in the offical format, and returns a dictionary of the results."""
path_to_logs = os.path.join(path_to_baseline, "logs")
results_per_instance_id = {}
for result in os.listdir(path_to_logs):
results_path = os.path.join(path_to_logs, result, "report.json")
patch_path = os.path.join(path_to_logs, result, "patch.diff")
if os.path.exists(results_path) and os.path.exists(patch_path):
# Sometimes there is no result saved, at which point we ignore that entry
with open(results_path, "r") as f:
result_dict = json.load(f)
instance_id, raw_results = next(iter(result_dict.items()))
with open(patch_path, "r") as f:
results_per_instance_id[instance_id] = {
"resolved": raw_results["resolved"],
"patch": f.read(),
"tests_status": (
raw_results["tests_status"]
if "tests_status" in raw_results
else "NO STATUS REPORTED"
),
}
return results_per_instance_id
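
# Sketch (not in the original gist): pairing the default scorer with an official baseline
# for comparison. The baseline path is hypothetical; it should point at a directory in the
# swe-bench/experiments layout (containing logs/<instance_id>/report.json and patch.diff).
def _example_scorers_with_baseline() -> list[Scorer]:
    baseline = swe_bench_baseline_scorer(
        "./baselines/20240620_sweagent_claude3.5sonnet",
        name="sweagent_claude35_baseline",
    )
    return [swe_bench_scorer(), baseline]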
def save_outputs_to_swebench_format(
eval_logs: list[EvalLog] | EvalLog,
output_dir: str | Path,
print_instance_ids: bool = True,
) -> None:
"""Converts the outputs of the swe_bench_scorer into the format expected by the swe_bench_baseline_scorer"""
output_dir = Path(output_dir)
eval_logs = eval_logs if isinstance(eval_logs, list) else [eval_logs]
os.makedirs(output_dir, exist_ok=True)
for log in eval_logs:
log_name = f"{log.eval.created}+{log.eval.task}_{log.eval.run_id}"
log_name = (
log_name.replace("_", "-").replace("/", "-").replace(":", "-")
) # Mirrors the name convention of the inspect log files
preds = []
if log.samples is None:
raise ValueError(f"The eval log {log_name} does not contain any samples.")
for sample in log.samples:
preds += [
{
"model_name_or_path": log_name,
"model_patch": sample.scores["swe_bench_scorer"].metadata[ # type: ignore
"model_patch"
],
"instance_id": sample.id,
}
]
output_file = output_dir / f"{log_name}.jsonl"
jsonlines.open(output_file, "w").write_all(preds)
print(
f"""Log saved. Run evaluation with:
python -m swebench.harness.run_evaluation \\
--predictions_path {output_file} \\
--dataset princeton-nlp/SWE-bench_Verified \\
--max_workers 8 \\
--run_id check-outputs \\
--instance_ids {' '.join([str(sample.id) for sample in log.samples]) if print_instance_ids else "INSTANCE_IDS"}"""
)
print(f"Saved the outputs of the scorers to {output_dir}")
"""
Integration of SWE-Agent tools with Inspect's ToolDef framework.
This module provides a bridge between SWE-Agent tools and Inspect by:
1. Loading SWE-Agent tool bundles
2. Adapting Inspect's SandboxEnvironment to SWE-Agent's SWEEnv interface
3. Converting SWE-Agent Command objects to Inspect ToolDef objects
"""
import asyncio
import json
import os
import re
import shlex
import sys
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Union, TypeVar, Literal
from inspect_ai.tool import ToolDef, ToolParams, ToolParam
from inspect_ai.util import sandbox, store_as
from inspect_ai.util._store_model import StoreModel
from pydantic import Field
import logging
from bench import PROJECT_ROOT
from environs import Env
env = Env()
env.read_env()
logger = logging.getLogger(__name__)
SWE_AGENT_PATH = Path(env.str("SWE_AGENT_PATH"))
# TODO: I couldn't get `mypy` to run on the SWE-Agent imports
from sweagent.environment.swe_env import SWEEnv # type: ignore
from sweagent.tools.bundle import Bundle # type: ignore
from sweagent.tools.commands import Command # type: ignore
# Define Literal type for available bundles
BundleName = Literal[
"defaults", # Core file navigation
"search", # File search tools
"edit_replace", # Search and replace editing
"edit_linting", # Line range editing
"edit_rewrite", # Complete text replacement
"edit_anthropic", # Anthropic-specific editing
"diff_state", # State variable diff
"filemap", # Python file summarization
"forfeit", # Task-specific for challenges
"registry", # Environment variables infrastructure
"review_on_submit", # Multi-stage submission
"review_on_submit_m", # Multi-stage submission
"submit", # Submission tool
]
class SandboxStoreState(StoreModel):
"""State for a sandbox environment used by SWE-Agent tools."""
current_file: Optional[str] = Field(default=None)
first_line: int = Field(default=0)
class InspectSandboxEnvAdapter(SWEEnv):
"""Adapts Inspect's SandboxEnvironment to SWE-Agent's SWEEnv interface."""
def __init__(self, sandbox_name=None):
self._sandbox = sandbox(sandbox_name)
self._env_vars = {}
def communicate(self, command, timeout=None, check=None):
"""Execute a command in the sandbox and return its output."""
result = asyncio.run(
self._sandbox.exec(cmd=["bash", "--login", "-c", command], timeout=timeout)
)
if check == "raise" and not result.success:
raise RuntimeError(f"Command failed: {command}")
return result.stdout
def set_env_variables(self, env_vars):
"""Set environment variables for the sandbox."""
self._env_vars.update(env_vars)
def read_file(self, path):
"""Read a file from the sandbox."""
return asyncio.run(self._sandbox.read_file(path))
def write_file(self, path, content):
"""Write a file to the sandbox."""
return asyncio.run(self._sandbox.write_file(path, content))
def command_to_tool_def(command: Command, bundle_path: Path) -> ToolDef:
"""Convert a SWE-Agent Command to an Inspect ToolDef.
Args:
command: The SWE-Agent Command to convert.
bundle_path: The path to the bundle containing the command.
"""
# Convert SWE-Agent arguments to Inspect ToolParams
params = ToolParams()
for arg in command.arguments:
param_type = arg.type
# Convert SWE-Agent types to Inspect types
if param_type == "integer":
param_type = "number"
# Keep string, boolean, array, object as-is
# Ensure both type and description are valid to pass validation
params.properties[arg.name] = ToolParam(
type=param_type or "string", # Default to string if type is None
description=arg.description
or f"Parameter {arg.name} for {command.name}", # Ensure description is not None
)
# Add to required list if the argument is required
if arg.required:
if params.required is None:
params.required = []
params.required.append(arg.name)
async def execute_tool(**kwargs: Any) -> str:
"""Execute the SWE-Agent tool in the sandbox environment."""
logger.debug(f"Tool called: {command.name}")
logger.debug(f"Arguments received: {kwargs}")
# Get state for this sandbox
state = store_as(SandboxStoreState)
# Create arguments list in the correct order
args_list = []
for arg in command.arguments:
if arg.name in kwargs:
arg_value = kwargs[arg.name]
if isinstance(arg_value, bool):
if arg_value:
args_list.append(str(arg_value).lower())
else:
args_list.append(str(arg_value))
quoted_args = [shlex.quote(str(arg)) for arg in args_list]
cmd_str = f"{command.name} {' '.join(quoted_args)}"
logger.debug(f"Executing command: {cmd_str}")
# Set up tool environment paths
setup_cmd = (
f"export PATH=$PATH:/root/tools/{bundle_path.name}/bin:/root/tools/defaults/bin:/root/tools/registry/bin && "
f"export PYTHONPATH=$PYTHONPATH:/root/tools/{bundle_path.name}/lib:/root/tools/defaults/lib:/root/tools/registry/lib"
)
# Execute the command
result = await sandbox().exec(
cmd=["bash", "--login", "-c", f"{setup_cmd} && {cmd_str}"],
timeout=120,
)
logger.debug(f"Command result: {result}")
# Read the registry to update our state
registry_content = await sandbox().read_file("/root/.swe-agent-env")
updated_registry = json.loads(registry_content)
# Update state with changes made by the tool
if "CURRENT_FILE" in updated_registry:
state.current_file = updated_registry["CURRENT_FILE"]
if "FIRST_LINE" in updated_registry:
state.first_line = int(updated_registry["FIRST_LINE"])
return result.stdout
# Create the ToolDef
tool_name = command.name
tool_description = command.docstring or f"Run the {command.name} command"
return ToolDef(
tool=execute_tool, name=tool_name, description=tool_description, parameters=params
)
async def setup_sweagent_environment(window: int = 100, overlap: int = 2) -> None:
"""Set up the SWE-Agent environment in the sandbox.
Args:
window: Size of the file viewing window (number of lines).
overlap: Number of lines to overlap when scrolling.
"""
# Initialize registry file with default values
registry_content = {
"WINDOW": str(window),
"OVERLAP": str(overlap),
}
await sandbox().write_file("/root/.swe-agent-env", json.dumps(registry_content))
def raise_if_bundle_dir_invalid(bundle_dir: Path, bundle_name: str) -> None:
if not bundle_dir.is_dir():
raise ValueError(f"Path {bundle_dir} for bundle {bundle_name} is not a directory")
if not (bundle_dir / "config.yaml").exists():
raise ValueError(
f"Directory {bundle_dir} for bundle {bundle_name} does not contain a config.yaml file"
)
def generate_dockerfile_content(image_name: str, bundles: List[BundleName]) -> str:
"""Generate Dockerfile content for SWE-Agent environment.
Args:
image_name: The base Docker image name to use.
bundles: List of SWE-Agent bundles to include.
Returns:
String containing the Dockerfile content.
"""
# Keep SWE_AGENT_BRANCH in sync with the version in benchmarks/Dockerfile
SWE_AGENT_URL = "https://github.com/SWE-agent/SWE-agent.git"
SWE_AGENT_BRANCH = "v1.0.1"
dockerfile_content = f"""# Stage 1: Clone SWE-Agent repository
FROM alpine/git as swe-agent-source
RUN git clone --depth 1 --branch {SWE_AGENT_BRANCH} {SWE_AGENT_URL} /swe-agent
# Stage 2: Build final image
FROM {image_name}
# Install packages required by the SWE-Agent tools
RUN --mount=type=cache,target=/root/.cache/pip \
pip install flake8
# Create directory structure for SWE-Agent tools
RUN mkdir -p /root/tools
"""
# Copy each specified bundle
for bundle in bundles:
dockerfile_content += f"""
# Copy {bundle} bundle
COPY --from=swe-agent-source /swe-agent/tools/{bundle} /root/tools/{bundle}
"""
# Setup registry environment
dockerfile_content += """
# Initialize registry file
RUN echo "{}" > /root/.swe-agent-env
# Make executable files in bin directories executable
RUN find /root/tools -path "*/bin/*" -type f -exec chmod +x {} \\;
"""
return dockerfile_content
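
# Sketch (not in the original gist): previewing the Dockerfile generated for one SWE-bench
# image with the default bundle set (plus the required "registry" bundle). The image name
# follows the ghcr.io pattern used elsewhere; the instance id is illustrative.
def _example_dockerfile() -> str:
    return generate_dockerfile_content(
        "ghcr.io/epoch-research/swe-bench.eval.x86_64.django__django-11099",
        bundles=["registry", "defaults", "search", "edit_replace"],
    )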
async def sweagent_tooldefs(
include_bundles: List[BundleName],
tools_dir: Optional[Path] = None,
) -> Dict[BundleName, List[ToolDef]]:
"""Convert SWE-Agent tools to Inspect ToolDefs.
Args:
include_bundles: List of bundle names to include as ToolDefs.
tools_dir: Directory containing SWE-Agent tools bundles.
Defaults to SWE-agent/tools.
Returns:
Dictionary mapping bundle names to lists of ToolDef objects for that bundle.
"""
if tools_dir is None:
tools_dir = SWE_AGENT_PATH / "tools"
swe_agent_tools: Dict[BundleName, List[ToolDef]] = {}
tool_names = set()
# Load SWE-Agent bundles for ToolDefs - all bundles are installed in the Docker image at build time
for bundle_name in include_bundles:
bundle_dir = tools_dir / bundle_name
raise_if_bundle_dir_invalid(bundle_dir, bundle_name)
bundle = Bundle(path=bundle_dir)
# Initialize list for this bundle
swe_agent_tools[bundle_name] = []
# Create tool definitions
for command in bundle.commands:
tool_def = command_to_tool_def(command, bundle_dir)
# Check for duplicate tool names
if tool_def.name in tool_names:
raise ValueError(
f"Duplicate tool name detected: {tool_def.name}. Remove one of the bundles that provides this tool."
)
tool_names.add(tool_def.name)
swe_agent_tools[bundle_name].append(tool_def)
logger.debug(
f"Loaded tools from {len(swe_agent_tools)} bundles: {list(swe_agent_tools.keys())}"
)
return swe_agent_tools
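
# Sketch (not in the original gist): loading the default bundles as Inspect ToolDefs and
# flattening them into a single list, mirroring what swe_bench_agent does. This must run
# inside a sample with an active sandbox, since tool execution happens in the sandbox.
async def _example_flat_tooldefs() -> List[ToolDef]:
    per_bundle = await sweagent_tooldefs(include_bundles=["defaults", "search", "edit_replace"])
    return [tool_def for tools in per_bundle.values() for tool_def in tools]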