Skip to content
Merged
810 changes: 810 additions & 0 deletions planning/plans/2026-06-08-test-mop-up-plan.md

Large diffs are not rendered by default.

25 changes: 25 additions & 0 deletions planning/releases/0.8.6.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# httpware 0.8.6 — test mop-up

**Patch release. Test-only changes. No API change, no production code change, no behavior change for users.** Closes 5 audit findings — all in the test suite.

## What changed

- **`test_no_slot_leak_after_drain` is behavioral, not internals-peek.** The Hypothesis property test for `AsyncBulkhead` no longer asserts against `bulkhead._sem._value`; it submits `max_concurrent` fresh acquires after drain and confirms they all succeed under a tight `acquire_timeout`.
- **`test_threading_with_shared_budget` asserts the exact deposit count.** The previous `len(budget._deposits) > 0` was a smoke check that would have passed under deque corruption with even one survivor. The new assertion locks the count to `(N_SYNC_THREADS * N_OPS_PER_THREAD) + N_ASYNC_TASKS` = 220, made deterministic by the 0.8.3 deposit-hoist.
- **`test_optional_extras_pydantic_missing.py` covers the sync `Client` escape hatch.** The async test pinned `AsyncClient(decoder=fake)` bypassing the pydantic fail-fast; the sync `Client` had no peer. Now it does. `_FakeDecoder` hoisted to module top to keep the two tests DRY.
- **Sync `Bulkhead` has Hypothesis property tests.** New file `tests/test_bulkhead_sync_props.py` mirrors `tests/test_bulkhead_props.py` using `threading.Thread` + `ThreadPoolExecutor` instead of `asyncio.gather`. Three properties: in-flight never exceeds cap; fail-fast rejects at capacity; no slot leak after drain.
- **Sync `on_error` `BaseException` propagation is tested.** Two new tests in `test_middleware_sync.py` pin the invariant that the sync `on_error` decorator's `except Exception` clause does NOT catch `KeyboardInterrupt` or `SystemExit` — both must propagate through `compose`.

## Audit status

35 audit findings, all addressed across 0.8.1 → 0.8.6 plus the in-place doc-staleness sweep on `main`. One chunk-3 hand-review item was excluded as INVALID (the audit looked for `AsyncClient` construction tests in `tests/test_client_methods.py` — they actually live in `tests/test_client_construction.py` + `tests/test_client_lifecycle.py`).

## Upgrade

```bash
uv add httpware==0.8.6
# or
pip install -U 'httpware==0.8.6'
```

No import changes. No API changes. Nothing observable from the consumer side.
396 changes: 396 additions & 0 deletions planning/specs/2026-06-08-test-mop-up-design.md

Large diffs are not rendered by default.

9 changes: 6 additions & 3 deletions tests/test_bulkhead_props.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ async def test_no_slot_leak_after_drain(max_concurrent: int, n_requests: int) ->

await asyncio.gather(*(client.get(f"https://example.test/{i}") for i in range(n_requests)))

# AsyncBulkhead should be drained — _value equals max_concurrent again.
# asyncio.Semaphore._value is implementation detail but reliable across CPython 3.11+.
assert bulkhead._sem._value == max_concurrent # noqa: SLF001
# Behavioral drain check: after gather completes, max_concurrent fresh acquires
# must succeed simultaneously under a tight acquire_timeout. If any slot leaked,
# the post-drain acquires would block past the timeout and BulkheadFullError
# would surface — a deterministic failure rather than an internals-peek.
bulkhead._acquire_timeout = 0.05 # noqa: SLF001 — test-local override of internal config
await asyncio.gather(*(client.get(f"https://example.test/post-drain-{i}") for i in range(max_concurrent)))
136 changes: 136 additions & 0 deletions tests/test_bulkhead_sync_props.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
"""Hypothesis property tests for sync Bulkhead.

Mirrors tests/test_bulkhead_props.py for sync/async parity. Uses
threading.Thread + a shared lock-guarded counter instead of asyncio.gather.

Properties verified:
1. Observed in-flight count never exceeds max_concurrent under any interleaving.
2. With acquire_timeout=0 and a full bulkhead, the call raises BulkheadFullError.
3. Successful acquisitions are released — after drain, max_concurrent fresh
acquires succeed (behavioral, no internal-state peek).
"""

import threading
import time
from concurrent.futures import ThreadPoolExecutor
from http import HTTPStatus

import httpx2
import pytest
from hypothesis import given, settings
from hypothesis import strategies as st

from httpware import Client
from httpware.errors import BulkheadFullError
from httpware.middleware.resilience.bulkhead import Bulkhead


class _InFlightHandler:
"""Tracks max simultaneous in-flight count under a threading.Lock."""

def __init__(self, delay: float) -> None:
self.delay = delay
self._lock = threading.Lock()
self.in_flight = 0
self.max_in_flight = 0
self.calls = 0

def __call__(self, request: httpx2.Request) -> httpx2.Response:
with self._lock:
self.calls += 1
self.in_flight += 1
self.max_in_flight = max(self.max_in_flight, self.in_flight)
try:
time.sleep(self.delay)
return httpx2.Response(HTTPStatus.OK, request=request)
finally:
with self._lock:
self.in_flight -= 1


@given(
max_concurrent=st.integers(min_value=1, max_value=8),
n_requests=st.integers(min_value=1, max_value=32),
delay=st.floats(min_value=0.001, max_value=0.005),
)
@settings(max_examples=20, deadline=None)
def test_in_flight_never_exceeds_max_concurrent(
max_concurrent: int,
n_requests: int,
delay: float,
) -> None:
handler = _InFlightHandler(delay=delay)
transport = httpx2.MockTransport(handler)
client = Client(
httpx2_client=httpx2.Client(transport=transport),
middleware=[Bulkhead(max_concurrent=max_concurrent, acquire_timeout=None)],
)
with ThreadPoolExecutor(max_workers=n_requests) as pool:
futures = [pool.submit(client.get, f"https://example.test/{i}") for i in range(n_requests)]
for f in futures:
f.result()
assert handler.calls == n_requests
assert handler.max_in_flight <= max_concurrent


@given(
max_concurrent=st.integers(min_value=1, max_value=4),
extra_requests=st.integers(min_value=1, max_value=8),
)
@settings(max_examples=15, deadline=None)
def test_fail_fast_rejects_when_at_capacity(
max_concurrent: int,
extra_requests: int,
) -> None:
handler = _InFlightHandler(delay=0.05) # hold slots long enough for fail-fast to fire
transport = httpx2.MockTransport(handler)
client = Client(
httpx2_client=httpx2.Client(transport=transport),
middleware=[Bulkhead(max_concurrent=max_concurrent, acquire_timeout=0)],
)

# Fill the bulkhead with max_concurrent long-running threads.
pool = ThreadPoolExecutor(max_workers=max_concurrent + extra_requests)
holders = [pool.submit(client.get, f"https://example.test/hold-{i}") for i in range(max_concurrent)]
# Wait for the holders to acquire — sleep long enough for thread startup.
time.sleep(0.005)

# Any extra requests should fail fast with BulkheadFullError.
for i in range(extra_requests):
with pytest.raises(BulkheadFullError):
client.get(f"https://example.test/extra-{i}")

# Cleanup the holders.
for f in holders:
f.result()
pool.shutdown()


@given(
max_concurrent=st.integers(min_value=1, max_value=4),
n_requests=st.integers(min_value=4, max_value=16),
)
@settings(max_examples=15, deadline=None)
def test_no_slot_leak_after_drain(max_concurrent: int, n_requests: int) -> None:
"""After all threads complete, the bulkhead has its full capacity available."""
handler = _InFlightHandler(delay=0.001)
bulkhead = Bulkhead(max_concurrent=max_concurrent, acquire_timeout=None)
transport = httpx2.MockTransport(handler)
client = Client(
httpx2_client=httpx2.Client(transport=transport),
middleware=[bulkhead],
)

with ThreadPoolExecutor(max_workers=n_requests) as pool:
futures = [pool.submit(client.get, f"https://example.test/{i}") for i in range(n_requests)]
for f in futures:
f.result()

# Behavioral drain check: after the threads finish, max_concurrent fresh
# acquires must succeed simultaneously under a tight acquire_timeout. If
# any slot leaked, the post-drain acquires would block past the timeout.
bulkhead._acquire_timeout = 0.05 # noqa: SLF001 — test-local override
with ThreadPoolExecutor(max_workers=max_concurrent) as pool:
post = [pool.submit(client.get, f"https://example.test/post-drain-{i}") for i in range(max_concurrent)]
for f in post:
f.result()
43 changes: 43 additions & 0 deletions tests/test_middleware_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,46 @@ def my_handler(request: httpx2.Request, exc: Exception) -> httpx2.Response | Non

assert "on_error" in repr(my_handler)
assert "my_handler" in repr(my_handler)


def test_on_error_lets_keyboardinterrupt_propagate() -> None:
"""on_error catches Exception, NOT BaseException — KeyboardInterrupt must escape.

Sync mirror of test_middleware.py::test_on_error_lets_cancelled_propagate. The
async test pins asyncio.CancelledError (a BaseException). The sync world's
equivalents are KeyboardInterrupt and SystemExit; they too must propagate.
"""

@on_error
def swallow_all(
request: httpx2.Request, # noqa: ARG001
exc: Exception, # noqa: ARG001
) -> httpx2.Response | None: # pragma: no cover
msg = "should not catch BaseException"
raise AssertionError(msg)

def terminal_ki(request: httpx2.Request) -> httpx2.Response: # noqa: ARG001
raise KeyboardInterrupt

dispatch = compose((swallow_all,), terminal_ki)
with pytest.raises(KeyboardInterrupt):
dispatch(_make_request())


def test_on_error_lets_systemexit_propagate() -> None:
"""SystemExit (sibling of KeyboardInterrupt) must also escape on_error."""

@on_error
def swallow_all(
request: httpx2.Request, # noqa: ARG001
exc: Exception, # noqa: ARG001
) -> httpx2.Response | None: # pragma: no cover
msg = "should not catch BaseException"
raise AssertionError(msg)

def terminal_se(request: httpx2.Request) -> httpx2.Response: # noqa: ARG001
raise SystemExit

dispatch = compose((swallow_all,), terminal_se)
with pytest.raises(SystemExit):
dispatch(_make_request())
23 changes: 17 additions & 6 deletions tests/test_optional_extras_pydantic_missing.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@
from httpware.decoders.pydantic import PydanticDecoder


class _FakeDecoder:
"""Test stand-in for ResponseDecoder; never called at runtime."""

def decode(self, content: bytes, model: type) -> object: # noqa: ARG002 — name pinned by ResponseDecoder protocol
return model() # pragma: no cover


def test_pydantic_decoder_init_raises_when_pydantic_missing() -> None:
with (
patch("httpware._internal.import_checker.is_pydantic_installed", False),
Expand All @@ -39,12 +46,16 @@ def test_sync_client_default_decoder_raises_when_pydantic_missing() -> None:


def test_async_client_accepts_explicit_decoder_without_pydantic() -> None:
"""An explicit decoder= escapes the fail-fast even when pydantic is 'missing'."""
"""An explicit decoder= escapes the fail-fast AND is actually wired to the client."""
fake = _FakeDecoder()
with patch("httpware._internal.import_checker.is_pydantic_installed", False):
client = AsyncClient(decoder=fake)
assert client._decoder is fake # noqa: SLF001 — wired the explicit decoder, not a default

class _FakeDecoder:
def decode(self, content: bytes, model: type) -> object: # noqa: ARG002 — name pinned by ResponseDecoder protocol
return model() # pragma: no cover

def test_sync_client_accepts_explicit_decoder_without_pydantic() -> None:
"""Sync mirror: explicit decoder= escapes the fail-fast AND is wired for sync Client too."""
fake = _FakeDecoder()
with patch("httpware._internal.import_checker.is_pydantic_installed", False):
client = AsyncClient(decoder=_FakeDecoder())
assert client is not None
client = Client(decoder=fake)
assert client._decoder is fake # noqa: SLF001 — wired the explicit decoder, not a default
10 changes: 7 additions & 3 deletions tests/test_threading_with_shared_budget.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,12 @@ def test_shared_budget_across_sync_threads_and_async_loop() -> None:
t.join()

# The lock kept the budget's internal deques consistent — no IndexError, no corruption.
# No specific count assertion: the test passes if it completes without an exception
# from the budget itself. Add a smoke check that the budget recorded SOME activity:
assert len(budget._deposits) > 0 # noqa: SLF001
# 0.8.3 deposit-hoist: deposits count requests, not attempts (one per __call__,
# regardless of max_attempts). Budget TTL is 60.0 so no purge fires during the
# sub-second runtime; the count is exact.
expected_deposits = (_N_SYNC_THREADS * _N_OPS_PER_THREAD) + _N_ASYNC_TASKS
assert len(budget._deposits) == expected_deposits, ( # noqa: SLF001
f"expected {expected_deposits} deposits, got {len(budget._deposits)}" # noqa: SLF001
)

sync_client.close()
Loading