From 8da19800776548df20d963b97fc35ba6c3f15847 Mon Sep 17 00:00:00 2001 From: ivanbao9783 Date: Tue, 30 Jun 2026 16:32:48 +0800 Subject: [PATCH] =?UTF-8?q?refactor(datesets):=201.=20=E5=BD=92=E4=B8=80sw?= =?UTF-8?q?e=E5=92=8Cswebp=E4=B8=A4=E4=B8=AA=E6=95=B0=E6=8D=AE=E9=9B=86?= =?UTF-8?q?=E7=9A=84=E9=95=9C=E5=83=8F=E6=93=8D=E4=BD=9C=EF=BC=8C=E6=B6=88?= =?UTF-8?q?=E9=99=A4=E5=86=97=E4=BD=99=E4=BB=A3=E7=A0=81=E3=80=82=202.=20?= =?UTF-8?q?=E4=BC=98=E5=8C=96swebp=E4=B8=B4=E6=97=B6=E5=AE=B9=E5=99=A8?= =?UTF-8?q?=E6=B8=85=E7=90=86=E9=80=BB=E8=BE=91=EF=BC=8C=E4=BB=A5Session?= =?UTF-8?q?=E7=B2=92=E5=BA=A6=E7=AE=A1=E7=90=86=E7=94=9F=E5=91=BD=E5=91=A8?= =?UTF-8?q?=E6=9C=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ais_bench/benchmark/tasks/swebench/utils.py | 30 +- .../tasks/swebench_pro/swebench_pro_eval.py | 26 +- .../tasks/swebench_pro/swebench_pro_infer.py | 15 +- .../benchmark/tasks/swebench_pro/utils.py | 72 +++-- .../swebp_pro/test_session_concurrency.py | 299 ++++++++++++++++++ .../tasks/swebp_pro/test_swebench_pro_eval.py | 3 + tests/UT/tasks/swebp_pro/test_utils.py | 165 ++++++++-- 7 files changed, 536 insertions(+), 74 deletions(-) create mode 100644 tests/UT/tasks/swebp_pro/test_session_concurrency.py diff --git a/ais_bench/benchmark/tasks/swebench/utils.py b/ais_bench/benchmark/tasks/swebench/utils.py index 2fee8a6d..3f7af5e1 100644 --- a/ais_bench/benchmark/tasks/swebench/utils.py +++ b/ais_bench/benchmark/tasks/swebench/utils.py @@ -24,11 +24,14 @@ def make_swebench_session_id() -> str: return uuid.uuid4().hex -def _merge_docker_labels(labels, session_id: str) -> dict: +def _merge_docker_labels(labels, session_id: str, label_key: str = SWEBENCH_SESSION_LABEL) -> dict: """Merge session label into Docker labels dict. Docker SDK ``containers.create/run(labels=...)`` expects a mapping (label key -> value). Always returns a dict. + + ``label_key`` defaults to the SWE-bench session label so existing callers + are unaffected; other datasets (e.g. SWE-bench Pro) may pass their own. """ if isinstance(labels, dict): merged = dict(labels) @@ -41,19 +44,21 @@ def _merge_docker_labels(labels, session_id: str) -> dict: merged[k] = v else: merged = {} - merged[SWEBENCH_SESSION_LABEL] = session_id + merged[label_key] = session_id return merged class _DockerContainersWithSessionLabel: - def __init__(self, containers, session_id: str): + def __init__(self, containers, session_id: str, label_key: str = SWEBENCH_SESSION_LABEL): self._containers = containers self._session_id = session_id + self._label_key = label_key def create(self, *args, **kwargs): kwargs["labels"] = _merge_docker_labels( kwargs.get("labels"), self._session_id, + self._label_key, ) return self._containers.create(*args, **kwargs) @@ -61,6 +66,7 @@ def run(self, *args, **kwargs): kwargs["labels"] = _merge_docker_labels( kwargs.get("labels"), self._session_id, + self._label_key, ) return self._containers.run(*args, **kwargs) @@ -69,23 +75,24 @@ def __getattr__(self, name): class _DockerClientWithSessionLabel: - def __init__(self, client, session_id: str): + def __init__(self, client, session_id: str, label_key: str = SWEBENCH_SESSION_LABEL): self._client = client self.containers = _DockerContainersWithSessionLabel( client.containers, session_id, + label_key, ) def __getattr__(self, name): return getattr(self._client, name) -def add_swebench_session_label_to_docker_client(client, session_id: str): +def add_swebench_session_label_to_docker_client(client, session_id: str, label_key: str = SWEBENCH_SESSION_LABEL): """Return a Docker client wrapper that labels containers it creates.""" - return _DockerClientWithSessionLabel(client, session_id) + return _DockerClientWithSessionLabel(client, session_id, label_key) -def list_swebench_container_ids(session_id: Optional[str] = None) -> Set[str]: +def list_swebench_container_ids(session_id: Optional[str] = None, label_key: str = SWEBENCH_SESSION_LABEL) -> Set[str]: """Return Docker container IDs tagged for one SWE-bench task session.""" if not session_id: return set() @@ -98,7 +105,7 @@ def list_swebench_container_ids(session_id: Optional[str] = None) -> Set[str]: "ps", "-aq", "--filter", - f"label={SWEBENCH_SESSION_LABEL}={session_id}", + f"label={label_key}={session_id}", ], capture_output=True, text=True, @@ -121,10 +128,11 @@ def cleanup_swebench_containers( *, container_ids: Optional[Iterable[str]] = None, session_id: Optional[str] = None, + label_key: str = SWEBENCH_SESSION_LABEL, ): """Stop and remove containers created by the current SWE-bench task.""" targets = set(container_ids or []) - targets.update(list_swebench_container_ids(session_id)) + targets.update(list_swebench_container_ids(session_id, label_key)) targets = sorted(targets) if not targets: return @@ -142,11 +150,11 @@ def cleanup_swebench_containers( _logger.warning("Unexpected error removing containers", exc_info=True) -def add_swebench_session_label_to_run_args(config: dict, session_id: str) -> None: +def add_swebench_session_label_to_run_args(config: dict, session_id: str, label_key: str = SWEBENCH_SESSION_LABEL) -> None: """Add this task's Docker label to mini-swe-agent Docker run args.""" environment = config.setdefault("environment", {}) run_args = list(environment.get("run_args", ["--rm"])) - label_flag = f"{SWEBENCH_SESSION_LABEL}={session_id}" + label_flag = f"{label_key}={session_id}" if label_flag not in run_args: run_args.extend(["--label", label_flag]) environment["run_args"] = run_args diff --git a/ais_bench/benchmark/tasks/swebench_pro/swebench_pro_eval.py b/ais_bench/benchmark/tasks/swebench_pro/swebench_pro_eval.py index 89adfc9b..b1979a03 100644 --- a/ais_bench/benchmark/tasks/swebench_pro/swebench_pro_eval.py +++ b/ais_bench/benchmark/tasks/swebench_pro/swebench_pro_eval.py @@ -30,11 +30,14 @@ FileOperationError, ) from ais_bench.benchmark.tasks.swebench_pro.utils import ( + add_swebench_pro_session_label_to_docker_client, + clean_swebench_pro_images, + cleanup_swebench_pro_containers, ensure_swebench_pro_docker_images, - get_dockerhub_image_uri, eval_with_docker, + get_dockerhub_image_uri, list_swebench_pro_images, - clean_swebench_pro_images, + make_swebench_pro_session_id, ) KEY_INSTANCE_ID = "instance_id" @@ -361,7 +364,14 @@ def run(self, task_state_manager: TaskStateManager): SWEBP_CODES.SWEBENCH_HARNESS_IMPORT_ERROR, "docker SDK is not installed. Install via 'pip install docker'" ) from e - docker_client = docker.from_env() + session_id = make_swebench_pro_session_id() + self.logger.info("SWE-bench Pro eval session_id: %s", session_id) + # Wrap the Docker client so every eval container it creates is tagged + # with this task's session label. Cleanup later filters by that label, + # so concurrent SWE-bench Pro tasks never remove each other's containers. + docker_client = add_swebench_pro_session_label_to_docker_client( + docker.from_env(), session_id + ) prior_images = list_swebench_pro_images(docker_client) ensure_swebench_pro_docker_images( @@ -482,9 +492,15 @@ def run_eval_with_progress(patch, instance, report_dir, scripts_dir_abs, docker_ finally: if pbar is not None: pbar.close() - + # Only remove containers tagged with this task's session label, so + # concurrently running SWE-bench Pro tasks are left untouched. + self.logger.info( + "Cleaning up eval containers for session %s...", session_id + ) + cleanup_swebench_pro_containers(session_id=session_id) + self.logger.info("All instances run.") - + self.logger.info("Cleaning up SWE-bench Pro images...") clean_swebench_pro_images(docker_client, prior_images, self.logger) self.logger.info("Image cleanup completed.") diff --git a/ais_bench/benchmark/tasks/swebench_pro/swebench_pro_infer.py b/ais_bench/benchmark/tasks/swebench_pro/swebench_pro_infer.py index 80112cfd..835b84be 100644 --- a/ais_bench/benchmark/tasks/swebench_pro/swebench_pro_infer.py +++ b/ais_bench/benchmark/tasks/swebench_pro/swebench_pro_infer.py @@ -28,9 +28,11 @@ AISBenchValueError, ) from ais_bench.benchmark.tasks.swebench_pro.utils import ( + add_swebench_pro_session_label_to_run_args, cleanup_swebench_pro_containers, ensure_swebench_pro_docker_images, get_dockerhub_image_uri, + make_swebench_pro_session_id, merge_nested_dicts, build_problem_statement, ) @@ -270,6 +272,12 @@ def run(self, task_state_manager: TaskStateManager): base_config = merge_nested_dicts(default_swebench_config, our_config) if dataset_cfg.get("step_limit") is not None: base_config.setdefault("agent", {})["step_limit"] = dataset_cfg["step_limit"] + session_id = make_swebench_pro_session_id() + # Tag every mini-swe-agent container with this task's session label via + # ``--label`` in Docker run_args. Cleanup later filters by that label so + # concurrently running SWE-bench Pro tasks never remove each other's containers. + add_swebench_pro_session_label_to_run_args(base_config, session_id) + self.logger.info("SWE-bench Pro infer session_id: %s", session_id) self.logger.info(f"base_config '{base_config}'") progress_manager, live_render_group = _make_swebench_pro_progress_manager( @@ -357,13 +365,16 @@ def run_executor(): for future in futures: if not future.running() and not future.done(): future.cancel() - cleanup_swebench_pro_containers() + # Best-effort cleanup of this task's mini-swe-agent containers. + cleanup_swebench_pro_containers(session_id=session_id) executor.shutdown(wait=False) raise finally: if not interrupted[0]: executor.shutdown(wait=True) - cleanup_swebench_pro_containers() + # After all work is done (normal or interrupted), attempt one more + # cleanup of only the containers owned by this task's session. + cleanup_swebench_pro_containers(session_id=session_id) if live_render_group is not None: from rich.live import Live diff --git a/ais_bench/benchmark/tasks/swebench_pro/utils.py b/ais_bench/benchmark/tasks/swebench_pro/utils.py index d8ed7b50..eb41532e 100644 --- a/ais_bench/benchmark/tasks/swebench_pro/utils.py +++ b/ais_bench/benchmark/tasks/swebench_pro/utils.py @@ -1,7 +1,7 @@ import os import subprocess import re -from typing import Callable, Iterable, TypeVar +from typing import Callable, Iterable, Optional, Set, TypeVar import json from ais_bench.benchmark.utils.logging import AISLogger @@ -9,28 +9,54 @@ from ais_bench.benchmark.utils.logging.exceptions import AISBenchRuntimeError, AISBenchImportError -def cleanup_swebench_pro_containers(): - name_filters = ["minisweagent-", "sweb.eval"] - for name_filter in name_filters: - try: - r = subprocess.run( - ["docker", "ps", "-aq", "--filter", f"name={name_filter}"], - capture_output=True, - text=True, - timeout=10, - ) - if r.returncode != 0 or not (r.stdout or "").strip(): - continue - ids = [x.strip() for x in r.stdout.strip().splitlines() if x.strip()] - if not ids: - continue - subprocess.run( - ["docker", "rm", "-f"] + ids, - capture_output=True, - timeout=30, - ) - except (FileNotFoundError, subprocess.TimeoutExpired, Exception): - pass +from ais_bench.benchmark.tasks.swebench.utils import ( + add_swebench_session_label_to_docker_client as _add_session_label_to_docker_client, + add_swebench_session_label_to_run_args as _add_session_label_to_run_args, + cleanup_swebench_containers as _cleanup_session_containers, + list_swebench_container_ids as _list_session_container_ids, + make_swebench_session_id as _make_session_id, +) + + +SWEBENCH_PRO_SESSION_LABEL = "ais_bench.swebench_pro.session" + + +def make_swebench_pro_session_id() -> str: + """Generate a unique session id for one SWE-bench Pro task run.""" + return _make_session_id() + + +def add_swebench_pro_session_label_to_docker_client(client, session_id: str): + """Return a Docker client wrapper that labels containers it creates.""" + return _add_session_label_to_docker_client(client, session_id, SWEBENCH_PRO_SESSION_LABEL) + + +def list_swebench_pro_container_ids(session_id: Optional[str] = None) -> Set[str]: + """Return Docker container IDs tagged for one SWE-bench Pro task session.""" + return _list_session_container_ids(session_id, SWEBENCH_PRO_SESSION_LABEL) + + +def cleanup_swebench_pro_containers( + *, + container_ids: Optional[Iterable[str]] = None, + session_id: Optional[str] = None, +): + """Stop and remove containers created by the current SWE-bench Pro task. + + Only containers tagged with this task's session label (or explicitly passed + via ``container_ids``) are removed, so concurrently running SWE-bench Pro + tasks never clean up each other's containers. + """ + _cleanup_session_containers( + container_ids=container_ids, + session_id=session_id, + label_key=SWEBENCH_PRO_SESSION_LABEL, + ) + + +def add_swebench_pro_session_label_to_run_args(config: dict, session_id: str) -> None: + """Add this task's Docker label to mini-swe-agent Docker run args.""" + _add_session_label_to_run_args(config, session_id, SWEBENCH_PRO_SESSION_LABEL) def list_swebench_pro_images(client) -> set[str]: diff --git a/tests/UT/tasks/swebp_pro/test_session_concurrency.py b/tests/UT/tasks/swebp_pro/test_session_concurrency.py new file mode 100644 index 00000000..e18d3d84 --- /dev/null +++ b/tests/UT/tasks/swebp_pro/test_session_concurrency.py @@ -0,0 +1,299 @@ +"""Multi-process concurrency test for SWE-bench Pro session-scoped container cleanup. + +This test validates the fix for the container-cleanup race condition described in +the SWE-bench Pro dataset: when several SWE-bench Pro evaluation tasks finish at +different times, a task that finishes early must NOT remove containers belonging +to tasks that are still running. + +How it works +------------ +The real ``ais_bench.benchmark.tasks.swebench_pro.utils`` module is loaded from +source (its ``ais_bench`` logging dependencies are stubbed so the module can be +imported on any platform, including Windows where ``fcntl``/``resource`` are +unavailable). ``subprocess.run`` is patched inside each worker process to +simulate the Docker daemon against a shared (``multiprocessing.Manager``) +registry that maps ``container_id -> session_id``. + +Each worker process: + 1. Generates its own ``session_id`` via ``make_swebench_pro_session_id``. + 2. Registers K containers tagged with that session label (simulating creation). + 3. Sleeps a random short interval so cleanups overlap across processes. + 4. Calls ``cleanup_swebench_pro_containers(session_id=...)``. + +The key assertion: every worker removes *exactly* the containers it created and +*none* belonging to other sessions. Under the old name-filter based cleanup +(``minisweagent-`` / ``sweb.eval``), the first worker to clean would remove every +live container, which this test would catch. + +Run directly: python tests/UT/tasks/swebp_pro/test_session_concurrency.py +Or via pytest: pytest tests/UT/tasks/swebp_pro/test_session_concurrency.py +""" +import os +import sys +import time +import types +import random +import subprocess +import importlib.util +import multiprocessing +from unittest.mock import patch + + +# --------------------------------------------------------------------------- # +# Stub the minimal dependencies so the real utils.py can be loaded in isolation +# on any platform (avoids the Unix-only ``fcntl``/``resource`` import chain). +# --------------------------------------------------------------------------- # +def _setup_stubs(): + # Unix-only modules referenced transitively by the ais_bench package. + for name in ("fcntl", "resource"): + if name not in sys.modules: + sys.modules[name] = types.ModuleType(name) + + def _make_pkg(name): + m = types.ModuleType(name) + m.__path__ = [] # mark as a package + return m + + for pkg in ( + "ais_bench", + "ais_bench.benchmark", + "ais_bench.benchmark.utils", + ): + if pkg not in sys.modules: + sys.modules[pkg] = _make_pkg(pkg) + + # ais_bench.benchmark.utils.logging -> provides AISLogger + log_mod = types.ModuleType("ais_bench.benchmark.utils.logging") + + class _StubLogger: + def __init__(self, *a, **k): + pass + + def info(self, *a, **k): + pass + + def debug(self, *a, **k): + pass + + def warning(self, *a, **k): + pass + + def error(self, *a, **k): + pass + + log_mod.AISLogger = _StubLogger + sys.modules["ais_bench.benchmark.utils.logging"] = log_mod + + # ais_bench.benchmark.utils.logging.error_codes -> provides SWEBP_CODES & SWEB_CODES + ec = types.ModuleType("ais_bench.benchmark.utils.logging.error_codes") + + class _SWEBPCodes: + DOCKER_IMAGE_UNAVAILABLE = 1 + SWEBENCH_HARNESS_IMPORT_ERROR = 2 + + class _SWEBCodes: + DOCKER_IMAGE_UNAVAILABLE = 1 + + ec.SWEBP_CODES = _SWEBPCodes() + ec.SWEB_CODES = _SWEBCodes() + sys.modules["ais_bench.benchmark.utils.logging.error_codes"] = ec + + # ais_bench.benchmark.utils.logging.exceptions -> provides error classes + ex = types.ModuleType("ais_bench.benchmark.utils.logging.exceptions") + + class AISBenchRuntimeError(RuntimeError): + pass + + class AISBenchImportError(ImportError): + pass + + ex.AISBenchRuntimeError = AISBenchRuntimeError + ex.AISBenchImportError = AISBenchImportError + sys.modules["ais_bench.benchmark.utils.logging.exceptions"] = ex + + # ais_bench.benchmark.tasks & .swebench -> package stubs so that + # ``from ais_bench.benchmark.tasks.swebench.utils import ...`` (used by + # swebench_pro/utils.py to reuse the session implementation) resolves. + for pkg in ( + "ais_bench.benchmark.tasks", + "ais_bench.benchmark.tasks.swebench", + ): + if pkg not in sys.modules: + sys.modules[pkg] = _make_pkg(pkg) + + # Load the REAL swebench/utils.py by file path and register it under its + # dotted name so swebench_pro/utils.py's import picks up the shared + # implementation (parameterised with ``label_key``). + here = os.path.dirname(os.path.abspath(__file__)) + root = os.path.abspath(os.path.join(here, "..", "..", "..", "..")) + swebench_utils_path = os.path.join( + root, "ais_bench", "benchmark", "tasks", "swebench", "utils.py" + ) + swebench_utils_spec = importlib.util.spec_from_file_location( + "ais_bench.benchmark.tasks.swebench.utils", swebench_utils_path + ) + swebench_utils_mod = importlib.util.module_from_spec(swebench_utils_spec) + sys.modules["ais_bench.benchmark.tasks.swebench.utils"] = swebench_utils_mod + swebench_utils_spec.loader.exec_module(swebench_utils_mod) + + +def _load_real_utils(): + """Load the REAL swebench_pro/utils.py by file path (bypass package init).""" + here = os.path.dirname(os.path.abspath(__file__)) + # tests/UT/tasks/swebp_pro/ -> repo root is four levels up. + root = os.path.abspath(os.path.join(here, "..", "..", "..", "..")) + path = os.path.join( + root, "ais_bench", "benchmark", "tasks", "swebench_pro", "utils.py" + ) + spec = importlib.util.spec_from_file_location( + "_swebench_pro_utils_under_test", path + ) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod + + +# --------------------------------------------------------------------------- # +# Simulated Docker backend: a shared registry mapping container_id -> session_id. +# --------------------------------------------------------------------------- # +def _make_fake_run(registry): + """Return a subprocess.run replacement that simulates docker ps/rm.""" + + def fake_run(args, *a, **k): + if args[:1] != ["docker"]: + return subprocess.CompletedProcess(args=args, returncode=0, stdout="") + sub = args[1] if len(args) > 1 else "" + if sub == "ps": + # Parse "--filter label=