From b62623fcc0b8346eaa40d6d29d392e1493053679 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Fri, 5 Jun 2026 13:51:19 +0300 Subject: [PATCH 01/15] docs(planning): implementation plan for Bulkhead (0.5.0, Epic 3 slice 2) 8 TDD tasks targeting feat/v0.5-bulkhead. Implements the Bulkhead middleware (asyncio.Semaphore-backed, required max_concurrent, bounded acquire wait with BulkheadFullError on timeout), BulkheadFullError (picklable, ClientError parent), three test files (unit, sharing/sanity, Hypothesis property), and wires the public API + planning docs. Co-Authored-By: Claude Opus 4.7 (1M context) --- planning/plans/2026-06-05-bulkhead-plan.md | 955 +++++++++++++++++++++ 1 file changed, 955 insertions(+) create mode 100644 planning/plans/2026-06-05-bulkhead-plan.md diff --git a/planning/plans/2026-06-05-bulkhead-plan.md b/planning/plans/2026-06-05-bulkhead-plan.md new file mode 100644 index 0000000..7da09ef --- /dev/null +++ b/planning/plans/2026-06-05-bulkhead-plan.md @@ -0,0 +1,955 @@ +# Bulkhead middleware (0.5.0, Epic 3 slice 2) Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Ship the `Bulkhead` middleware — a concurrency limiter via `asyncio.Semaphore` — plus a `BulkheadFullError` exception. Bulkhead caps in-flight requests at the caller layer (distinct from `httpx2.Limits`, which caps the connection pool). A required `max_concurrent` parameter forces an explicit choice; `acquire_timeout` (default 1.0s, `None` = wait forever, `0` = fail fast) bounds the time spent waiting for a slot. + +**Architecture:** New `bulkhead.py` under `src/httpware/middleware/resilience/`, mirroring the slice-1 layout (`retry.py`, `budget.py`, `_backoff.py`). The middleware owns an `asyncio.Semaphore(max_concurrent)`, wraps `acquire()` in `asyncio.timeout(acquire_timeout)`, and uses an explicit `try/finally` around `next(request)` to guarantee release on every exit path (success, exception, cancellation). `BulkheadFullError(ClientError)` is picklable via the same `__reduce__` + module-level reconstructor pattern used by `StatusError` and `RetryBudgetExhaustedError`. + +**Tech Stack:** Python 3.11+ (`asyncio.timeout` requires 3.11), `httpx2`, `pytest` / `pytest-asyncio` (auto mode), `hypothesis`, `uv`, `just`, `ruff`, `ty`. + +**Target branch:** `feat/v0.5-bulkhead`. Create from `main` before Task 1: `git checkout main && git pull && git checkout -b feat/v0.5-bulkhead`. + +**Source spec:** [`planning/specs/2026-06-05-bulkhead-design.md`](../specs/2026-06-05-bulkhead-design.md). Read it before starting — the *why* for each decision lives there. + +--- + +## File structure + +**New files:** +- `src/httpware/middleware/resilience/bulkhead.py` — `Bulkhead` middleware class. +- `tests/test_bulkhead.py` — unit tests via `httpx2.MockTransport`. +- `tests/test_bulkhead_props.py` — Hypothesis property tests for the concurrency invariant. + +**Modified files:** +- `src/httpware/errors.py` — add `BulkheadFullError(ClientError)` + `_reconstruct_bulkhead_full` reconstructor. +- `src/httpware/middleware/resilience/__init__.py` — add `Bulkhead` to the re-exports. +- `src/httpware/__init__.py` — export `Bulkhead` and `BulkheadFullError`. +- `tests/test_errors.py` — add inheritance + pickle tests for `BulkheadFullError`. +- `tests/test_public_api.py` — add `Bulkhead` and `BulkheadFullError` to the expected exports set. +- `planning/engineering.md` — §8 mark `3-5` shipped; remaining Epic 3 is just `3-6` extension-slot docs. + +**Commit cadence:** each Task ends with a `git add` + `git commit`. Per-task commits keep history reviewable; the branch is squash-mergeable. + +--- + +## Task 1: Branch + scaffold `bulkhead.py` + +**Files:** +- Create: `src/httpware/middleware/resilience/bulkhead.py` (docstring-only stub) + +This task creates only the empty module. The class arrives in Task 3 and the `resilience/__init__.py` re-export wiring also lands in Task 3 (so we don't trip an `ImportError` during the intermediate Task 2 — same lesson as slice 1). + +- [ ] **Step 1: Create the branch** + +Run: +```bash +git checkout main && git pull && git checkout -b feat/v0.5-bulkhead +``` +Expected: switched to a new branch. + +- [ ] **Step 2: Create the stub file** + +Create `src/httpware/middleware/resilience/bulkhead.py` with: +```python +"""Bulkhead middleware — concurrency limiter via asyncio.Semaphore. + +See planning/specs/2026-06-05-bulkhead-design.md for the contract. +""" +``` + +- [ ] **Step 3: Verify file exists** + +Run: +```bash +ls src/httpware/middleware/resilience/ +``` +Expected: `__init__.py _backoff.py budget.py bulkhead.py retry.py` + +- [ ] **Step 4: Lint (sanity)** + +Run: `just lint` +Expected: clean (the docstring-only stub passes ruff/ty). + +- [ ] **Step 5: Stage and commit** + +```bash +git add src/httpware/middleware/resilience/bulkhead.py +git commit -m "scaffold(resilience): empty bulkhead.py stub" +``` + +--- + +## Task 2: Add `BulkheadFullError` to `errors.py` + +**Files:** +- Modify: `src/httpware/errors.py` +- Modify: `tests/test_errors.py` + +- [ ] **Step 1: Write failing tests in `tests/test_errors.py`** + +Append to `tests/test_errors.py`: +```python +from httpware.errors import BulkheadFullError + + +_MAX_CONCURRENT_5 = 5 +_ACQUIRE_TIMEOUT_1_0 = 1.0 + + +def test_bulkhead_full_error_is_client_error() -> None: + exc = BulkheadFullError(max_concurrent=_MAX_CONCURRENT_5, acquire_timeout=_ACQUIRE_TIMEOUT_1_0) + assert isinstance(exc, ClientError) + assert exc.max_concurrent == _MAX_CONCURRENT_5 + assert exc.acquire_timeout == _ACQUIRE_TIMEOUT_1_0 + + +def test_bulkhead_full_error_accepts_none_acquire_timeout() -> None: + exc = BulkheadFullError(max_concurrent=_MAX_CONCURRENT_5, acquire_timeout=None) + assert exc.acquire_timeout is None + + +def test_bulkhead_full_error_summary_mentions_caps() -> None: + exc = BulkheadFullError(max_concurrent=_MAX_CONCURRENT_5, acquire_timeout=_ACQUIRE_TIMEOUT_1_0) + assert str(exc) == "bulkhead full (max_concurrent=5, acquire_timeout=1.0)" + + +def test_bulkhead_full_error_pickleable() -> None: + exc = BulkheadFullError(max_concurrent=_MAX_CONCURRENT_5, acquire_timeout=_ACQUIRE_TIMEOUT_1_0) + restored = pickle.loads(pickle.dumps(exc)) # noqa: S301 + assert isinstance(restored, BulkheadFullError) + assert restored.max_concurrent == _MAX_CONCURRENT_5 + assert restored.acquire_timeout == _ACQUIRE_TIMEOUT_1_0 +``` + +Run: `uv run pytest tests/test_errors.py -v -k "bulkhead"` +Expected: FAIL (`ImportError: cannot import name 'BulkheadFullError'`). + +- [ ] **Step 2: Add `BulkheadFullError` to `src/httpware/errors.py`** + +Append at the end of `src/httpware/errors.py` (after the existing `RetryBudgetExhaustedError` block — `_reconstruct_bulkhead_full` goes above the class, mirroring the existing reconstructor pattern): + +```python +def _reconstruct_bulkhead_full( + cls: "type[BulkheadFullError]", + max_concurrent: int, + acquire_timeout: float | None, +) -> "BulkheadFullError": + return cls(max_concurrent=max_concurrent, acquire_timeout=acquire_timeout) + + +class BulkheadFullError(ClientError): + """Raised when ``acquire_timeout`` elapses before a Bulkhead slot becomes available. + + Carries the configured caps for caller logging/alerting. + """ + + max_concurrent: int + acquire_timeout: float | None + + def __init__(self, *, max_concurrent: int, acquire_timeout: float | None) -> None: + self.max_concurrent = max_concurrent + self.acquire_timeout = acquire_timeout + super().__init__( + f"bulkhead full (max_concurrent={max_concurrent}, acquire_timeout={acquire_timeout})" + ) + + def __reduce__(self) -> tuple[Any, ...]: + return ( + _reconstruct_bulkhead_full, + (type(self), self.max_concurrent, self.acquire_timeout), + ) +``` + +- [ ] **Step 3: Run the bulkhead tests** + +Run: `uv run pytest tests/test_errors.py -v -k "bulkhead"` +Expected: all 4 PASS. + +- [ ] **Step 4: Run the full suite + lint** + +Run: `just lint && just test` +Expected: clean lint, 100% coverage. + +- [ ] **Step 5: Stage and commit** + +```bash +git add src/httpware/errors.py tests/test_errors.py +git commit -m "feat(errors): add BulkheadFullError(ClientError) + +Distinct exception raised by the Bulkhead middleware when acquire_timeout +elapses without acquiring a slot. Carries max_concurrent + acquire_timeout +for caller logging. Picklable via _reconstruct_bulkhead_full + __reduce__, +mirroring the existing StatusError / RetryBudgetExhaustedError pattern. + +Inherits ClientError (not TimeoutError) because a bulkhead-full event is +a backpressure signal, not a network timeout." +``` + +--- + +## Task 3: Implement `Bulkhead` middleware — happy path + validation + capacity serialization + +**Files:** +- Modify: `src/httpware/middleware/resilience/bulkhead.py` +- Modify: `src/httpware/middleware/resilience/__init__.py` +- Create: `tests/test_bulkhead.py` + +This task implements the middleware skeleton with: constructor validation, the basic acquire→try/finally→release pattern, capacity serialization (second request waits for first to release), and re-export wiring. + +- [ ] **Step 1: Write failing tests in `tests/test_bulkhead.py`** + +Create `tests/test_bulkhead.py`: +```python +"""Tests for the Bulkhead middleware. + +Mocks the transport via httpx2.MockTransport. Concurrency tests use real +asyncio coroutines with sub-100ms timeouts so the suite stays fast. +""" + +import asyncio +from collections.abc import Callable +from http import HTTPStatus + +import httpx2 +import pytest + +from httpware import AsyncClient +from httpware.errors import BulkheadFullError +from httpware.middleware.resilience.bulkhead import Bulkhead + + +_MAX_CONCURRENT_1 = 1 +_MAX_CONCURRENT_2 = 2 + + +class _SlowHandler: + """Mock handler that blocks for `delay` seconds before returning 200 OK.""" + + def __init__(self, delay: float) -> None: + self.delay = delay + self.in_flight = 0 + self.max_in_flight = 0 + self.calls = 0 + + async def __call__(self, request: httpx2.Request) -> httpx2.Response: + self.calls += 1 + self.in_flight += 1 + self.max_in_flight = max(self.max_in_flight, self.in_flight) + try: + await asyncio.sleep(self.delay) + return httpx2.Response(HTTPStatus.OK, request=request) + finally: + self.in_flight -= 1 + + +def _client(handler: Callable[[httpx2.Request], httpx2.Response], *, bulkhead: Bulkhead) -> AsyncClient: + transport = httpx2.MockTransport(handler) + return AsyncClient( + httpx2_client=httpx2.AsyncClient(transport=transport), + middleware=[bulkhead], + ) + + +def test_max_concurrent_zero_rejected() -> None: + with pytest.raises(ValueError, match="max_concurrent must be >= 1"): + Bulkhead(max_concurrent=0) + + +def test_max_concurrent_negative_rejected() -> None: + with pytest.raises(ValueError, match="max_concurrent must be >= 1"): + Bulkhead(max_concurrent=-1) + + +def test_negative_acquire_timeout_rejected() -> None: + with pytest.raises(ValueError, match="acquire_timeout must be >= 0"): + Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=-0.1) + + +def test_acquire_timeout_zero_accepted() -> None: + bulkhead = Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=0) + assert bulkhead._acquire_timeout == 0 # noqa: SLF001 + + +def test_acquire_timeout_none_accepted() -> None: + bulkhead = Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=None) + assert bulkhead._acquire_timeout is None # noqa: SLF001 + + +async def test_succeeds_when_slot_available() -> None: + handler = _SlowHandler(delay=0.0) + client = _client(handler, bulkhead=Bulkhead(max_concurrent=_MAX_CONCURRENT_2)) + response = await client.get("https://example.test/x") + assert response.status_code == HTTPStatus.OK + assert handler.calls == 1 + + +async def test_serializes_at_capacity() -> None: + """With max_concurrent=1 and 3 concurrent calls, in-flight count never exceeds 1.""" + handler = _SlowHandler(delay=0.02) + client = _client( + handler, + bulkhead=Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=None), + ) + await asyncio.gather( + client.get("https://example.test/a"), + client.get("https://example.test/b"), + client.get("https://example.test/c"), + ) + assert handler.calls == 3 # noqa: PLR2004 — three concurrent gets above + assert handler.max_in_flight == 1 # cap honored + + +async def test_max_concurrent_2_observes_at_most_2_in_flight() -> None: + handler = _SlowHandler(delay=0.02) + client = _client(handler, bulkhead=Bulkhead(max_concurrent=_MAX_CONCURRENT_2, acquire_timeout=None)) + await asyncio.gather( + client.get("https://example.test/a"), + client.get("https://example.test/b"), + client.get("https://example.test/c"), + client.get("https://example.test/d"), + ) + assert handler.calls == 4 # noqa: PLR2004 — four concurrent gets above + assert handler.max_in_flight <= _MAX_CONCURRENT_2 +``` + +Run: `uv run pytest tests/test_bulkhead.py -v` +Expected: all FAIL (`ImportError` for `Bulkhead`). + +- [ ] **Step 2: Implement the `Bulkhead` middleware** + +Replace `src/httpware/middleware/resilience/bulkhead.py` content with: +```python +"""Bulkhead middleware — concurrency limiter via asyncio.Semaphore. + +See planning/specs/2026-06-05-bulkhead-design.md for the contract. + +The middleware owns an asyncio.Semaphore(max_concurrent). On each request, +it acquires a slot (bounded by acquire_timeout via asyncio.timeout) and +releases the slot in a try/finally so success, exceptions, and cancellation +all release deterministically. + +Bulkhead is the sharable unit — pass the same instance to multiple +AsyncClient(middleware=[shared]) calls to enforce a joint cap across clients. +""" + +import asyncio + +import httpx2 + +from httpware.errors import BulkheadFullError +from httpware.middleware import Next + + +_MAX_CONCURRENT_INVALID = "max_concurrent must be >= 1" +_ACQUIRE_TIMEOUT_INVALID = "acquire_timeout must be >= 0" + + +class Bulkhead: + """Concurrency limiter middleware. See module docstring for behavior.""" + + def __init__( + self, + *, + max_concurrent: int, + acquire_timeout: float | None = 1.0, + ) -> None: + if max_concurrent < 1: + raise ValueError(_MAX_CONCURRENT_INVALID) + if acquire_timeout is not None and acquire_timeout < 0: + raise ValueError(_ACQUIRE_TIMEOUT_INVALID) + self._max_concurrent = max_concurrent + self._acquire_timeout = acquire_timeout + self._sem = asyncio.Semaphore(max_concurrent) + + async def __call__(self, request: httpx2.Request, next: Next) -> httpx2.Response: # noqa: A002 + """Acquire a slot (bounded by acquire_timeout), invoke next, release.""" + try: + if self._acquire_timeout is None: + await self._sem.acquire() + else: + async with asyncio.timeout(self._acquire_timeout): + await self._sem.acquire() + except asyncio.TimeoutError as exc: + raise BulkheadFullError( + max_concurrent=self._max_concurrent, + acquire_timeout=self._acquire_timeout, + ) from exc + + try: + return await next(request) + finally: + self._sem.release() +``` + +- [ ] **Step 3: Wire `Bulkhead` into `resilience/__init__.py`** + +Read `src/httpware/middleware/resilience/__init__.py`. Add `Bulkhead` to the re-exports and `__all__`. The full file becomes: +```python +"""Resilience primitives: Retry middleware and RetryBudget token bucket.""" + +from httpware.middleware.resilience.budget import RetryBudget +from httpware.middleware.resilience.bulkhead import Bulkhead +from httpware.middleware.resilience.retry import Retry + + +__all__ = ["Bulkhead", "Retry", "RetryBudget"] +``` + +- [ ] **Step 4: Run the Task 3 tests** + +Run: `uv run pytest tests/test_bulkhead.py -v` +Expected: all PASS. + +- [ ] **Step 5: Run lint + full suite** + +Run: `just lint && just test` +Expected: clean, 100% coverage. + +- [ ] **Step 6: Stage and commit** + +```bash +git add src/httpware/middleware/resilience/bulkhead.py src/httpware/middleware/resilience/__init__.py tests/test_bulkhead.py +git commit -m "feat(resilience): Bulkhead middleware — happy path + validation + capacity + +Constructor validates max_concurrent >= 1 and acquire_timeout >= 0 +(None and 0 both accepted). asyncio.Semaphore enforces the cap; the +explicit acquire + try/finally around next() guarantees release on +every exit path. Acquire failures map to BulkheadFullError. + +Subsequent tasks cover fail-fast / wait-forever modes, exception + +cancellation release semantics, cross-client sharing, and property tests." +``` + +--- + +## Task 4: `Bulkhead` — `acquire_timeout` behaviors (bounded wait, fail-fast, wait-forever) + +**Files:** +- Modify: `tests/test_bulkhead.py` + +The implementation already supports all three modes. This task adds explicit tests that pin each mode. + +- [ ] **Step 1: Append tests to `tests/test_bulkhead.py`** + +```python +_ACQUIRE_TIMEOUT_SHORT = 0.02 +_ACQUIRE_TIMEOUT_LONG = 0.1 + + +async def test_bounded_wait_raises_bulkhead_full_error() -> None: + """With max_concurrent=1 and acquire_timeout=0.02, the second call raises after ~20ms.""" + handler = _SlowHandler(delay=_ACQUIRE_TIMEOUT_LONG) # holds slot for 100ms + client = _client( + handler, + bulkhead=Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=_ACQUIRE_TIMEOUT_SHORT), + ) + + first = asyncio.create_task(client.get("https://example.test/a")) + await asyncio.sleep(0.005) # let first acquire the slot + with pytest.raises(BulkheadFullError) as info: + await client.get("https://example.test/b") + assert info.value.max_concurrent == _MAX_CONCURRENT_1 + assert info.value.acquire_timeout == _ACQUIRE_TIMEOUT_SHORT + await first # cleanup + + +async def test_acquire_timeout_zero_fails_fast() -> None: + """With acquire_timeout=0, the second call raises immediately without waiting.""" + handler = _SlowHandler(delay=_ACQUIRE_TIMEOUT_LONG) + client = _client( + handler, + bulkhead=Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=0), + ) + + first = asyncio.create_task(client.get("https://example.test/a")) + await asyncio.sleep(0.005) + with pytest.raises(BulkheadFullError) as info: + await client.get("https://example.test/b") + assert info.value.acquire_timeout == 0 + await first + + +async def test_acquire_timeout_none_waits_forever() -> None: + """With acquire_timeout=None, the second call waits until the first releases.""" + handler = _SlowHandler(delay=_ACQUIRE_TIMEOUT_SHORT) + client = _client( + handler, + bulkhead=Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=None), + ) + + first = asyncio.create_task(client.get("https://example.test/a")) + second = asyncio.create_task(client.get("https://example.test/b")) + responses = await asyncio.wait_for(asyncio.gather(first, second), timeout=1.0) + assert all(r.status_code == HTTPStatus.OK for r in responses) + assert handler.calls == 2 # noqa: PLR2004 — both eventually succeeded +``` + +- [ ] **Step 2: Run the tests** + +Run: `uv run pytest tests/test_bulkhead.py -v -k "bounded_wait or fails_fast or waits_forever"` +Expected: all PASS. + +- [ ] **Step 3: Run lint + full suite** + +Run: `just lint && just test` +Expected: clean, 100% coverage. + +- [ ] **Step 4: Stage and commit** + +```bash +git add tests/test_bulkhead.py +git commit -m "test(resilience): Bulkhead acquire_timeout modes — bounded / fail-fast / forever + +Pins the three acquire_timeout modes: bounded wait raises BulkheadFullError +after the configured timeout, =0 fails fast without waiting, =None waits +until a slot frees." +``` + +--- + +## Task 5: `Bulkhead` — release semantics (exception, cancellation) + +**Files:** +- Modify: `tests/test_bulkhead.py` + +The `try/finally` in `__call__` already releases on every exit. This task pins the behavior with explicit tests so a future refactor that drops the `finally` is caught immediately. + +- [ ] **Step 1: Append tests to `tests/test_bulkhead.py`** + +```python +async def test_slot_released_after_exception_in_next() -> None: + """If next() raises, the slot is released — subsequent calls succeed immediately.""" + call_count = {"n": 0} + + def handler(request: httpx2.Request) -> httpx2.Response: + call_count["n"] += 1 + if call_count["n"] == 1: + msg = "boom" + raise RuntimeError(msg) + return httpx2.Response(HTTPStatus.OK, request=request) + + client = _client(handler, bulkhead=Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=0)) + + # First call raises; slot must release. + with pytest.raises(RuntimeError, match="boom"): + await client.get("https://example.test/a") + + # Second call must succeed immediately — fail-fast=0 proves the slot is free. + response = await client.get("https://example.test/b") + assert response.status_code == HTTPStatus.OK + assert call_count["n"] == 2 # noqa: PLR2004 — second call reached handler + + +async def test_slot_released_on_cancellation() -> None: + """If the calling task is cancelled while next() runs, the slot is released.""" + handler = _SlowHandler(delay=0.5) # would block indefinitely + bulkhead = Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=0) + client = _client(handler, bulkhead=bulkhead) + + first = asyncio.create_task(client.get("https://example.test/a")) + await asyncio.sleep(0.01) # let first acquire and start sleeping in handler + first.cancel() + with pytest.raises(asyncio.CancelledError): + await first + + # Slot must now be released — fail-fast=0 next call proves it. + handler.delay = 0.0 # speed up the next request + response = await client.get("https://example.test/b") + assert response.status_code == HTTPStatus.OK + + +async def test_cancellation_before_acquire_does_not_hold_slot() -> None: + """Cancellation while waiting for a slot must not leak the slot to the cancelled task.""" + handler = _SlowHandler(delay=0.05) + bulkhead = Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=None) + client = _client(handler, bulkhead=bulkhead) + + first = asyncio.create_task(client.get("https://example.test/a")) + await asyncio.sleep(0.005) # first acquires + second = asyncio.create_task(client.get("https://example.test/b")) # waits for slot + await asyncio.sleep(0.005) # ensure second is parked on acquire + second.cancel() + with pytest.raises(asyncio.CancelledError): + await second + + # First should still complete normally. + response = await first + assert response.status_code == HTTPStatus.OK + assert handler.calls == 1 # second never reached the handler +``` + +- [ ] **Step 2: Run the tests** + +Run: `uv run pytest tests/test_bulkhead.py -v -k "released or cancellation"` +Expected: all PASS. + +- [ ] **Step 3: Run lint + full suite** + +Run: `just lint && just test` +Expected: clean, 100% coverage. + +- [ ] **Step 4: Stage and commit** + +```bash +git add tests/test_bulkhead.py +git commit -m "test(resilience): Bulkhead release semantics on exception + cancellation + +Pins the try/finally release: exception in next() releases the slot, +cancellation during next() releases the slot, cancellation while +parked on acquire() does not hold a slot." +``` + +--- + +## Task 6: `Bulkhead` — sharing across clients + construct-outside-loop sanity + +**Files:** +- Modify: `tests/test_bulkhead.py` + +- [ ] **Step 1: Append tests to `tests/test_bulkhead.py`** + +```python +# Constructed at module scope on purpose — pins the construct-outside-loop behavior. +_MODULE_SCOPE_BULKHEAD = Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=None) + + +async def test_construct_outside_event_loop_then_use_inside() -> None: + """Bulkhead constructed at module scope must work when used inside an event loop.""" + handler = _SlowHandler(delay=0.0) + client = _client(handler, bulkhead=_MODULE_SCOPE_BULKHEAD) + response = await client.get("https://example.test/x") + assert response.status_code == HTTPStatus.OK + + +async def test_shared_bulkhead_enforces_joint_cap() -> None: + """One Bulkhead shared across two AsyncClients enforces the joint cap.""" + # Both clients use ONE handler that tracks combined in-flight across all calls. + # asyncio is single-threaded so a plain dict counter is safe between awaits. + state = {"in_flight": 0, "max_in_flight": 0} + + async def shared_handler(request: httpx2.Request) -> httpx2.Response: + state["in_flight"] += 1 + state["max_in_flight"] = max(state["max_in_flight"], state["in_flight"]) + try: + await asyncio.sleep(0.02) + return httpx2.Response(HTTPStatus.OK, request=request) + finally: + state["in_flight"] -= 1 + + shared = Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=None) + client_a = AsyncClient( + httpx2_client=httpx2.AsyncClient(transport=httpx2.MockTransport(shared_handler)), + middleware=[shared], + ) + client_b = AsyncClient( + httpx2_client=httpx2.AsyncClient(transport=httpx2.MockTransport(shared_handler)), + middleware=[shared], + ) + + await asyncio.gather( + client_a.get("https://upstream-a.example.test/x"), + client_a.get("https://upstream-a.example.test/y"), + client_b.get("https://upstream-b.example.test/x"), + client_b.get("https://upstream-b.example.test/y"), + ) + + # The shared bulkhead enforces max=1 across BOTH clients combined. + assert state["max_in_flight"] <= _MAX_CONCURRENT_1 +``` + +- [ ] **Step 2: Run the tests** + +Run: `uv run pytest tests/test_bulkhead.py -v -k "construct_outside or shared_bulkhead"` +Expected: PASS. + +- [ ] **Step 3: Run lint + full suite** + +Run: `just lint && just test` +Expected: clean, 100% coverage. + +- [ ] **Step 4: Stage and commit** + +```bash +git add tests/test_bulkhead.py +git commit -m "test(resilience): Bulkhead sharing across clients + construct-outside-loop + +Pins two behaviors: a Bulkhead instantiated at module scope (outside any +event loop) works correctly when used inside one, and a single Bulkhead +instance passed to multiple AsyncClient instances enforces the joint cap +across all of them." +``` + +--- + +## Task 7: Hypothesis property tests for `Bulkhead` + +**Files:** +- Create: `tests/test_bulkhead_props.py` + +- [ ] **Step 1: Create the property-test file** + +```python +"""Hypothesis property tests for Bulkhead. + +Properties verified: +1. Observed in-flight count never exceeds max_concurrent under any interleaving. +2. With acquire_timeout=0 and a full bulkhead, the call raises BulkheadFullError. +3. Successful acquisitions are released — back-to-back calls eventually drain + without leaking slots. +""" + +import asyncio +from http import HTTPStatus + +import httpx2 +import pytest +from hypothesis import given, settings +from hypothesis import strategies as st + +from httpware import AsyncClient +from httpware.errors import BulkheadFullError +from httpware.middleware.resilience.bulkhead import Bulkhead + + +class _InFlightHandler: + """Tracks max simultaneous in-flight count across all calls.""" + + def __init__(self, delay: float) -> None: + self.delay = delay + self.in_flight = 0 + self.max_in_flight = 0 + self.calls = 0 + + async def __call__(self, request: httpx2.Request) -> httpx2.Response: + self.calls += 1 + self.in_flight += 1 + self.max_in_flight = max(self.max_in_flight, self.in_flight) + try: + await asyncio.sleep(self.delay) + return httpx2.Response(HTTPStatus.OK, request=request) + finally: + self.in_flight -= 1 + + +@given( + max_concurrent=st.integers(min_value=1, max_value=8), + n_requests=st.integers(min_value=1, max_value=32), + delay=st.floats(min_value=0.001, max_value=0.005), +) +@settings(max_examples=30, deadline=None) +async def test_in_flight_never_exceeds_max_concurrent( + max_concurrent: int, n_requests: int, delay: float, +) -> None: + handler = _InFlightHandler(delay=delay) + transport = httpx2.MockTransport(handler) + client = AsyncClient( + httpx2_client=httpx2.AsyncClient(transport=transport), + middleware=[Bulkhead(max_concurrent=max_concurrent, acquire_timeout=None)], + ) + await asyncio.gather( + *(client.get(f"https://example.test/{i}") for i in range(n_requests)) + ) + assert handler.calls == n_requests + assert handler.max_in_flight <= max_concurrent + + +@given( + max_concurrent=st.integers(min_value=1, max_value=4), + extra_requests=st.integers(min_value=1, max_value=8), +) +@settings(max_examples=20, deadline=None) +async def test_fail_fast_rejects_when_at_capacity( + max_concurrent: int, extra_requests: int, +) -> None: + handler = _InFlightHandler(delay=0.05) # hold slots long enough for fail-fast to fire + transport = httpx2.MockTransport(handler) + client = AsyncClient( + httpx2_client=httpx2.AsyncClient(transport=transport), + middleware=[Bulkhead(max_concurrent=max_concurrent, acquire_timeout=0)], + ) + + # Fill the bulkhead with max_concurrent long-running tasks. + holders = [ + asyncio.create_task(client.get(f"https://example.test/hold-{i}")) + for i in range(max_concurrent) + ] + await asyncio.sleep(0.005) # let the holders acquire their slots + + # Any extra requests should fail fast with BulkheadFullError. + for i in range(extra_requests): + with pytest.raises(BulkheadFullError): + await client.get(f"https://example.test/extra-{i}") + + # Cleanup the holders. + await asyncio.gather(*holders) + + +@given( + max_concurrent=st.integers(min_value=1, max_value=4), + n_requests=st.integers(min_value=4, max_value=16), +) +@settings(max_examples=20, deadline=None) +async def test_no_slot_leak_after_drain(max_concurrent: int, n_requests: int) -> None: + """After all calls complete, the bulkhead has its full capacity available.""" + handler = _InFlightHandler(delay=0.001) + bulkhead = Bulkhead(max_concurrent=max_concurrent, acquire_timeout=None) + transport = httpx2.MockTransport(handler) + client = AsyncClient( + httpx2_client=httpx2.AsyncClient(transport=transport), + middleware=[bulkhead], + ) + + await asyncio.gather( + *(client.get(f"https://example.test/{i}") for i in range(n_requests)) + ) + + # Bulkhead should be drained — _value equals max_concurrent again. + # asyncio.Semaphore._value is implementation detail but reliable across CPython 3.11+. + assert bulkhead._sem._value == max_concurrent # noqa: SLF001 +``` + +Add `import pytest` at the top alongside the existing imports. + +- [ ] **Step 2: Run the property tests** + +Run: `uv run pytest tests/test_bulkhead_props.py -v` +Expected: all PASS. + +- [ ] **Step 3: Run lint + full suite** + +Run: `just lint && just test` +Expected: clean, 100% coverage. + +- [ ] **Step 4: Stage and commit** + +```bash +git add tests/test_bulkhead_props.py +git commit -m "test(resilience): Hypothesis property tests for Bulkhead + +Three invariants: in-flight never exceeds max_concurrent under any +interleaving; fail-fast (acquire_timeout=0) raises BulkheadFullError +when at capacity; after all calls drain, the bulkhead has full capacity +available again (no slot leak)." +``` + +--- + +## Task 8: Public API exports + final verification + +**Files:** +- Modify: `src/httpware/__init__.py` +- Modify: `tests/test_public_api.py` +- Modify: `planning/engineering.md` + +- [ ] **Step 1: Add `Bulkhead` + `BulkheadFullError` to `src/httpware/__init__.py`** + +Read `src/httpware/__init__.py`. Add `BulkheadFullError` to the `from httpware.errors import (...)` block in alphabetical position (between `BadRequestError` and `ClientError`). + +Update the resilience import line from: +```python +from httpware.middleware.resilience import Retry, RetryBudget +``` +to: +```python +from httpware.middleware.resilience import Bulkhead, Retry, RetryBudget +``` + +Add both new symbols to `__all__` in alphabetical order: +- `"Bulkhead"` (between `"BadRequestError"` and `"ClientError"`) +- `"BulkheadFullError"` (immediately after `"Bulkhead"`) + +- [ ] **Step 2: Update `tests/test_public_api.py`** + +In `test_expected_exports`, add `"Bulkhead"` and `"BulkheadFullError"` to the `expected` set. Insertion point: alphabetical, between `"AsyncClient"` and `"ClientError"` (the file's `expected` is unordered as a set, but insert visually in the same alphabetic neighborhood). + +- [ ] **Step 3: Run the public-API test** + +Run: `uv run pytest tests/test_public_api.py -v` +Expected: PASS. + +- [ ] **Step 4: Update `planning/engineering.md` §8** + +Find §8 "Remaining roadmap" and the Epic 3 subsection. Current text: +``` +- **Epic 3 — Resilience:** + - **Shipped in v0.4 slice 1:** `Retry` middleware + Finagle-style `RetryBudget` token bucket + `attempt_timeout=` parameter (folded-in 3-1). ... + - **Remaining:** `3-5` `Bulkhead`, `3-6` extension-slot docs. +``` + +Update to: +``` +- **Epic 3 — Resilience:** + - **Shipped in v0.4 slice 1:** `Retry` middleware + Finagle-style `RetryBudget` token bucket + `attempt_timeout=` parameter (folded-in 3-1). See [`planning/specs/2026-06-05-retry-and-retry-budget-design.md`](specs/2026-06-05-retry-and-retry-budget-design.md) and [`planning/plans/2026-06-05-retry-and-retry-budget-plan.md`](plans/2026-06-05-retry-and-retry-budget-plan.md). + - **Shipped in v0.5:** `Bulkhead` middleware (concurrency limiter via `asyncio.Semaphore` with bounded acquire wait). See [`planning/specs/2026-06-05-bulkhead-design.md`](specs/2026-06-05-bulkhead-design.md) and [`planning/plans/2026-06-05-bulkhead-plan.md`](plans/2026-06-05-bulkhead-plan.md). + - **Remaining:** `3-6` extension-slot docs. +``` + +- [ ] **Step 5: Run the full suite with coverage gate** + +Run: `just test` +Expected: ALL tests PASS, coverage = 100%. + +- [ ] **Step 6: Run the full lint** + +Run: `just lint-ci` +Expected: clean. + +- [ ] **Step 7: Verify the architecture invariants from `CLAUDE.md`** + +Run each in turn: +```bash +grep -rE 'httpx2\._' src/httpware/ || echo "PASS: no httpx2 private API" +grep -rE 'from __future__ import annotations' src/httpware/ || echo "PASS: no __future__ annotations" +grep -rE '\bprint\(' src/httpware/ || echo "PASS: no print()" +grep -rE 'logging\.(basicConfig|getLogger)\(\)' src/httpware/ || echo "PASS: no global logging" +grep -rE '# (type|mypy): ignore' src/httpware/ || echo "PASS: no type/mypy ignore" +``` + +Each should print the `PASS` line (the grep returns no matches). + +- [ ] **Step 8: Verify the optional-extras isolation invariant** + +Bulkhead is pure stdlib; importing httpware should not pull pydantic/msgspec/otel: +```bash +uv run pytest tests/test_optional_extras_isolation.py -v +``` +Expected: PASS. + +- [ ] **Step 9: Stage and commit** + +```bash +git add src/httpware/__init__.py tests/test_public_api.py +git commit -m "feat(api): export Bulkhead and BulkheadFullError + +Completes the v0.5 slice: Bulkhead concurrency limiter middleware + its +backpressure exception. Pure-stdlib core, no new optional extra." +``` + +- [ ] **Step 10: Final commit for planning docs** + +```bash +git add planning/engineering.md +git commit -m "docs(planning): mark 3-5 Bulkhead shipped in v0.5" +``` + +- [ ] **Step 11: Push the branch and open the PR** + +```bash +git push -u origin feat/v0.5-bulkhead +``` + +Then create a PR per the project's normal cadence (`gh pr create`). The PR body should reference both the spec (`planning/specs/2026-06-05-bulkhead-design.md`) and this plan. Do NOT bundle release-notes work into this PR — release notes for 0.5.0 happen in a separate release-prep PR (potentially bundled with 3-6 docs). + +--- + +## Out of scope for this plan (per the spec) + +These items are deliberately deferred. Do NOT implement them as part of this slice; if the implementation pulls toward them, stop and surface to the user instead. + +- Per-host / per-route partitioning of the cap. +- Separate `BulkheadLimit` public type. +- Queue-depth / in-flight metrics on the `Bulkhead` object. +- Fallback / shed-load callbacks. +- Connection-pool integration with `httpx2.Limits`. +- Release notes / version bump for 0.5.0 — happens in a separate release-prep PR. From d571d73d61af07fb701636e1c3afe8cb91f398db Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Fri, 5 Jun 2026 13:53:17 +0300 Subject: [PATCH 02/15] scaffold(resilience): empty bulkhead.py stub --- src/httpware/middleware/resilience/bulkhead.py | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 src/httpware/middleware/resilience/bulkhead.py diff --git a/src/httpware/middleware/resilience/bulkhead.py b/src/httpware/middleware/resilience/bulkhead.py new file mode 100644 index 0000000..46fa8f3 --- /dev/null +++ b/src/httpware/middleware/resilience/bulkhead.py @@ -0,0 +1,4 @@ +"""Bulkhead middleware — concurrency limiter via asyncio.Semaphore. + +See planning/specs/2026-06-05-bulkhead-design.md for the contract. +""" From cfb139f92558c1081ff2aaaca8eb480059e02bff Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Fri, 5 Jun 2026 13:59:44 +0300 Subject: [PATCH 03/15] docs(release): bundle Bulkhead into 0.4.0 alongside Retry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per user decision, Bulkhead ships in 0.4.0 with Retry rather than waiting for its own 0.5.0 cut. - Amend planning/releases/0.4.0.md to describe Bulkhead + BulkheadFullError alongside Retry / RetryBudget / NetworkError, with usage examples that show the recommended Bulkhead-outside-Retry middleware ordering. - Update Bulkhead spec target release: 0.5.0 → 0.4.0 (bundled with slice 1). - Update Bulkhead plan target branch: feat/v0.5-bulkhead → feat/v0.4-bulkhead; update Task 8 release-notes guidance to reflect amendment vs. fresh draft. Co-Authored-By: Claude Opus 4.7 (1M context) --- planning/plans/2026-06-05-bulkhead-plan.md | 18 +++--- planning/releases/0.4.0.md | 68 ++++++++++++++++++-- planning/specs/2026-06-05-bulkhead-design.md | 4 +- 3 files changed, 72 insertions(+), 18 deletions(-) diff --git a/planning/plans/2026-06-05-bulkhead-plan.md b/planning/plans/2026-06-05-bulkhead-plan.md index 7da09ef..ece683a 100644 --- a/planning/plans/2026-06-05-bulkhead-plan.md +++ b/planning/plans/2026-06-05-bulkhead-plan.md @@ -1,4 +1,4 @@ -# Bulkhead middleware (0.5.0, Epic 3 slice 2) Implementation Plan +# Bulkhead middleware (0.4.0, Epic 3 slice 2) Implementation Plan > **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. @@ -8,7 +8,7 @@ **Tech Stack:** Python 3.11+ (`asyncio.timeout` requires 3.11), `httpx2`, `pytest` / `pytest-asyncio` (auto mode), `hypothesis`, `uv`, `just`, `ruff`, `ty`. -**Target branch:** `feat/v0.5-bulkhead`. Create from `main` before Task 1: `git checkout main && git pull && git checkout -b feat/v0.5-bulkhead`. +**Target branch:** `feat/v0.4-bulkhead`. Create from `main` before Task 1: `git checkout main && git pull && git checkout -b feat/v0.4-bulkhead`. **Source spec:** [`planning/specs/2026-06-05-bulkhead-design.md`](../specs/2026-06-05-bulkhead-design.md). Read it before starting — the *why* for each decision lives there. @@ -44,7 +44,7 @@ This task creates only the empty module. The class arrives in Task 3 and the `re Run: ```bash -git checkout main && git pull && git checkout -b feat/v0.5-bulkhead +git checkout main && git pull && git checkout -b feat/v0.4-bulkhead ``` Expected: switched to a new branch. @@ -881,7 +881,7 @@ Update to: ``` - **Epic 3 — Resilience:** - **Shipped in v0.4 slice 1:** `Retry` middleware + Finagle-style `RetryBudget` token bucket + `attempt_timeout=` parameter (folded-in 3-1). See [`planning/specs/2026-06-05-retry-and-retry-budget-design.md`](specs/2026-06-05-retry-and-retry-budget-design.md) and [`planning/plans/2026-06-05-retry-and-retry-budget-plan.md`](plans/2026-06-05-retry-and-retry-budget-plan.md). - - **Shipped in v0.5:** `Bulkhead` middleware (concurrency limiter via `asyncio.Semaphore` with bounded acquire wait). See [`planning/specs/2026-06-05-bulkhead-design.md`](specs/2026-06-05-bulkhead-design.md) and [`planning/plans/2026-06-05-bulkhead-plan.md`](plans/2026-06-05-bulkhead-plan.md). + - **Shipped in v0.4 slice 2:** `Bulkhead` middleware (concurrency limiter via `asyncio.Semaphore` with bounded acquire wait). See [`planning/specs/2026-06-05-bulkhead-design.md`](specs/2026-06-05-bulkhead-design.md) and [`planning/plans/2026-06-05-bulkhead-plan.md`](plans/2026-06-05-bulkhead-plan.md). - **Remaining:** `3-6` extension-slot docs. ``` @@ -922,7 +922,7 @@ Expected: PASS. git add src/httpware/__init__.py tests/test_public_api.py git commit -m "feat(api): export Bulkhead and BulkheadFullError -Completes the v0.5 slice: Bulkhead concurrency limiter middleware + its +Completes the v0.4 slice 2 slice: Bulkhead concurrency limiter middleware + its backpressure exception. Pure-stdlib core, no new optional extra." ``` @@ -930,16 +930,16 @@ backpressure exception. Pure-stdlib core, no new optional extra." ```bash git add planning/engineering.md -git commit -m "docs(planning): mark 3-5 Bulkhead shipped in v0.5" +git commit -m "docs(planning): mark 3-5 Bulkhead shipped in v0.4 slice 2" ``` - [ ] **Step 11: Push the branch and open the PR** ```bash -git push -u origin feat/v0.5-bulkhead +git push -u origin feat/v0.4-bulkhead ``` -Then create a PR per the project's normal cadence (`gh pr create`). The PR body should reference both the spec (`planning/specs/2026-06-05-bulkhead-design.md`) and this plan. Do NOT bundle release-notes work into this PR — release notes for 0.5.0 happen in a separate release-prep PR (potentially bundled with 3-6 docs). +Then create a PR per the project's normal cadence (`gh pr create`). The PR body should reference both the spec (`planning/specs/2026-06-05-bulkhead-design.md`) and this plan. The PR includes an amendment to `planning/releases/0.4.0.md` describing Bulkhead alongside Retry (per the decision to ship Bulkhead in 0.4.0). The amendment commit lives at the start of the branch. --- @@ -952,4 +952,4 @@ These items are deliberately deferred. Do NOT implement them as part of this sli - Queue-depth / in-flight metrics on the `Bulkhead` object. - Fallback / shed-load callbacks. - Connection-pool integration with `httpx2.Limits`. -- Release notes / version bump for 0.5.0 — happens in a separate release-prep PR. +- Version bump for 0.4.0 — happens in a separate release-prep PR. Release notes for 0.4.0 are amended in this branch to describe Bulkhead alongside Retry. diff --git a/planning/releases/0.4.0.md b/planning/releases/0.4.0.md index d8cb335..a515f17 100644 --- a/planning/releases/0.4.0.md +++ b/planning/releases/0.4.0.md @@ -1,8 +1,8 @@ -# httpware 0.4.0 — Retry middleware + RetryBudget +# httpware 0.4.0 — Retry, RetryBudget, and Bulkhead **0.4.0 is additive. No breaking changes.** Code written against 0.3.0 continues to work unchanged. -This release ships the first slice of Epic 3 (Resilience): a `Retry` middleware with sensible defaults, a Finagle-style `RetryBudget` token bucket that prevents retry storms, and a refinement to the exception tree (`NetworkError`) that lets callers tell transient network failures apart from non-retryable transport failures. +This release ships Epic 3 (Resilience) almost entirely: a `Retry` middleware with sensible defaults, a Finagle-style `RetryBudget` token bucket that prevents retry storms, a `Bulkhead` middleware that caps caller-side concurrency, and a refinement to the exception tree (`NetworkError`) that lets callers tell transient network failures apart from non-retryable transport failures. ## New features @@ -16,13 +16,20 @@ This release ships the first slice of Epic 3 (Resilience): a `Retry` middleware - **`httpware.RetryBudget`** — Finagle-style token bucket bounding retry rate to prevent retry storms when downstream services degrade. Defaults: `ttl=10s`, `min_retries_per_sec=10`, `percent_can_retry=0.2` (match Finagle / AWS SDK / Envoy). Per `Retry`-instance by default; pass an explicit `RetryBudget` to share across multiple `Retry` middlewares (e.g., several `AsyncClient`s hitting the same downstream). - **`httpware.RetryBudgetExhaustedError`** — distinct `ClientError` raised when the budget refuses a retry. Carries `last_response: httpx2.Response | None`, `last_exception: BaseException | None`, and `attempts: int`. Picklable across process boundaries. - **`httpware.NetworkError(TransportError)`** — refines the `AsyncClient` terminal mapping so transient `httpx2.NetworkError`-family exceptions (`ConnectError`, `ReadError`, `WriteError`, `CloseError`) raise `httpware.NetworkError`. `InvalidURL` and `CookieConflict` continue to raise bare `TransportError`. Pool-acquisition timeouts (`httpx2.PoolTimeout`) continue to raise `httpware.TimeoutError`. +- **`httpware.Bulkhead`** — middleware that caps in-flight requests at the caller layer via `asyncio.Semaphore`. Distinct from `httpx2.Limits` (which caps the connection pool); Bulkhead caps the number of concurrent calls regardless of pool state. Parameters: + - `max_concurrent` (required, no default — there's no universally-correct value; depends on downstream capacity) + - `acquire_timeout=1.0` seconds, with `None` = wait forever and `0` = fail fast on full bulkhead + - On `acquire_timeout` elapsed: raises `BulkheadFullError(ClientError)` carrying `max_concurrent` and `acquire_timeout` + - Slot release is guaranteed by an explicit `try/finally` around `next()` — success, exception, and cancellation all release deterministically + - `Bulkhead` IS the sharable unit; pass the same instance to multiple `AsyncClient(middleware=[shared])` calls to enforce a joint cap across clients +- **`httpware.BulkheadFullError`** — distinct `ClientError` raised when the Bulkhead refuses to admit a request within `acquire_timeout`. Carries `max_concurrent: int` and `acquire_timeout: float | None`. Picklable across process boundaries. ## Backwards compatibility Subclassing keeps existing catch-blocks working unchanged: - `except TransportError` still catches all transient + permanent transport-layer failures (`NetworkError` is a subclass). -- `except ClientError` still catches everything in the httpware exception tree, including the new `RetryBudgetExhaustedError`. +- `except ClientError` still catches everything in the httpware exception tree, including the new `RetryBudgetExhaustedError` and `BulkheadFullError`. The terminal mapping change only narrows what callers see when they check the *exact* type. Catch-by-isinstance behaves the same. @@ -84,14 +91,61 @@ Retry( ) ``` +Cap caller-side concurrency with `Bulkhead`. Note: `Bulkhead` goes *outside* `Retry` in the middleware stack so a retrying request holds one slot across all attempts (rather than re-acquiring per retry): + +```python +from httpware import AsyncClient, Bulkhead, Retry + +async with AsyncClient( + base_url="https://api.example.com", + middleware=[ + Bulkhead(max_concurrent=10), # cap total in-flight at 10 + Retry(), # retries happen inside the Bulkhead slot + ], +) as client: + user = await client.get("/users/1", response_model=User) +``` + +Catch a full bulkhead: + +```python +from httpware import BulkheadFullError + +try: + response = await client.get("/users/1") +except BulkheadFullError as exc: + logger.warning( + "bulkhead full: %d in-flight, waited %s", + exc.max_concurrent, + exc.acquire_timeout, + ) +``` + +Share a Bulkhead across multiple clients hitting the same downstream: + +```python +shared_bulkhead = Bulkhead(max_concurrent=20) + +async with AsyncClient( + base_url="https://upstream.example.com/v1", + middleware=[shared_bulkhead], +) as client_a, AsyncClient( + base_url="https://upstream.example.com/v2", + middleware=[shared_bulkhead], +) as client_b: + ... # the 20-slot cap is enforced jointly across A and B +``` + ## What's still ahead -The rest of Epic 3 — `Bulkhead` (concurrency limiter) and extension-slot documentation — ships in subsequent releases. Epic 5 (observability hooks + OTel middleware) is also unstarted; logging of retry decisions will plumb through then. +The only remaining Epic 3 work is `3-6` extension-slot documentation, which ships as a docs-only follow-up. Epic 5 (observability hooks + OTel middleware) is unstarted; logging of retry/bulkhead decisions plumbs through then. -Out of scope for this release (per the spec, may revisit on real-user pain): per-call retry override via `extensions`, a `Backoff` protocol abstraction, `retry_on_exception=` configuration, and retrying streamed request bodies (the latter waits for `AsyncClient.stream` in Epic 4). +Out of scope for this release (per the specs, may revisit on real-user pain): per-call retry override via `extensions`, a `Backoff` protocol abstraction, `retry_on_exception=` configuration, retrying streamed request bodies (the latter waits for `AsyncClient.stream` in Epic 4), per-host Bulkhead partitioning, and Bulkhead queue-depth metrics. ## References -- Spec: [`planning/specs/2026-06-05-retry-and-retry-budget-design.md`](../specs/2026-06-05-retry-and-retry-budget-design.md) -- Plan: [`planning/plans/2026-06-05-retry-and-retry-budget-plan.md`](../plans/2026-06-05-retry-and-retry-budget-plan.md) +- Retry spec: [`planning/specs/2026-06-05-retry-and-retry-budget-design.md`](../specs/2026-06-05-retry-and-retry-budget-design.md) +- Retry plan: [`planning/plans/2026-06-05-retry-and-retry-budget-plan.md`](../plans/2026-06-05-retry-and-retry-budget-plan.md) +- Bulkhead spec: [`planning/specs/2026-06-05-bulkhead-design.md`](../specs/2026-06-05-bulkhead-design.md) +- Bulkhead plan: [`planning/plans/2026-06-05-bulkhead-plan.md`](../plans/2026-06-05-bulkhead-plan.md) - Roadmap: [`planning/engineering.md`](../engineering.md) §8 diff --git a/planning/specs/2026-06-05-bulkhead-design.md b/planning/specs/2026-06-05-bulkhead-design.md index e1f1740..1968249 100644 --- a/planning/specs/2026-06-05-bulkhead-design.md +++ b/planning/specs/2026-06-05-bulkhead-design.md @@ -1,9 +1,9 @@ -# Spec: Bulkhead middleware (0.5.0, Epic 3 slice 2) +# Spec: Bulkhead middleware (0.4.0, Epic 3 slice 2) **Date:** 2026-06-05 **Topic slug:** `bulkhead` **Status:** drafted, awaiting user review -**Target release:** 0.5.0 (0.4.0 release notes already drafted without this slice). +**Target release:** 0.4.0 (bundled with slice 1 — Retry + RetryBudget). **Epic 3 stories rolled in:** 3-5 (Bulkhead). ## Purpose From 54b025edb060309c6ecc4ad3015d1de3b037f531 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Fri, 5 Jun 2026 14:01:44 +0300 Subject: [PATCH 04/15] feat(errors): add BulkheadFullError(ClientError) Distinct exception raised by the Bulkhead middleware when acquire_timeout elapses without acquiring a slot. Carries max_concurrent + acquire_timeout for caller logging. Picklable via _reconstruct_bulkhead_full + __reduce__, mirroring the existing StatusError / RetryBudgetExhaustedError pattern. Inherits ClientError (not TimeoutError) because a bulkhead-full event is a backpressure signal, not a network timeout. --- src/httpware/errors.py | 29 +++++++++++++++++++++++++++++ tests/test_errors.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/src/httpware/errors.py b/src/httpware/errors.py index 1c8cbcc..28f37f1 100644 --- a/src/httpware/errors.py +++ b/src/httpware/errors.py @@ -183,3 +183,32 @@ def __reduce__(self) -> tuple[Any, ...]: _reconstruct_budget_exhausted, (type(self), self.last_response, self.last_exception, self.attempts), ) + + +def _reconstruct_bulkhead_full( + cls: "type[BulkheadFullError]", + max_concurrent: int, + acquire_timeout: float | None, +) -> "BulkheadFullError": + return cls(max_concurrent=max_concurrent, acquire_timeout=acquire_timeout) + + +class BulkheadFullError(ClientError): + """Raised when ``acquire_timeout`` elapses before a Bulkhead slot becomes available. + + Carries the configured caps for caller logging/alerting. + """ + + max_concurrent: int + acquire_timeout: float | None + + def __init__(self, *, max_concurrent: int, acquire_timeout: float | None) -> None: + self.max_concurrent = max_concurrent + self.acquire_timeout = acquire_timeout + super().__init__(f"bulkhead full (max_concurrent={max_concurrent}, acquire_timeout={acquire_timeout})") + + def __reduce__(self) -> tuple[Any, ...]: + return ( + _reconstruct_bulkhead_full, + (type(self), self.max_concurrent, self.acquire_timeout), + ) diff --git a/tests/test_errors.py b/tests/test_errors.py index ffccfa6..18f0efc 100644 --- a/tests/test_errors.py +++ b/tests/test_errors.py @@ -9,6 +9,7 @@ from httpware.errors import ( STATUS_TO_EXCEPTION, BadRequestError, + BulkheadFullError, ClientError, ClientStatusError, ConflictError, @@ -99,6 +100,8 @@ def test_status_error_repr_strips_userinfo() -> None: _RETRY_ATTEMPTS_3 = 3 _RETRY_ATTEMPTS_2 = 2 _RETRY_ATTEMPTS_5 = 5 +_MAX_CONCURRENT_5 = 5 +_ACQUIRE_TIMEOUT_1_0 = 1.0 def test_status_error_pickleable() -> None: @@ -208,3 +211,28 @@ def test_retry_budget_exhausted_error_pickleable() -> None: assert restored.attempts == _RETRY_ATTEMPTS_3 assert restored.last_response is not None assert restored.last_response.status_code == _SERVICE_UNAVAILABLE + + +def test_bulkhead_full_error_is_client_error() -> None: + exc = BulkheadFullError(max_concurrent=_MAX_CONCURRENT_5, acquire_timeout=_ACQUIRE_TIMEOUT_1_0) + assert isinstance(exc, ClientError) + assert exc.max_concurrent == _MAX_CONCURRENT_5 + assert exc.acquire_timeout == _ACQUIRE_TIMEOUT_1_0 + + +def test_bulkhead_full_error_accepts_none_acquire_timeout() -> None: + exc = BulkheadFullError(max_concurrent=_MAX_CONCURRENT_5, acquire_timeout=None) + assert exc.acquire_timeout is None + + +def test_bulkhead_full_error_summary_mentions_caps() -> None: + exc = BulkheadFullError(max_concurrent=_MAX_CONCURRENT_5, acquire_timeout=_ACQUIRE_TIMEOUT_1_0) + assert str(exc) == "bulkhead full (max_concurrent=5, acquire_timeout=1.0)" + + +def test_bulkhead_full_error_pickleable() -> None: + exc = BulkheadFullError(max_concurrent=_MAX_CONCURRENT_5, acquire_timeout=_ACQUIRE_TIMEOUT_1_0) + restored = pickle.loads(pickle.dumps(exc)) # noqa: S301 + assert isinstance(restored, BulkheadFullError) + assert restored.max_concurrent == _MAX_CONCURRENT_5 + assert restored.acquire_timeout == _ACQUIRE_TIMEOUT_1_0 From f1a3763ecd470b12dbc2b1a0a8422b84f7edad7b Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Fri, 5 Jun 2026 14:07:36 +0300 Subject: [PATCH 05/15] =?UTF-8?q?feat(resilience):=20Bulkhead=20middleware?= =?UTF-8?q?=20=E2=80=94=20happy=20path=20+=20validation=20+=20capacity?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Constructor validates max_concurrent >= 1 and acquire_timeout >= 0 (None and 0 both accepted). asyncio.Semaphore enforces the cap; the explicit acquire + try/finally around next() guarantees release on every exit path. Acquire failures map to BulkheadFullError. Subsequent tasks cover fail-fast / wait-forever modes, exception + cancellation release semantics, cross-client sharing, and property tests. --- .../middleware/resilience/__init__.py | 3 +- .../middleware/resilience/bulkhead.py | 56 +++++++ tests/test_bulkhead.py | 143 ++++++++++++++++++ 3 files changed, 201 insertions(+), 1 deletion(-) create mode 100644 tests/test_bulkhead.py diff --git a/src/httpware/middleware/resilience/__init__.py b/src/httpware/middleware/resilience/__init__.py index 61244be..e3a02ca 100644 --- a/src/httpware/middleware/resilience/__init__.py +++ b/src/httpware/middleware/resilience/__init__.py @@ -1,7 +1,8 @@ """Resilience primitives: Retry middleware and RetryBudget token bucket.""" from httpware.middleware.resilience.budget import RetryBudget +from httpware.middleware.resilience.bulkhead import Bulkhead from httpware.middleware.resilience.retry import Retry -__all__ = ["Retry", "RetryBudget"] +__all__ = ["Bulkhead", "Retry", "RetryBudget"] diff --git a/src/httpware/middleware/resilience/bulkhead.py b/src/httpware/middleware/resilience/bulkhead.py index 46fa8f3..99a7d69 100644 --- a/src/httpware/middleware/resilience/bulkhead.py +++ b/src/httpware/middleware/resilience/bulkhead.py @@ -1,4 +1,60 @@ """Bulkhead middleware — concurrency limiter via asyncio.Semaphore. See planning/specs/2026-06-05-bulkhead-design.md for the contract. + +The middleware owns an asyncio.Semaphore(max_concurrent). On each request, +it acquires a slot (bounded by acquire_timeout via asyncio.timeout) and +releases the slot in a try/finally so success, exceptions, and cancellation +all release deterministically. + +Bulkhead is the sharable unit — pass the same instance to multiple +AsyncClient(middleware=[shared]) calls to enforce a joint cap across clients. """ + +import asyncio + +import httpx2 + +from httpware.errors import BulkheadFullError +from httpware.middleware import Next + + +_MAX_CONCURRENT_INVALID = "max_concurrent must be >= 1" +_ACQUIRE_TIMEOUT_INVALID = "acquire_timeout must be >= 0" + + +class Bulkhead: + """Concurrency limiter middleware. See module docstring for behavior.""" + + def __init__( + self, + *, + max_concurrent: int, + acquire_timeout: float | None = 1.0, + ) -> None: + if max_concurrent < 1: + raise ValueError(_MAX_CONCURRENT_INVALID) + if acquire_timeout is not None and acquire_timeout < 0: + raise ValueError(_ACQUIRE_TIMEOUT_INVALID) + self._max_concurrent = max_concurrent + self._acquire_timeout = acquire_timeout + self._sem = asyncio.Semaphore(max_concurrent) + + async def __call__(self, request: httpx2.Request, next: Next) -> httpx2.Response: # noqa: A002 + """Acquire a slot (bounded by acquire_timeout), invoke next, release.""" + try: + if self._acquire_timeout is None: + await self._sem.acquire() + else: + async with asyncio.timeout(self._acquire_timeout): + await self._sem.acquire() + except TimeoutError as exc: + raise BulkheadFullError( + max_concurrent=self._max_concurrent, + acquire_timeout=self._acquire_timeout, + ) from exc + + try: + return await next(request) + finally: + self._sem.release() diff --git a/tests/test_bulkhead.py b/tests/test_bulkhead.py new file mode 100644 index 0000000..ebff66b --- /dev/null +++ b/tests/test_bulkhead.py @@ -0,0 +1,143 @@ +"""Tests for the Bulkhead middleware. + +Mocks the transport via httpx2.MockTransport. Concurrency tests use real +asyncio coroutines with sub-100ms timeouts so the suite stays fast. +""" + +import asyncio +import contextlib +from collections.abc import Callable, Coroutine +from http import HTTPStatus +from typing import Any + +import httpx2 +import pytest + +from httpware import AsyncClient +from httpware.errors import BulkheadFullError +from httpware.middleware.resilience.bulkhead import Bulkhead + + +_MAX_CONCURRENT_1 = 1 +_MAX_CONCURRENT_2 = 2 +_ACQUIRE_TIMEOUT_FAST = 0.01 + + +class _SlowHandler: + """Mock handler that blocks for `delay` seconds before returning 200 OK.""" + + def __init__(self, delay: float) -> None: + self.delay = delay + self.in_flight = 0 + self.max_in_flight = 0 + self.calls = 0 + + async def __call__(self, request: httpx2.Request) -> httpx2.Response: + self.calls += 1 + self.in_flight += 1 + self.max_in_flight = max(self.max_in_flight, self.in_flight) + try: + await asyncio.sleep(self.delay) + return httpx2.Response(HTTPStatus.OK, request=request) + finally: + self.in_flight -= 1 + + +def _client( + handler: Callable[[httpx2.Request], httpx2.Response] + | Callable[[httpx2.Request], Coroutine[Any, Any, httpx2.Response]], + *, + bulkhead: Bulkhead, +) -> AsyncClient: + transport = httpx2.MockTransport(handler) + return AsyncClient( + httpx2_client=httpx2.AsyncClient(transport=transport), + middleware=[bulkhead], + ) + + +def test_max_concurrent_zero_rejected() -> None: + with pytest.raises(ValueError, match="max_concurrent must be >= 1"): + Bulkhead(max_concurrent=0) + + +def test_max_concurrent_negative_rejected() -> None: + with pytest.raises(ValueError, match="max_concurrent must be >= 1"): + Bulkhead(max_concurrent=-1) + + +def test_negative_acquire_timeout_rejected() -> None: + with pytest.raises(ValueError, match="acquire_timeout must be >= 0"): + Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=-0.1) + + +def test_acquire_timeout_zero_accepted() -> None: + bulkhead = Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=0) + assert bulkhead._acquire_timeout == 0 # noqa: SLF001 + + +def test_acquire_timeout_none_accepted() -> None: + bulkhead = Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=None) + assert bulkhead._acquire_timeout is None # noqa: SLF001 + + +async def test_succeeds_when_slot_available() -> None: + handler = _SlowHandler(delay=0.0) + client = _client(handler, bulkhead=Bulkhead(max_concurrent=_MAX_CONCURRENT_2)) + response = await client.get("https://example.test/x") + assert response.status_code == HTTPStatus.OK + assert handler.calls == 1 + + +async def test_serializes_at_capacity() -> None: + """With max_concurrent=1 and 3 concurrent calls, in-flight count never exceeds 1.""" + handler = _SlowHandler(delay=0.02) + client = _client( + handler, + bulkhead=Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=None), + ) + await asyncio.gather( + client.get("https://example.test/a"), + client.get("https://example.test/b"), + client.get("https://example.test/c"), + ) + assert handler.calls == 3 # noqa: PLR2004 — three concurrent gets above + assert handler.max_in_flight == 1 # cap honored + + +async def test_max_concurrent_2_observes_at_most_2_in_flight() -> None: + handler = _SlowHandler(delay=0.02) + client = _client(handler, bulkhead=Bulkhead(max_concurrent=_MAX_CONCURRENT_2, acquire_timeout=None)) + await asyncio.gather( + client.get("https://example.test/a"), + client.get("https://example.test/b"), + client.get("https://example.test/c"), + client.get("https://example.test/d"), + ) + assert handler.calls == 4 # noqa: PLR2004 — four concurrent gets above + assert handler.max_in_flight <= _MAX_CONCURRENT_2 + + +async def test_raises_bulkhead_full_error_when_acquire_timeout_exceeded() -> None: + """Slot is held by a slow request; a second request with a tiny timeout raises BulkheadFullError.""" + handler = _SlowHandler(delay=1.0) + bulkhead = Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=_ACQUIRE_TIMEOUT_FAST) + client = _client(handler, bulkhead=bulkhead) + + async def _hold_slot() -> None: + await client.get("https://example.test/slow") + + task = asyncio.create_task(_hold_slot()) + # Yield to let the slow request acquire the semaphore. + await asyncio.sleep(0) + + with pytest.raises(BulkheadFullError) as exc_info: + await client.get("https://example.test/fast") + + assert exc_info.value.max_concurrent == _MAX_CONCURRENT_1 + assert exc_info.value.acquire_timeout == _ACQUIRE_TIMEOUT_FAST + + # Cancel the lingering slow task to avoid polluting the event loop. + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task From dd24513c21792f5ec4152fdf8b580b5b9449dbb5 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Fri, 5 Jun 2026 14:13:00 +0300 Subject: [PATCH 06/15] docs(resilience): refresh subpackage docstring to mention Bulkhead Code reviewer noted the docstring was stale after Bulkhead landed. One-line update; no behavior change. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/httpware/middleware/resilience/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/httpware/middleware/resilience/__init__.py b/src/httpware/middleware/resilience/__init__.py index e3a02ca..08d2e5f 100644 --- a/src/httpware/middleware/resilience/__init__.py +++ b/src/httpware/middleware/resilience/__init__.py @@ -1,4 +1,4 @@ -"""Resilience primitives: Retry middleware and RetryBudget token bucket.""" +"""Resilience primitives: Bulkhead, Retry middleware, and RetryBudget token bucket.""" from httpware.middleware.resilience.budget import RetryBudget from httpware.middleware.resilience.bulkhead import Bulkhead From 58026e432ec3b29f3d6de7d9af668534e5648ea8 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Fri, 5 Jun 2026 14:14:31 +0300 Subject: [PATCH 07/15] =?UTF-8?q?test(resilience):=20Bulkhead=20acquire=5F?= =?UTF-8?q?timeout=20modes=20=E2=80=94=20bounded=20/=20fail-fast=20/=20for?= =?UTF-8?q?ever?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pins the three acquire_timeout modes: bounded wait raises BulkheadFullError after the configured timeout, =0 fails fast without waiting, =None waits until a slot frees. --- tests/test_bulkhead.py | 56 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/tests/test_bulkhead.py b/tests/test_bulkhead.py index ebff66b..36e8482 100644 --- a/tests/test_bulkhead.py +++ b/tests/test_bulkhead.py @@ -21,6 +21,8 @@ _MAX_CONCURRENT_1 = 1 _MAX_CONCURRENT_2 = 2 _ACQUIRE_TIMEOUT_FAST = 0.01 +_ACQUIRE_TIMEOUT_SHORT = 0.02 +_ACQUIRE_TIMEOUT_LONG = 0.1 class _SlowHandler: @@ -141,3 +143,57 @@ async def _hold_slot() -> None: task.cancel() with contextlib.suppress(asyncio.CancelledError): await task + + +async def test_bounded_wait_raises_bulkhead_full_error() -> None: + """With max_concurrent=1 and acquire_timeout=0.02, the second call raises after ~20ms. + + Complements test_raises_bulkhead_full_error_when_acquire_timeout_exceeded + (from Task 3, coverage smoke); this test additionally asserts the + BulkheadFullError fields (max_concurrent / acquire_timeout) carry the + configured values. + """ + handler = _SlowHandler(delay=_ACQUIRE_TIMEOUT_LONG) # holds slot for 100ms + client = _client( + handler, + bulkhead=Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=_ACQUIRE_TIMEOUT_SHORT), + ) + + first = asyncio.create_task(client.get("https://example.test/a")) + await asyncio.sleep(0.005) # let first acquire the slot + with pytest.raises(BulkheadFullError) as info: + await client.get("https://example.test/b") + assert info.value.max_concurrent == _MAX_CONCURRENT_1 + assert info.value.acquire_timeout == _ACQUIRE_TIMEOUT_SHORT + await first # cleanup + + +async def test_acquire_timeout_zero_fails_fast() -> None: + """With acquire_timeout=0, the second call raises immediately without waiting.""" + handler = _SlowHandler(delay=_ACQUIRE_TIMEOUT_LONG) + client = _client( + handler, + bulkhead=Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=0), + ) + + first = asyncio.create_task(client.get("https://example.test/a")) + await asyncio.sleep(0.005) + with pytest.raises(BulkheadFullError) as info: + await client.get("https://example.test/b") + assert info.value.acquire_timeout == 0 + await first + + +async def test_acquire_timeout_none_waits_forever() -> None: + """With acquire_timeout=None, the second call waits until the first releases.""" + handler = _SlowHandler(delay=_ACQUIRE_TIMEOUT_SHORT) + client = _client( + handler, + bulkhead=Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=None), + ) + + first = asyncio.create_task(client.get("https://example.test/a")) + second = asyncio.create_task(client.get("https://example.test/b")) + responses = await asyncio.wait_for(asyncio.gather(first, second), timeout=1.0) + assert all(r.status_code == HTTPStatus.OK for r in responses) + assert handler.calls == 2 # noqa: PLR2004 — both eventually succeeded From 8530f7af4a9335b7c5a2014515b22880d679f598 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Fri, 5 Jun 2026 14:17:11 +0300 Subject: [PATCH 08/15] test(resilience): Bulkhead release semantics on exception + cancellation Pins the try/finally release: exception in next() releases the slot, cancellation during next() releases the slot, cancellation while parked on acquire() does not hold a slot. --- tests/test_bulkhead.py | 61 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/tests/test_bulkhead.py b/tests/test_bulkhead.py index 36e8482..f6d3ee5 100644 --- a/tests/test_bulkhead.py +++ b/tests/test_bulkhead.py @@ -197,3 +197,64 @@ async def test_acquire_timeout_none_waits_forever() -> None: responses = await asyncio.wait_for(asyncio.gather(first, second), timeout=1.0) assert all(r.status_code == HTTPStatus.OK for r in responses) assert handler.calls == 2 # noqa: PLR2004 — both eventually succeeded + + +async def test_slot_released_after_exception_in_next() -> None: + """If next() raises, the slot is released — subsequent calls succeed immediately.""" + call_count = {"n": 0} + + def handler(request: httpx2.Request) -> httpx2.Response: + call_count["n"] += 1 + if call_count["n"] == 1: + msg = "boom" + raise RuntimeError(msg) + return httpx2.Response(HTTPStatus.OK, request=request) + + client = _client(handler, bulkhead=Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=0)) + + # First call raises; slot must release. + with pytest.raises(RuntimeError, match="boom"): + await client.get("https://example.test/a") + + # Second call must succeed immediately — fail-fast=0 proves the slot is free. + response = await client.get("https://example.test/b") + assert response.status_code == HTTPStatus.OK + assert call_count["n"] == 2 # noqa: PLR2004 — second call reached handler + + +async def test_slot_released_on_cancellation() -> None: + """If the calling task is cancelled while next() runs, the slot is released.""" + handler = _SlowHandler(delay=0.5) # would block indefinitely + bulkhead = Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=0) + client = _client(handler, bulkhead=bulkhead) + + first = asyncio.create_task(client.get("https://example.test/a")) + await asyncio.sleep(0.01) # let first acquire and start sleeping in handler + first.cancel() + with pytest.raises(asyncio.CancelledError): + await first + + # Slot must now be released — fail-fast=0 next call proves it. + handler.delay = 0.0 # speed up the next request + response = await client.get("https://example.test/b") + assert response.status_code == HTTPStatus.OK + + +async def test_cancellation_before_acquire_does_not_hold_slot() -> None: + """Cancellation while waiting for a slot must not leak the slot to the cancelled task.""" + handler = _SlowHandler(delay=0.05) + bulkhead = Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=None) + client = _client(handler, bulkhead=bulkhead) + + first = asyncio.create_task(client.get("https://example.test/a")) + await asyncio.sleep(0.005) # first acquires + second = asyncio.create_task(client.get("https://example.test/b")) # waits for slot + await asyncio.sleep(0.005) # ensure second is parked on acquire + second.cancel() + with pytest.raises(asyncio.CancelledError): + await second + + # First should still complete normally. + response = await first + assert response.status_code == HTTPStatus.OK + assert handler.calls == 1 # second never reached the handler From 2e5e4d745bcb60de1b97e2b114cd679450e2bf45 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Fri, 5 Jun 2026 14:19:54 +0300 Subject: [PATCH 09/15] test(resilience): Bulkhead sharing across clients + construct-outside-loop Pins two behaviors: a Bulkhead instantiated at module scope (outside any event loop) works correctly when used inside one, and a single Bulkhead instance passed to multiple AsyncClient instances enforces the joint cap across all of them. --- tests/test_bulkhead.py | 48 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/tests/test_bulkhead.py b/tests/test_bulkhead.py index f6d3ee5..5df3af8 100644 --- a/tests/test_bulkhead.py +++ b/tests/test_bulkhead.py @@ -258,3 +258,51 @@ async def test_cancellation_before_acquire_does_not_hold_slot() -> None: response = await first assert response.status_code == HTTPStatus.OK assert handler.calls == 1 # second never reached the handler + + +# Constructed at module scope on purpose — pins the construct-outside-loop behavior. +_MODULE_SCOPE_BULKHEAD = Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=None) + + +async def test_construct_outside_event_loop_then_use_inside() -> None: + """Bulkhead constructed at module scope must work when used inside an event loop.""" + handler = _SlowHandler(delay=0.0) + client = _client(handler, bulkhead=_MODULE_SCOPE_BULKHEAD) + response = await client.get("https://example.test/x") + assert response.status_code == HTTPStatus.OK + + +async def test_shared_bulkhead_enforces_joint_cap() -> None: + """One Bulkhead shared across two AsyncClients enforces the joint cap.""" + # Both clients use ONE handler that tracks combined in-flight across all calls. + # asyncio is single-threaded so a plain dict counter is safe between awaits. + state = {"in_flight": 0, "max_in_flight": 0} + + async def shared_handler(request: httpx2.Request) -> httpx2.Response: + state["in_flight"] += 1 + state["max_in_flight"] = max(state["max_in_flight"], state["in_flight"]) + try: + await asyncio.sleep(0.02) + return httpx2.Response(HTTPStatus.OK, request=request) + finally: + state["in_flight"] -= 1 + + shared = Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=None) + client_a = AsyncClient( + httpx2_client=httpx2.AsyncClient(transport=httpx2.MockTransport(shared_handler)), + middleware=[shared], + ) + client_b = AsyncClient( + httpx2_client=httpx2.AsyncClient(transport=httpx2.MockTransport(shared_handler)), + middleware=[shared], + ) + + await asyncio.gather( + client_a.get("https://upstream-a.example.test/x"), + client_a.get("https://upstream-a.example.test/y"), + client_b.get("https://upstream-b.example.test/x"), + client_b.get("https://upstream-b.example.test/y"), + ) + + # The shared bulkhead enforces max=1 across BOTH clients combined. + assert state["max_in_flight"] <= _MAX_CONCURRENT_1 From f7f8f99627110e7ee7bffee75164bac1fb4705bd Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Fri, 5 Jun 2026 14:22:39 +0300 Subject: [PATCH 10/15] test(resilience): Hypothesis property tests for Bulkhead Three invariants: in-flight never exceeds max_concurrent under any interleaving; fail-fast (acquire_timeout=0) raises BulkheadFullError when at capacity; after all calls drain, the bulkhead has full capacity available again (no slot leak). Co-Authored-By: Claude Sonnet 4.6 --- tests/test_bulkhead_props.py | 113 +++++++++++++++++++++++++++++++++++ 1 file changed, 113 insertions(+) create mode 100644 tests/test_bulkhead_props.py diff --git a/tests/test_bulkhead_props.py b/tests/test_bulkhead_props.py new file mode 100644 index 0000000..3ea36b1 --- /dev/null +++ b/tests/test_bulkhead_props.py @@ -0,0 +1,113 @@ +"""Hypothesis property tests for Bulkhead. + +Properties verified: +1. Observed in-flight count never exceeds max_concurrent under any interleaving. +2. With acquire_timeout=0 and a full bulkhead, the call raises BulkheadFullError. +3. Successful acquisitions are released — back-to-back calls eventually drain + without leaking slots. +""" + +import asyncio +from http import HTTPStatus + +import httpx2 +import pytest +from hypothesis import given, settings +from hypothesis import strategies as st + +from httpware import AsyncClient +from httpware.errors import BulkheadFullError +from httpware.middleware.resilience.bulkhead import Bulkhead + + +class _InFlightHandler: + """Tracks max simultaneous in-flight count across all calls.""" + + def __init__(self, delay: float) -> None: + self.delay = delay + self.in_flight = 0 + self.max_in_flight = 0 + self.calls = 0 + + async def __call__(self, request: httpx2.Request) -> httpx2.Response: + self.calls += 1 + self.in_flight += 1 + self.max_in_flight = max(self.max_in_flight, self.in_flight) + try: + await asyncio.sleep(self.delay) + return httpx2.Response(HTTPStatus.OK, request=request) + finally: + self.in_flight -= 1 + + +@given( + max_concurrent=st.integers(min_value=1, max_value=8), + n_requests=st.integers(min_value=1, max_value=32), + delay=st.floats(min_value=0.001, max_value=0.005), +) +@settings(max_examples=30, deadline=None) +async def test_in_flight_never_exceeds_max_concurrent( + max_concurrent: int, + n_requests: int, + delay: float, +) -> None: + handler = _InFlightHandler(delay=delay) + transport = httpx2.MockTransport(handler) + client = AsyncClient( + httpx2_client=httpx2.AsyncClient(transport=transport), + middleware=[Bulkhead(max_concurrent=max_concurrent, acquire_timeout=None)], + ) + await asyncio.gather(*(client.get(f"https://example.test/{i}") for i in range(n_requests))) + assert handler.calls == n_requests + assert handler.max_in_flight <= max_concurrent + + +@given( + max_concurrent=st.integers(min_value=1, max_value=4), + extra_requests=st.integers(min_value=1, max_value=8), +) +@settings(max_examples=20, deadline=None) +async def test_fail_fast_rejects_when_at_capacity( + max_concurrent: int, + extra_requests: int, +) -> None: + handler = _InFlightHandler(delay=0.05) # hold slots long enough for fail-fast to fire + transport = httpx2.MockTransport(handler) + client = AsyncClient( + httpx2_client=httpx2.AsyncClient(transport=transport), + middleware=[Bulkhead(max_concurrent=max_concurrent, acquire_timeout=0)], + ) + + # Fill the bulkhead with max_concurrent long-running tasks. + holders = [asyncio.create_task(client.get(f"https://example.test/hold-{i}")) for i in range(max_concurrent)] + await asyncio.sleep(0.005) # let the holders acquire their slots + + # Any extra requests should fail fast with BulkheadFullError. + for i in range(extra_requests): + with pytest.raises(BulkheadFullError): + await client.get(f"https://example.test/extra-{i}") + + # Cleanup the holders. + await asyncio.gather(*holders) + + +@given( + max_concurrent=st.integers(min_value=1, max_value=4), + n_requests=st.integers(min_value=4, max_value=16), +) +@settings(max_examples=20, deadline=None) +async def test_no_slot_leak_after_drain(max_concurrent: int, n_requests: int) -> None: + """After all calls complete, the bulkhead has its full capacity available.""" + handler = _InFlightHandler(delay=0.001) + bulkhead = Bulkhead(max_concurrent=max_concurrent, acquire_timeout=None) + transport = httpx2.MockTransport(handler) + client = AsyncClient( + httpx2_client=httpx2.AsyncClient(transport=transport), + middleware=[bulkhead], + ) + + await asyncio.gather(*(client.get(f"https://example.test/{i}") for i in range(n_requests))) + + # Bulkhead should be drained — _value equals max_concurrent again. + # asyncio.Semaphore._value is implementation detail but reliable across CPython 3.11+. + assert bulkhead._sem._value == max_concurrent # noqa: SLF001 From b42d594ecb38ef05830671ee67c3ef8c12703ee7 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Fri, 5 Jun 2026 14:25:53 +0300 Subject: [PATCH 11/15] feat(api): export Bulkhead and BulkheadFullError Completes the v0.4 slice 2: Bulkhead concurrency limiter middleware + its backpressure exception. Pure-stdlib core, no new optional extra. --- src/httpware/__init__.py | 5 ++++- tests/test_public_api.py | 2 ++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/httpware/__init__.py b/src/httpware/__init__.py index b80c90d..13a6862 100644 --- a/src/httpware/__init__.py +++ b/src/httpware/__init__.py @@ -5,6 +5,7 @@ from httpware.errors import ( STATUS_TO_EXCEPTION, BadRequestError, + BulkheadFullError, ClientError, ClientStatusError, ConflictError, @@ -23,13 +24,15 @@ UnprocessableEntityError, ) from httpware.middleware import Middleware, Next, after_response, before_request, on_error -from httpware.middleware.resilience import Retry, RetryBudget +from httpware.middleware.resilience import Bulkhead, Retry, RetryBudget __all__ = [ "STATUS_TO_EXCEPTION", "AsyncClient", "BadRequestError", + "Bulkhead", + "BulkheadFullError", "ClientError", "ClientStatusError", "ConflictError", diff --git a/tests/test_public_api.py b/tests/test_public_api.py index 92483e7..f413ac6 100644 --- a/tests/test_public_api.py +++ b/tests/test_public_api.py @@ -29,6 +29,8 @@ def test_no_removed_symbols_leaked() -> None: def test_expected_exports() -> None: expected = { "AsyncClient", + "Bulkhead", + "BulkheadFullError", "Middleware", "NetworkError", "Next", From cbf0724005537ced3828f0831f4a03f5ebc26744 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Fri, 5 Jun 2026 14:26:00 +0300 Subject: [PATCH 12/15] docs(planning): mark 3-5 Bulkhead shipped in v0.4 slice 2 --- planning/engineering.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/planning/engineering.md b/planning/engineering.md index 1a06e56..7bd1f59 100644 --- a/planning/engineering.md +++ b/planning/engineering.md @@ -123,7 +123,8 @@ Post-pivot, the roadmap has three categories. Topic slugs in `planning/specs/` a - **Epic 3 — Resilience:** - **Shipped in v0.4 slice 1:** `Retry` middleware + Finagle-style `RetryBudget` token bucket + `attempt_timeout=` parameter (folded-in 3-1). See [`planning/specs/2026-06-05-retry-and-retry-budget-design.md`](specs/2026-06-05-retry-and-retry-budget-design.md) and [`planning/plans/2026-06-05-retry-and-retry-budget-plan.md`](plans/2026-06-05-retry-and-retry-budget-plan.md). - - **Remaining:** `3-5` `Bulkhead`, `3-6` extension-slot docs. + - **Shipped in v0.4 slice 2:** `Bulkhead` middleware (concurrency limiter via `asyncio.Semaphore` with bounded acquire wait). See [`planning/specs/2026-06-05-bulkhead-design.md`](specs/2026-06-05-bulkhead-design.md) and [`planning/plans/2026-06-05-bulkhead-plan.md`](plans/2026-06-05-bulkhead-plan.md). + - **Remaining:** `3-6` extension-slot docs. - **Epic 4 — Streaming:** `4-3` `AsyncClient.stream` context manager (forwards to `httpx2.AsyncClient.stream`; no `StreamResponse` type). - **Epic 5 — Observability:** `5-1` Layer 1 middleware hooks, `5-2` wire into resilience middlewares, `5-4` OpenTelemetry middleware (`otel` extra), `5-5` logging policy CI grep. - **Epic 6 — Ship v1.0:** `6-2` docs site (`mkdocs`), `6-3` benchmarks, `6-5` release flow (Trusted Publishers + Sigstore). From b93b0e62aadfd59c0d72f4d13f67d7f4e729fa34 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Fri, 5 Jun 2026 14:30:35 +0300 Subject: [PATCH 13/15] docs(planning): align Bulkhead spec with implementation (TimeoutError, not asyncio.TimeoutError) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In Python 3.11+, asyncio.TimeoutError IS builtins.TimeoutError. The implementation uses the bare name; align the spec snippet so spec and code agree. Cosmetic — no behavior change. Co-Authored-By: Claude Opus 4.7 (1M context) --- planning/specs/2026-06-05-bulkhead-design.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/planning/specs/2026-06-05-bulkhead-design.md b/planning/specs/2026-06-05-bulkhead-design.md index 1968249..2bddff5 100644 --- a/planning/specs/2026-06-05-bulkhead-design.md +++ b/planning/specs/2026-06-05-bulkhead-design.md @@ -109,7 +109,7 @@ async def __call__(self, request, next): else: async with asyncio.timeout(self._acquire_timeout): await self._sem.acquire() - except asyncio.TimeoutError as exc: + except TimeoutError as exc: # builtins.TimeoutError, which `asyncio.timeout` raises in 3.11+ raise BulkheadFullError( max_concurrent=self._max_concurrent, acquire_timeout=self._acquire_timeout, From 25db15b3b34e75f7d873c1c1670c4a99125a93a4 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Fri, 5 Jun 2026 14:44:36 +0300 Subject: [PATCH 14/15] =?UTF-8?q?test(resilience):=20address=20review=20fe?= =?UTF-8?q?edback=20=E2=80=94=20leak-proof=20+=20composition=20tests=20+?= =?UTF-8?q?=20docstring?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Bulkhead class docstring: replace one-liner with Parameters section so IDE hovers carry the required-vs-default + range info. - test_cancellation_before_acquire_does_not_hold_slot: issue a third request WHILE first still holds the slot. Pins that the cancelled second did not leave a phantom free slot behind. - Add Bulkhead+Retry composition tests (test_bulkhead.py): - test_bulkhead_outside_retry_holds_one_slot_across_attempts — pins the documented [Bulkhead, Retry] ordering: one slot covers the whole retry sequence (max_in_flight stays at 1 even though handler is called twice). - test_bulkhead_full_error_is_not_retried_by_retry — pins that BulkheadFullError (ClientError) is NOT in Retry's catch set; injected _sleep with # pragma: no cover documents the "must never run" assertion. Skipped one review item (#3): the BulkheadFullError summary string includes "acquire_timeout=None" when constructed manually with None. This branch is unreachable by design — the exception is never raised when acquire_timeout=None. Cosmetic at best; not worth special-casing. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../middleware/resilience/bulkhead.py | 17 ++- tests/test_bulkhead.py | 101 +++++++++++++++++- 2 files changed, 112 insertions(+), 6 deletions(-) diff --git a/src/httpware/middleware/resilience/bulkhead.py b/src/httpware/middleware/resilience/bulkhead.py index 99a7d69..bfcc608 100644 --- a/src/httpware/middleware/resilience/bulkhead.py +++ b/src/httpware/middleware/resilience/bulkhead.py @@ -24,7 +24,22 @@ class Bulkhead: - """Concurrency limiter middleware. See module docstring for behavior.""" + """Concurrency limiter middleware backed by ``asyncio.Semaphore``. + + Parameters + ---------- + max_concurrent + Required. Maximum number of in-flight requests this Bulkhead permits. + Must be ``>= 1``. There is no default because no value is universally + correct — the right cap depends on downstream capacity and SLA. + acquire_timeout + Seconds to wait for a slot before raising ``BulkheadFullError``. + Defaults to ``1.0``. ``None`` waits forever; ``0`` fails fast. Must be + ``>= 0`` (or ``None``). + + See the module docstring for the algorithm and middleware-ordering guidance. + + """ def __init__( self, diff --git a/tests/test_bulkhead.py b/tests/test_bulkhead.py index 5df3af8..01c19a1 100644 --- a/tests/test_bulkhead.py +++ b/tests/test_bulkhead.py @@ -16,6 +16,7 @@ from httpware import AsyncClient from httpware.errors import BulkheadFullError from httpware.middleware.resilience.bulkhead import Bulkhead +from httpware.middleware.resilience.retry import Retry _MAX_CONCURRENT_1 = 1 @@ -241,7 +242,13 @@ async def test_slot_released_on_cancellation() -> None: async def test_cancellation_before_acquire_does_not_hold_slot() -> None: - """Cancellation while waiting for a slot must not leak the slot to the cancelled task.""" + """Cancellation while waiting for a slot must not leak the slot to the cancelled task. + + Stronger check than just "first completes": after the cancelled task is buried, + a fresh request issued WHILE first still holds the slot must wait for first to + release (it must NOT take the slot the cancelled task was waiting for). And once + first releases, the fresh request must complete normally. + """ handler = _SlowHandler(delay=0.05) bulkhead = Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=None) client = _client(handler, bulkhead=bulkhead) @@ -254,10 +261,13 @@ async def test_cancellation_before_acquire_does_not_hold_slot() -> None: with pytest.raises(asyncio.CancelledError): await second - # First should still complete normally. - response = await first - assert response.status_code == HTTPStatus.OK - assert handler.calls == 1 # second never reached the handler + # Third request issued while first still holds the slot — must not see a phantom + # free slot left by the cancelled second. + third = asyncio.create_task(client.get("https://example.test/c")) + first_response, third_response = await asyncio.gather(first, third) + assert first_response.status_code == HTTPStatus.OK + assert third_response.status_code == HTTPStatus.OK + assert handler.calls == 2 # noqa: PLR2004 — first and third reached handler; second never did # Constructed at module scope on purpose — pins the construct-outside-loop behavior. @@ -306,3 +316,84 @@ async def shared_handler(request: httpx2.Request) -> httpx2.Response: # The shared bulkhead enforces max=1 across BOTH clients combined. assert state["max_in_flight"] <= _MAX_CONCURRENT_1 + + +# ---------------------------------------------------------------------------- +# Bulkhead + Retry composition tests +# +# The recommended ordering is [Bulkhead, Retry] in middleware= — Bulkhead OUTSIDE +# Retry so a retrying request holds one slot across all attempts (rather than +# re-acquiring per retry). These tests pin the documented composition. +# ---------------------------------------------------------------------------- + + +async def test_bulkhead_outside_retry_holds_one_slot_across_attempts() -> None: + """[Bulkhead, Retry]: one slot covers the whole retry sequence, not per-attempt.""" + state = {"in_flight": 0, "max_in_flight": 0} + call_count = {"n": 0} + + async def handler(request: httpx2.Request) -> httpx2.Response: + call_count["n"] += 1 + state["in_flight"] += 1 + state["max_in_flight"] = max(state["max_in_flight"], state["in_flight"]) + try: + # First call returns 503 (retryable); second call returns OK. + if call_count["n"] == 1: + return httpx2.Response(HTTPStatus.SERVICE_UNAVAILABLE, request=request) + return httpx2.Response(HTTPStatus.OK, request=request) + finally: + state["in_flight"] -= 1 + + transport = httpx2.MockTransport(handler) + + async def _sleep(_: float) -> None: # don't actually wait between retries + return + + client = AsyncClient( + httpx2_client=httpx2.AsyncClient(transport=transport), + middleware=[ + Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=None), + Retry(_sleep=_sleep, base_delay=0.001, max_delay=0.002), + ], + ) + response = await client.get("https://example.test/x") + assert response.status_code == HTTPStatus.OK + assert call_count["n"] == 2 # noqa: PLR2004 — first 503 + retry success + # max_in_flight stays at 1: the same Bulkhead slot covers both attempts. + assert state["max_in_flight"] == 1 + + +async def test_bulkhead_full_error_is_not_retried_by_retry() -> None: + """Retry does NOT retry BulkheadFullError — it's neither a StatusError nor a NetworkError/TimeoutError.""" + handler = _SlowHandler(delay=0.5) # holds the slot indefinitely + bulkhead = Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=0) + transport = httpx2.MockTransport(handler) + + sleep_calls: list[float] = [] + + async def _sleep( + delay: float, + ) -> None: # pragma: no cover — assert is `sleep_calls == []`, so this body must never run + sleep_calls.append(delay) + + client = AsyncClient( + httpx2_client=httpx2.AsyncClient(transport=transport), + middleware=[ + bulkhead, + Retry(_sleep=_sleep, max_attempts=3, base_delay=0.001, max_delay=0.002), + ], + ) + + # Fill the slot with a long-lived task. + first = asyncio.create_task(client.get("https://example.test/holder")) + await asyncio.sleep(0.01) + + # Second call hits a full Bulkhead. Retry must NOT swallow + retry it. + with pytest.raises(BulkheadFullError): + await client.get("https://example.test/rejected") + assert sleep_calls == [] # Retry never slept — it didn't try to retry + + # Cleanup. + first.cancel() + with contextlib.suppress(asyncio.CancelledError): + await first From 1af3889d43e3da4aa1cdce471cbd9ac62cd38c72 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Fri, 5 Jun 2026 14:46:33 +0300 Subject: [PATCH 15/15] test(resilience): use AsyncMock instead of pragma'd stub for never-called sleep MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit User feedback: # pragma: no cover on a user-defined function body that intentionally never runs is the wrong shape — refactor to use a mock whose body is structural (unittest.mock internals are excluded from coverage measurement). AsyncMock().assert_not_called() expresses the "must never run" assertion cleanly with no coverage gymnastics. Co-Authored-By: Claude Opus 4.7 (1M context) --- tests/test_bulkhead.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/tests/test_bulkhead.py b/tests/test_bulkhead.py index 01c19a1..9e70c4f 100644 --- a/tests/test_bulkhead.py +++ b/tests/test_bulkhead.py @@ -9,6 +9,7 @@ from collections.abc import Callable, Coroutine from http import HTTPStatus from typing import Any +from unittest.mock import AsyncMock import httpx2 import pytest @@ -369,18 +370,15 @@ async def test_bulkhead_full_error_is_not_retried_by_retry() -> None: bulkhead = Bulkhead(max_concurrent=_MAX_CONCURRENT_1, acquire_timeout=0) transport = httpx2.MockTransport(handler) - sleep_calls: list[float] = [] - - async def _sleep( - delay: float, - ) -> None: # pragma: no cover — assert is `sleep_calls == []`, so this body must never run - sleep_calls.append(delay) + # AsyncMock so the never-called assertion is structural — no user-defined + # body that would need # pragma: no cover. + mock_sleep = AsyncMock() client = AsyncClient( httpx2_client=httpx2.AsyncClient(transport=transport), middleware=[ bulkhead, - Retry(_sleep=_sleep, max_attempts=3, base_delay=0.001, max_delay=0.002), + Retry(_sleep=mock_sleep, max_attempts=3, base_delay=0.001, max_delay=0.002), ], ) @@ -391,7 +389,7 @@ async def _sleep( # Second call hits a full Bulkhead. Retry must NOT swallow + retry it. with pytest.raises(BulkheadFullError): await client.get("https://example.test/rejected") - assert sleep_calls == [] # Retry never slept — it didn't try to retry + mock_sleep.assert_not_called() # Retry never slept — it didn't try to retry # Cleanup. first.cancel()