Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions ais_bench/benchmark/tasks/swebench/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ def make_swebench_session_id() -> str:
return uuid.uuid4().hex


def _merge_docker_labels(labels, session_id: str) -> dict:
def _merge_docker_labels(labels, session_id: str, label_key: str = SWEBENCH_SESSION_LABEL) -> dict:
"""Merge session label into Docker labels dict.

Docker SDK ``containers.create/run(labels=...)`` expects a mapping
(label key -> value). Always returns a dict.

``label_key`` defaults to the SWE-bench session label so existing callers
are unaffected; other datasets (e.g. SWE-bench Pro) may pass their own.
"""
if isinstance(labels, dict):
merged = dict(labels)
Expand All @@ -41,26 +44,29 @@ def _merge_docker_labels(labels, session_id: str) -> dict:
merged[k] = v
else:
merged = {}
merged[SWEBENCH_SESSION_LABEL] = session_id
merged[label_key] = session_id
return merged


class _DockerContainersWithSessionLabel:
def __init__(self, containers, session_id: str):
def __init__(self, containers, session_id: str, label_key: str = SWEBENCH_SESSION_LABEL):
self._containers = containers
self._session_id = session_id
self._label_key = label_key

def create(self, *args, **kwargs):
kwargs["labels"] = _merge_docker_labels(
kwargs.get("labels"),
self._session_id,
self._label_key,
)
return self._containers.create(*args, **kwargs)

def run(self, *args, **kwargs):
kwargs["labels"] = _merge_docker_labels(
kwargs.get("labels"),
self._session_id,
self._label_key,
)
return self._containers.run(*args, **kwargs)

Expand All @@ -69,23 +75,24 @@ def __getattr__(self, name):


class _DockerClientWithSessionLabel:
def __init__(self, client, session_id: str):
def __init__(self, client, session_id: str, label_key: str = SWEBENCH_SESSION_LABEL):
self._client = client
self.containers = _DockerContainersWithSessionLabel(
client.containers,
session_id,
label_key,
)

def __getattr__(self, name):
return getattr(self._client, name)


def add_swebench_session_label_to_docker_client(client, session_id: str):
def add_swebench_session_label_to_docker_client(client, session_id: str, label_key: str = SWEBENCH_SESSION_LABEL):
"""Return a Docker client wrapper that labels containers it creates."""
return _DockerClientWithSessionLabel(client, session_id)
return _DockerClientWithSessionLabel(client, session_id, label_key)


def list_swebench_container_ids(session_id: Optional[str] = None) -> Set[str]:
def list_swebench_container_ids(session_id: Optional[str] = None, label_key: str = SWEBENCH_SESSION_LABEL) -> Set[str]:
"""Return Docker container IDs tagged for one SWE-bench task session."""
if not session_id:
return set()
Expand All @@ -98,7 +105,7 @@ def list_swebench_container_ids(session_id: Optional[str] = None) -> Set[str]:
"ps",
"-aq",
"--filter",
f"label={SWEBENCH_SESSION_LABEL}={session_id}",
f"label={label_key}={session_id}",
],
capture_output=True,
text=True,
Expand All @@ -121,10 +128,11 @@ def cleanup_swebench_containers(
*,
container_ids: Optional[Iterable[str]] = None,
session_id: Optional[str] = None,
label_key: str = SWEBENCH_SESSION_LABEL,
):
"""Stop and remove containers created by the current SWE-bench task."""
targets = set(container_ids or [])
targets.update(list_swebench_container_ids(session_id))
targets.update(list_swebench_container_ids(session_id, label_key))
targets = sorted(targets)
if not targets:
return
Expand All @@ -142,11 +150,11 @@ def cleanup_swebench_containers(
_logger.warning("Unexpected error removing containers", exc_info=True)


def add_swebench_session_label_to_run_args(config: dict, session_id: str) -> None:
def add_swebench_session_label_to_run_args(config: dict, session_id: str, label_key: str = SWEBENCH_SESSION_LABEL) -> None:
"""Add this task's Docker label to mini-swe-agent Docker run args."""
environment = config.setdefault("environment", {})
run_args = list(environment.get("run_args", ["--rm"]))
label_flag = f"{SWEBENCH_SESSION_LABEL}={session_id}"
label_flag = f"{label_key}={session_id}"
if label_flag not in run_args:
run_args.extend(["--label", label_flag])
environment["run_args"] = run_args
Expand Down
26 changes: 21 additions & 5 deletions ais_bench/benchmark/tasks/swebench_pro/swebench_pro_eval.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@
FileOperationError,
)
from ais_bench.benchmark.tasks.swebench_pro.utils import (
add_swebench_pro_session_label_to_docker_client,
clean_swebench_pro_images,
cleanup_swebench_pro_containers,
ensure_swebench_pro_docker_images,
get_dockerhub_image_uri,
eval_with_docker,
get_dockerhub_image_uri,
list_swebench_pro_images,
clean_swebench_pro_images,
make_swebench_pro_session_id,
)

KEY_INSTANCE_ID = "instance_id"
Expand Down Expand Up @@ -361,7 +364,14 @@ def run(self, task_state_manager: TaskStateManager):
SWEBP_CODES.SWEBENCH_HARNESS_IMPORT_ERROR,
"docker SDK is not installed. Install via 'pip install docker'"
) from e
docker_client = docker.from_env()
session_id = make_swebench_pro_session_id()
self.logger.info("SWE-bench Pro eval session_id: %s", session_id)
# Wrap the Docker client so every eval container it creates is tagged
# with this task's session label. Cleanup later filters by that label,
# so concurrent SWE-bench Pro tasks never remove each other's containers.
docker_client = add_swebench_pro_session_label_to_docker_client(
docker.from_env(), session_id
)
prior_images = list_swebench_pro_images(docker_client)

ensure_swebench_pro_docker_images(
Expand Down Expand Up @@ -482,9 +492,15 @@ def run_eval_with_progress(patch, instance, report_dir, scripts_dir_abs, docker_
finally:
if pbar is not None:
pbar.close()

# Only remove containers tagged with this task's session label, so
# concurrently running SWE-bench Pro tasks are left untouched.
self.logger.info(
"Cleaning up eval containers for session %s...", session_id
)
cleanup_swebench_pro_containers(session_id=session_id)

self.logger.info("All instances run.")

self.logger.info("Cleaning up SWE-bench Pro images...")
clean_swebench_pro_images(docker_client, prior_images, self.logger)
self.logger.info("Image cleanup completed.")
Expand Down
15 changes: 13 additions & 2 deletions ais_bench/benchmark/tasks/swebench_pro/swebench_pro_infer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
AISBenchValueError,
)
from ais_bench.benchmark.tasks.swebench_pro.utils import (
add_swebench_pro_session_label_to_run_args,
cleanup_swebench_pro_containers,
ensure_swebench_pro_docker_images,
get_dockerhub_image_uri,
make_swebench_pro_session_id,
merge_nested_dicts,
build_problem_statement,
)
Expand Down Expand Up @@ -270,6 +272,12 @@ def run(self, task_state_manager: TaskStateManager):
base_config = merge_nested_dicts(default_swebench_config, our_config)
if dataset_cfg.get("step_limit") is not None:
base_config.setdefault("agent", {})["step_limit"] = dataset_cfg["step_limit"]
session_id = make_swebench_pro_session_id()
# Tag every mini-swe-agent container with this task's session label via
# ``--label`` in Docker run_args. Cleanup later filters by that label so
# concurrently running SWE-bench Pro tasks never remove each other's containers.
add_swebench_pro_session_label_to_run_args(base_config, session_id)
self.logger.info("SWE-bench Pro infer session_id: %s", session_id)
self.logger.info(f"base_config '{base_config}'")

progress_manager, live_render_group = _make_swebench_pro_progress_manager(
Expand Down Expand Up @@ -357,13 +365,16 @@ def run_executor():
for future in futures:
if not future.running() and not future.done():
future.cancel()
cleanup_swebench_pro_containers()
# Best-effort cleanup of this task's mini-swe-agent containers.
cleanup_swebench_pro_containers(session_id=session_id)
executor.shutdown(wait=False)
raise
finally:
if not interrupted[0]:
executor.shutdown(wait=True)
cleanup_swebench_pro_containers()
# After all work is done (normal or interrupted), attempt one more
# cleanup of only the containers owned by this task's session.
cleanup_swebench_pro_containers(session_id=session_id)

if live_render_group is not None:
from rich.live import Live
Expand Down
72 changes: 49 additions & 23 deletions ais_bench/benchmark/tasks/swebench_pro/utils.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,62 @@
import os
import subprocess
import re
from typing import Callable, Iterable, TypeVar
from typing import Callable, Iterable, Optional, Set, TypeVar
import json

from ais_bench.benchmark.utils.logging import AISLogger
from ais_bench.benchmark.utils.logging.error_codes import SWEBP_CODES
from ais_bench.benchmark.utils.logging.exceptions import AISBenchRuntimeError, AISBenchImportError


def cleanup_swebench_pro_containers():
    """Force-remove containers left behind by SWE-bench Pro runs (best effort).

    For each known container-name prefix, matching containers are listed with
    ``docker ps -aq --filter name=<prefix>`` and then removed with
    ``docker rm -f``. Nothing is returned and nothing is raised: a missing
    ``docker`` binary, a timeout, or a failing ``docker ps`` simply skips
    that prefix.

    NOTE(review): matching is by container *name* only, so containers that
    belong to other tasks using the same name prefixes are removed as well.
    """
    name_filters = ("minisweagent-", "sweb.eval")
    for name_filter in name_filters:
        try:
            listing = subprocess.run(
                ["docker", "ps", "-aq", "--filter", f"name={name_filter}"],
                capture_output=True,
                text=True,
                timeout=10,
            )
            if listing.returncode != 0:
                continue
            # ``docker ps -q`` prints one container id per line.
            ids = (listing.stdout or "").split()
            if not ids:
                continue
            subprocess.run(
                ["docker", "rm", "-f", *ids],
                capture_output=True,
                timeout=30,
            )
        except (OSError, subprocess.SubprocessError):
            # OSError covers a missing or non-executable ``docker`` binary;
            # SubprocessError covers TimeoutExpired. Cleanup is best effort,
            # so move on to the next prefix instead of propagating.
            continue
from ais_bench.benchmark.tasks.swebench.utils import (
add_swebench_session_label_to_docker_client as _add_session_label_to_docker_client,
add_swebench_session_label_to_run_args as _add_session_label_to_run_args,
cleanup_swebench_containers as _cleanup_session_containers,
list_swebench_container_ids as _list_session_container_ids,
make_swebench_session_id as _make_session_id,
)


# Docker label key under which SWE-bench Pro containers record their session id;
# passed as ``label_key`` to the shared SWE-bench session helpers below.
SWEBENCH_PRO_SESSION_LABEL = "ais_bench.swebench_pro.session"


def make_swebench_pro_session_id() -> str:
    """Return a new session id identifying one SWE-bench Pro task run.

    The id comes from the shared SWE-bench session-id generator.
    """
    session_id = _make_session_id()
    return session_id


def add_swebench_pro_session_label_to_docker_client(client, session_id: str):
    """Wrap ``client`` so containers it creates carry the SWE-bench Pro label.

    The returned wrapper tags containers with
    ``SWEBENCH_PRO_SESSION_LABEL=<session_id>``.
    """
    return _add_session_label_to_docker_client(
        client,
        session_id,
        label_key=SWEBENCH_PRO_SESSION_LABEL,
    )


def list_swebench_pro_container_ids(session_id: Optional[str] = None) -> Set[str]:
    """Look up the Docker container IDs labelled with a SWE-bench Pro session.

    Delegates to the shared SWE-bench lookup, filtering on
    ``SWEBENCH_PRO_SESSION_LABEL``.
    """
    return _list_session_container_ids(
        session_id=session_id,
        label_key=SWEBENCH_PRO_SESSION_LABEL,
    )


def cleanup_swebench_pro_containers(
    *,
    container_ids: Optional[Iterable[str]] = None,
    session_id: Optional[str] = None,
):
    """Remove the containers that belong to the current SWE-bench Pro task.

    Targets are the containers labelled with this task's session id under
    ``SWEBENCH_PRO_SESSION_LABEL`` plus any ids given in ``container_ids``.
    Containers of other, concurrently running SWE-bench Pro tasks carry a
    different session id and are therefore left alone.
    """
    _cleanup_session_containers(
        label_key=SWEBENCH_PRO_SESSION_LABEL,
        session_id=session_id,
        container_ids=container_ids,
    )


def add_swebench_pro_session_label_to_run_args(config: dict, session_id: str) -> None:
    """Tag mini-swe-agent Docker run args in ``config`` with this session.

    ``config`` is updated in place by the shared SWE-bench helper, using
    ``SWEBENCH_PRO_SESSION_LABEL`` as the label key.
    """
    _add_session_label_to_run_args(
        config,
        session_id,
        label_key=SWEBENCH_PRO_SESSION_LABEL,
    )


def list_swebench_pro_images(client) -> set[str]:
Expand Down
Loading
Loading