From 0fc368f27156c2805147163d66332f4932256238 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Fri, 5 Jun 2026 18:56:07 +0300 Subject: [PATCH 1/9] docs(planning): spec AsyncClient.stream() context manager (0.5.0, Epic 4 story 4-3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Single-method addition to AsyncClient. Bypasses the middleware chain (documented rationale: streams don't compose with Retry/Bulkhead/decoder). Maps httpx2 exceptions raised during request + body consumption to httpware exceptions, mirroring _terminal's dispatch ordering. Closes the deferred-work item about httpx2.StreamError-family escape. The Retry-refuses-streamed-body item stays open as a follow-up PR. No auto-raise on 4xx/5xx (matches httpx convention; deliberate divergence from client.get/post/etc. which DO auto-raise — rationale in spec). No StreamResponse wrapper type. No response_model= parameter. Co-Authored-By: Claude Opus 4.7 (1M context) --- planning/specs/2026-06-05-streaming-design.md | 204 ++++++++++++++++++ 1 file changed, 204 insertions(+) create mode 100644 planning/specs/2026-06-05-streaming-design.md diff --git a/planning/specs/2026-06-05-streaming-design.md b/planning/specs/2026-06-05-streaming-design.md new file mode 100644 index 0000000..63cf88f --- /dev/null +++ b/planning/specs/2026-06-05-streaming-design.md @@ -0,0 +1,204 @@ +# Spec: AsyncClient.stream context manager (0.5.0, Epic 4 story 4-3) + +**Date:** 2026-06-05 +**Topic slug:** `streaming` +**Status:** drafted, awaiting user review +**Target release:** 0.5.0 +**Epic 4 stories rolled in:** 4-3 (the only surviving Epic 4 story post-v0.2 pivot). + +## Purpose + +Add `AsyncClient.stream(method, url, **kwargs)` — an async context manager that streams the response body. Mirrors `httpx2.AsyncClient.stream()` directly; bypasses the middleware chain; wraps the request and body-consumption phases in the same exception-mapping that `AsyncClient._terminal` uses, so users see consistent `httpware` exception types regardless of whether they call `client.get()` or `client.stream()`. + +This is the only Epic 4 work; after it ships, Epic 4 is closed. The Retry-refuses-streamed-body deferred-work item (`planning/deferred-work.md` §"Retry + streaming bodies") stays open as a separate small follow-up PR. + +## Non-goals + +Items deliberately deferred so this slice ships clean: + +- **No middleware-chain composition.** `stream()` bypasses Retry, Bulkhead, and any user-installed middleware. Documented in the `stream()` docstring. Rationale: the middleware chain operates on `(request, response)` pairs where the response is a fully-buffered `httpx2.Response`. Streaming responses fundamentally break that model (reading `.content` consumes the stream, defeating the purpose). Retry can't replay consumed streams. Bulkhead's "hold slot for one request" semantics get pathological when a stream stays open for minutes. Bypass is the only sensible v1 default. +- **No `StreamResponse` wrapper type.** Returns `httpx2.Response` directly. Pre-v0.2 there was a `StreamResponse` class; the v0.2 thin-wrapper pivot deleted it (engineering.md §8 historical: *"`4-1` `StreamResponse` type, `4-2` transport stream implementation"* explicitly deleted). This slice does not bring it back. +- **No auto-raise on 4xx/5xx.** `stream()` does NOT raise `StatusError` subclasses on bad status codes — deliberate divergence from `client.get()`/`client.post()`/etc. Users call `response.raise_for_status()` if they want that behavior. Rationale: streams often have meaningful bodies even at 4xx (partial-success responses, structured error bodies); auto-raising would force users to lose access to the response object before they could read it. Matches httpx convention. +- **No `response_model=` decoding parameter.** Doesn't apply to streams (the body isn't a single bytes blob to decode upfront). Not exposed in the `stream()` signature. +- **No Retry-refuses-streamed-body fix.** Separate follow-up PR. The `deferred-work.md` entry stays open. Adding `stream()` makes streaming-body usage more visible, so the latent retry-replay footgun becomes more likely to be hit — call this out in the follow-up. +- **No Bulkhead-during-stream integration.** Bulkhead doesn't see `stream()` calls (bypass). If users want stream concurrency limits, they manage at the call site (their own semaphore, etc.). +- **No `_map_httpx2_exceptions` shared helper.** Intentional duplication of the except chain between `_terminal` and `stream()` — only two call sites, and the duplication keeps each path self-contained and readable. Extract if a third call site emerges. + +## Architecture + +`AsyncClient.stream()` is a method on the existing `AsyncClient` class in `src/httpware/client.py`. No new files. No new module. The implementation is one method (~30 lines) decorated with `@contextlib.asynccontextmanager`. + +```text +src/httpware/ +└── client.py # AsyncClient — add stream() method +``` + +## Public API + +```python +import contextlib + +@contextlib.asynccontextmanager +async def stream( + self, + method: str, + url: str, + *, + params: typing.Any | None = None, + headers: typing.Any | None = None, + cookies: typing.Any | None = None, + timeout: typing.Any = httpx2.USE_CLIENT_DEFAULT, + extensions: typing.Any | None = None, + json: typing.Any | None = None, + content: typing.Any | None = None, + data: typing.Any | None = None, + files: typing.Any | None = None, +) -> AsyncIterator[httpx2.Response]: + """Stream an HTTP response. Bypasses the middleware chain. + + Yields an httpx2.Response; consume the body via response.aiter_bytes(), + response.aiter_text(), response.aiter_lines(), or response.aiter_raw(). + The body is NOT pre-read; the response is closed when the context exits. + + Bypasses the middleware chain (no Retry, no Bulkhead, no user-installed + middleware) — see the design spec for rationale. Maps httpx2 exceptions + raised during the request OR body consumption to httpware exceptions + consistently with the rest of AsyncClient. + + Does NOT auto-raise on 4xx/5xx — call response.raise_for_status() if + you want StatusError-style behavior. + """ +``` + +Usage: + +```python +async with client.stream("GET", "/big-file") as response: + if response.status_code != 200: + ... + async for chunk in response.aiter_bytes(): + process(chunk) +``` + +Sharing a streaming body (e.g., for upload): + +```python +async def body() -> AsyncIterator[bytes]: + async for chunk in some_source(): + yield chunk + +async with client.stream("POST", "/upload", content=body()) as response: + ... +``` + +(Note: with streamed REQUEST bodies, do not put `Retry` in the middleware chain for non-stream calls until the Retry-streamed-body refusal PR lands. The stream() method itself bypasses Retry, so it's safe.) + +## Implementation algorithm + +Signature is the same as the "Public API" block above (all kwargs named explicitly; same shape as `_request_with_body`). Body: + +```python +# Inside the method body, after the signature shown in Public API above: +kwargs: dict[str, typing.Any] = {} +if params is not None: + kwargs["params"] = params +if headers is not None: + kwargs["headers"] = headers +if cookies is not None: + kwargs["cookies"] = cookies +if timeout is not httpx2.USE_CLIENT_DEFAULT: + kwargs["timeout"] = timeout +if extensions is not None: + kwargs["extensions"] = extensions +if json is not None: + kwargs["json"] = json +if content is not None: + kwargs["content"] = content +if data is not None: + kwargs["data"] = data +if files is not None: + kwargs["files"] = files + +try: + async with self._httpx2_client.stream(method, url, **kwargs) as response: + yield response +except httpx2.TimeoutException as exc: + raise TimeoutError(str(exc)) from exc +except (httpx2.InvalidURL, httpx2.CookieConflict) as exc: + raise TransportError(str(exc)) from exc +except httpx2.NetworkError as exc: + raise NetworkError(str(exc)) from exc +except httpx2.HTTPError as exc: + raise TransportError(str(exc)) from exc +``` + +The kwarg-passing block mirrors `_request_with_body` (`client.py:166-185`) verbatim except for absent params (`response_model` doesn't apply to streams). + +**Clause ordering** matches `_terminal` (`client.py:107-118`) so the same dispatch invariants hold: `httpx2.NetworkError` precedes `httpx2.HTTPError` (subclass before parent), `(InvalidURL, CookieConflict)` catches BEFORE `NetworkError` so they stay non-`NetworkError` (and are therefore non-retryable when Retry's streamed-body fix lands). + +Exceptions can arise in three places, all caught by the same try/except: + +1. **Request phase** (httpx2 sends the request, gets headers): `__aenter__` of `httpx2.AsyncClient.stream()`. Exceptions propagate to our try/except directly. +2. **Body-consumption phase** (user code inside `async with client.stream(...) as response:` does `async for chunk in response.aiter_bytes()`): Exceptions propagate UP through the `yield response`, the inner `httpx2.stream()` context manager's `__aexit__` runs cleanup, then they hit our outer try/except. +3. **Cleanup phase** (response close at context exit): typically silent; any exception during close is wrapped the same way. + +Non-`httpx2` exceptions (user code raises `ValueError` during chunk processing, etc.) are NOT caught by our handlers — they propagate to the user unchanged. `asyncio.CancelledError` (`BaseException` subclass) is never caught. + +## Behavior reference + +| Situation | Behavior | +|-----------|----------| +| 2xx response | Yields `httpx2.Response`; user consumes body via aiter methods | +| 4xx/5xx response | Yields `httpx2.Response` normally — NO auto-raise. User calls `response.raise_for_status()` for raise-on-error | +| Network error during initial request | Raises `httpware.NetworkError` from `__aenter__` | +| Network error mid-stream | Raises `httpware.NetworkError` from the `async for` line in user code | +| Timeout during request or body consumption | Raises `httpware.TimeoutError` | +| `InvalidURL` / `CookieConflict` | Raises bare `httpware.TransportError` (NOT `NetworkError`) — non-retryable family | +| User code inside `async with` raises | Propagates unchanged; httpx2's context manager cleans up the response | +| `asyncio.CancelledError` during stream | Propagates unchanged; httpx2's context manager cleans up the response; no slot or resource leak | + +## Testing + +Per `planning/engineering.md §6`. New file `tests/test_client_stream.py`: + +- `test_streams_response_body_successfully` — handler returns chunks; assert `async for chunk in response.aiter_bytes()` yields them in order +- `test_does_not_auto_raise_on_4xx` — handler returns 404; assert the context manager yields a Response with `status_code == 404` (no `NotFoundError` raised) +- `test_does_not_auto_raise_on_5xx` — same with 503 +- `test_explicit_raise_for_status_works` — user calls `response.raise_for_status()` inside the block; expect `httpx2.HTTPStatusError` +- `test_network_error_during_request_maps_to_network_error` — handler raises `httpx2.ConnectError`; expect `httpware.NetworkError` +- `test_network_error_during_body_consumption_maps_to_network_error` — handler raises `httpx2.ReadError` during the streaming response; expect `httpware.NetworkError` when user iterates body +- `test_timeout_during_stream_maps_to_httpware_timeout` — handler raises `httpx2.ReadTimeout`; expect `httpware.TimeoutError` +- `test_invalid_url_maps_to_bare_transport_error` — handler raises `httpx2.InvalidURL`; expect `httpware.TransportError` and NOT `httpware.NetworkError` +- `test_cancellation_propagates_cleanly` — outer task cancels while body iteration is in progress; expect `asyncio.CancelledError` propagates and stream is closed +- `test_user_exception_in_block_propagates_unchanged` — user raises `ValueError` during chunk processing; expect `ValueError` propagates with no httpware wrapping +- `test_bypasses_middleware_chain` — install a recording middleware that increments a counter on `__call__`; do a `stream()` call; assert counter == 0 +- `test_forwards_kwargs_to_httpx2` — pass `params`, `headers`, `cookies`, `extensions`; assert they reach the mock transport's recorded request +- `test_stream_with_content_kwarg` — `client.stream("POST", url, content=b"bytes")` works; mock transport sees the bytes + +Coverage target: **100% line coverage** (project standard). + +## Public API exports + +`stream` is a method on `AsyncClient`, which is already exported from `httpware/__init__.py`. No new top-level exports needed. + +## Documentation updates + +This PR touches the user-facing docs since we just shipped a docs-sync pass: + +- **README.md**: add a brief paragraph (or extend the Quickstart) showing `client.stream()`. Note the no-auto-raise divergence. +- **docs/index.md**: mirror the README addition. +- **planning/engineering.md §1**: append a sentence mentioning the streaming surface. +- **planning/engineering.md §8** roadmap: mark `4-3` shipped; note Epic 4 closes (with the Retry-streamed-body follow-up still open as deferred work). +- **planning/releases/0.5.0.md**: new release notes file. + +## Open questions deferred to implementation + +- **`httpx2.NetworkError` symbol existence**: confirmed available (used in slice 1 / NetworkError refinement work). No fallback needed. +- **Whether `httpx2.ReadError` raised mid-stream actually maps to `httpx2.NetworkError`**: the implementer should verify `isinstance(httpx2.ReadError(...), httpx2.NetworkError)` is True (it should be per httpx convention). If it isn't, fall back to enumerating: `except (httpx2.ConnectError, httpx2.ReadError, httpx2.WriteError, httpx2.CloseError) as exc`. + +## References + +- `planning/engineering.md` §1 (project intent), §3 (protocol seams), §5 (module layout), §8 (roadmap) +- `planning/deferred-work.md` §"Retry + streaming bodies" (the open Epic-4-interaction item — stays open after this PR) +- `planning/deferred-work.md` §"Closed by the v0.2 thin-wrapper pivot" → "`httpx2.StreamError` family escape from the transport's `except httpx2.HTTPError`" — closed by this slice's exception mapping +- httpx streaming docs (convention reference): https://www.python-httpx.org/async/#streaming-responses From 5d8ea32ceb134173e10191840612510732af05e4 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Fri, 5 Jun 2026 19:12:50 +0300 Subject: [PATCH 2/9] docs(planning): spec revisions per scope expansion Per user pushback in brainstorming: - Auto-raise on 4xx/5xx for streams (with body pre-read so exc.response.content works). - Extract shared _httpx2_exception_mapper helper used by _terminal + stream(). - Retry refuses streamed-body requests (closes the deferred-work item). - Keep middleware bypass for v1 (YAGNI; revisit later). Detection mechanism for streamed-body refusal: request.extensions "httpware.streaming_body" marker, set by _request_with_body when content/data/files is an async-iterable. Co-Authored-By: Claude Opus 4.7 (1M context) --- planning/specs/2026-06-05-streaming-design.md | 199 ++++++++++++++---- 1 file changed, 160 insertions(+), 39 deletions(-) diff --git a/planning/specs/2026-06-05-streaming-design.md b/planning/specs/2026-06-05-streaming-design.md index 63cf88f..2325499 100644 --- a/planning/specs/2026-06-05-streaming-design.md +++ b/planning/specs/2026-06-05-streaming-design.md @@ -8,29 +8,36 @@ ## Purpose -Add `AsyncClient.stream(method, url, **kwargs)` — an async context manager that streams the response body. Mirrors `httpx2.AsyncClient.stream()` directly; bypasses the middleware chain; wraps the request and body-consumption phases in the same exception-mapping that `AsyncClient._terminal` uses, so users see consistent `httpware` exception types regardless of whether they call `client.get()` or `client.stream()`. +Add `AsyncClient.stream(method, url, **kwargs)` — an async context manager that streams the response body. Mirrors `httpx2.AsyncClient.stream()` directly; **bypasses the middleware chain for v1** (revisit later if user feedback warrants); **auto-raises `StatusError` subclasses on 4xx/5xx** (consistent with `client.get()`/`client.post()`/etc., body pre-read before raising so `exc.response.content` works). Wraps the request and body-consumption phases in a new `_httpx2_exception_mapper` helper shared with `_terminal`, so users see consistent `httpware` exception types regardless of which client method they call. -This is the only Epic 4 work; after it ships, Epic 4 is closed. The Retry-refuses-streamed-body deferred-work item (`planning/deferred-work.md` §"Retry + streaming bodies") stays open as a separate small follow-up PR. +Also in scope: **`Retry` refuses to retry requests with streamed bodies.** Async-iterable bodies can't replay across retry attempts. Detection via an `httpware.streaming_body` marker added to `request.extensions` by `_request_with_body` when `content=`/`data=`/`files=` is async-iterable; Retry reads the marker and refuses the retry path with a clear PEP-678 note. Closes the open deferred-work item. + +After this PR ships, Epic 4 is closed and the Retry-streamed-body deferred-work item is closed. ## Non-goals Items deliberately deferred so this slice ships clean: -- **No middleware-chain composition.** `stream()` bypasses Retry, Bulkhead, and any user-installed middleware. Documented in the `stream()` docstring. Rationale: the middleware chain operates on `(request, response)` pairs where the response is a fully-buffered `httpx2.Response`. Streaming responses fundamentally break that model (reading `.content` consumes the stream, defeating the purpose). Retry can't replay consumed streams. Bulkhead's "hold slot for one request" semantics get pathological when a stream stays open for minutes. Bypass is the only sensible v1 default. +- **No middleware-chain composition for `stream()`.** `stream()` bypasses Retry, Bulkhead, and any user-installed middleware **for v1**. Documented in the `stream()` docstring. Rationale: the middleware protocol operates on `(request, response)` pairs where the response is a fully-buffered `httpx2.Response`. Streaming responses fundamentally break that model (reading `.content` consumes the stream). Composing streams with middleware requires a per-middleware stream-aware policy (e.g., a `request.extensions["httpware.stream"]` marker and updates to every middleware) — meaningful additional design + code. Defer until real-user feedback shows the need. Revisiting is purely additive (add an `apply_middleware: bool = False` parameter and the marker mechanism in a follow-up). - **No `StreamResponse` wrapper type.** Returns `httpx2.Response` directly. Pre-v0.2 there was a `StreamResponse` class; the v0.2 thin-wrapper pivot deleted it (engineering.md §8 historical: *"`4-1` `StreamResponse` type, `4-2` transport stream implementation"* explicitly deleted). This slice does not bring it back. -- **No auto-raise on 4xx/5xx.** `stream()` does NOT raise `StatusError` subclasses on bad status codes — deliberate divergence from `client.get()`/`client.post()`/etc. Users call `response.raise_for_status()` if they want that behavior. Rationale: streams often have meaningful bodies even at 4xx (partial-success responses, structured error bodies); auto-raising would force users to lose access to the response object before they could read it. Matches httpx convention. - **No `response_model=` decoding parameter.** Doesn't apply to streams (the body isn't a single bytes blob to decode upfront). Not exposed in the `stream()` signature. -- **No Retry-refuses-streamed-body fix.** Separate follow-up PR. The `deferred-work.md` entry stays open. Adding `stream()` makes streaming-body usage more visible, so the latent retry-replay footgun becomes more likely to be hit — call this out in the follow-up. - **No Bulkhead-during-stream integration.** Bulkhead doesn't see `stream()` calls (bypass). If users want stream concurrency limits, they manage at the call site (their own semaphore, etc.). -- **No `_map_httpx2_exceptions` shared helper.** Intentional duplication of the except chain between `_terminal` and `stream()` — only two call sites, and the duplication keeps each path self-contained and readable. Extract if a third call site emerges. +- **No marker-based auto-detection beyond `content`/`data`/`files`.** Retry's streamed-body refusal triggers only when a streaming body was passed through `_request_with_body`'s kwargs. Manually-constructed `httpx2.Request` objects with hand-set streaming bodies are NOT detected. Users constructing requests manually accept the responsibility. ## Architecture -`AsyncClient.stream()` is a method on the existing `AsyncClient` class in `src/httpware/client.py`. No new files. No new module. The implementation is one method (~30 lines) decorated with `@contextlib.asynccontextmanager`. +Three coordinated changes: + +1. **`src/httpware/client.py`** — add `AsyncClient.stream()` (an `@contextlib.asynccontextmanager` method, ~40 lines). Extract a new module-level `_httpx2_exception_mapper` `@asynccontextmanager` helper (~12 lines) used by both `_terminal` and `stream()` — one source of truth for the httpx2→httpware exception dispatch. Refactor `_terminal` to use the helper. In `_request_with_body`, detect async-iterable `content`/`data`/`files` and mark `request.extensions["httpware.streaming_body"] = True`. + +2. **`src/httpware/middleware/resilience/retry.py`** — in `Retry.__call__`, before each retry attempt, check `request.extensions.get("httpware.streaming_body")`. If True and a retry would otherwise happen, refuse: raise the original error with a PEP-678 note (`"httpware: not retrying — request body is a stream that cannot replay"`). + +No new files. No new module. ```text src/httpware/ -└── client.py # AsyncClient — add stream() method +├── client.py # AsyncClient — add stream() + _httpx2_exception_mapper + streaming-body marker +└── middleware/resilience/retry.py # Retry — refuse retry when streaming-body marker is set ``` ## Public API @@ -58,15 +65,20 @@ async def stream( Yields an httpx2.Response; consume the body via response.aiter_bytes(), response.aiter_text(), response.aiter_lines(), or response.aiter_raw(). - The body is NOT pre-read; the response is closed when the context exits. + The body is NOT pre-read for 2xx responses (the streaming property is + preserved); the response is closed when the context exits. Bypasses the middleware chain (no Retry, no Bulkhead, no user-installed - middleware) — see the design spec for rationale. Maps httpx2 exceptions - raised during the request OR body consumption to httpware exceptions - consistently with the rest of AsyncClient. + middleware) for v1 — see the design spec for rationale. + + Auto-raises StatusError subclasses on 4xx/5xx (NotFoundError, + ServiceUnavailableError, etc.) — consistent with client.get()/post()/etc. + On error, the response body is pre-read so exc.response.content is + accessible. You lose the streaming property on errors; rare in practice + since 4xx/5xx bodies are typically small. - Does NOT auto-raise on 4xx/5xx — call response.raise_for_status() if - you want StatusError-style behavior. + Maps httpx2 exceptions raised during the request OR body consumption to + httpware exceptions consistently with the rest of AsyncClient. """ ``` @@ -74,10 +86,16 @@ Usage: ```python async with client.stream("GET", "/big-file") as response: - if response.status_code != 200: - ... + # response.status_code is guaranteed 2xx or 3xx here — 4xx/5xx auto-raise async for chunk in response.aiter_bytes(): process(chunk) + +# Catch like any other status error: +try: + async with client.stream("GET", "/maybe-missing") as response: + ... +except NotFoundError as exc: + body_text = exc.response.text # pre-read on error; available here ``` Sharing a streaming body (e.g., for upload): @@ -95,10 +113,71 @@ async with client.stream("POST", "/upload", content=body()) as response: ## Implementation algorithm -Signature is the same as the "Public API" block above (all kwargs named explicitly; same shape as `_request_with_body`). Body: +### `_httpx2_exception_mapper` (shared helper) + +New module-level helper at the top of `client.py`: + +```python +@contextlib.asynccontextmanager +async def _httpx2_exception_mapper() -> AsyncIterator[None]: + """Map httpx2 exceptions to httpware exceptions. Shared by _terminal and stream().""" + try: + yield + except httpx2.TimeoutException as exc: + raise TimeoutError(str(exc)) from exc + except (httpx2.InvalidURL, httpx2.CookieConflict) as exc: + raise TransportError(str(exc)) from exc + except httpx2.NetworkError as exc: + raise NetworkError(str(exc)) from exc + except httpx2.HTTPError as exc: + raise TransportError(str(exc)) from exc +``` + +Clause ordering must match the current `_terminal` (TimeoutException → InvalidURL/CookieConflict → NetworkError → HTTPError). `RuntimeError` "closed" check stays in `_terminal` (it's not an httpx2 error and only applies to the non-stream path). + +### `_terminal` refactor + +The current except chain in `_terminal` (`client.py:107-118`) becomes: + +```python +async def _terminal(self, request: httpx2.Request) -> httpx2.Response: + try: + async with _httpx2_exception_mapper(): + response = await self._httpx2_client.send(request) + except RuntimeError as exc: + if "closed" in str(exc): + raise TransportError(str(exc)) from exc + raise + # ... status-code dispatch unchanged +``` + +### `_request_with_body` streaming-body detection + +Add a helper and a marker step. Detection function: ```python -# Inside the method body, after the signature shown in Public API above: +def _is_streaming_body(value: typing.Any) -> bool: + """True if value is an async-iterable that can't be safely replayed for retry.""" + if value is None: + return False + if isinstance(value, (bytes, bytearray, memoryview, str, dict)): + return False + return hasattr(value, "__aiter__") +``` + +In `_request_with_body`, after `request = self._httpx2_client.build_request(...)` and before `await self.send(request, ...)`: + +```python +if _is_streaming_body(content) or _is_streaming_body(data) or _is_streaming_body(files): + request.extensions["httpware.streaming_body"] = True +``` + +### `stream()` method + +Signature per "Public API" above. Body: + +```python +# Build kwargs (same pattern as _request_with_body but without response_model): kwargs: dict[str, typing.Any] = {} if params is not None: kwargs["params"] = params @@ -119,20 +198,37 @@ if data is not None: if files is not None: kwargs["files"] = files -try: +async with _httpx2_exception_mapper(): async with self._httpx2_client.stream(method, url, **kwargs) as response: + status = response.status_code + if HTTPStatus.BAD_REQUEST <= status < 600: # noqa: PLR2004 — 600 is the synthetic upper bound for 5xx + await response.aread() # pre-read body so exc.response.content is accessible + exc_class = STATUS_TO_EXCEPTION.get( + status, + ClientStatusError if status < HTTPStatus.INTERNAL_SERVER_ERROR else ServerStatusError, + ) + raise exc_class(response) yield response -except httpx2.TimeoutException as exc: - raise TimeoutError(str(exc)) from exc -except (httpx2.InvalidURL, httpx2.CookieConflict) as exc: - raise TransportError(str(exc)) from exc -except httpx2.NetworkError as exc: - raise NetworkError(str(exc)) from exc -except httpx2.HTTPError as exc: - raise TransportError(str(exc)) from exc ``` -The kwarg-passing block mirrors `_request_with_body` (`client.py:166-185`) verbatim except for absent params (`response_model` doesn't apply to streams). +Status-code dispatch is byte-for-byte identical to `_terminal`'s (the table lookup, the same fallback, the same `noqa: PLR2004` comment). Both call sites stay in sync — if a future change refines status-error dispatch (e.g., adds new subclasses), both update together. Worth extracting into a helper? Probably yes once both call sites exist; defer to implementation review. + +### `Retry.__call__` streaming-body refusal + +In `Retry.__call__`, after the per-attempt failure is identified and BEFORE the retry/sleep block (i.e., right before `self.budget.try_withdraw()`): + +```python +if request.extensions.get("httpware.streaming_body"): + if last_exc is None: # pragma: no cover — invariant + msg = "Retry: streaming-body refusal reached with no last_exc" + raise AssertionError(msg) + last_exc.add_note( + "httpware: not retrying — request body is a stream that cannot replay across attempts" + ) + raise last_exc +``` + +The note is added with PEP 678 `add_note` — same pattern as the max-attempts exhaustion path. Order: streaming-body check comes BEFORE the budget gate so we don't withdraw a budget token for a request we won't retry. **Clause ordering** matches `_terminal` (`client.py:107-118`) so the same dispatch invariants hold: `httpx2.NetworkError` precedes `httpx2.HTTPError` (subclass before parent), `(InvalidURL, CookieConflict)` catches BEFORE `NetworkError` so they stay non-`NetworkError` (and are therefore non-retryable when Retry's streamed-body fix lands). @@ -148,25 +244,30 @@ Non-`httpx2` exceptions (user code raises `ValueError` during chunk processing, | Situation | Behavior | |-----------|----------| -| 2xx response | Yields `httpx2.Response`; user consumes body via aiter methods | -| 4xx/5xx response | Yields `httpx2.Response` normally — NO auto-raise. User calls `response.raise_for_status()` for raise-on-error | +| 2xx/3xx response | Yields `httpx2.Response`; user consumes body via aiter methods (streaming preserved) | +| 4xx/5xx response | Pre-reads body, raises `StatusError` subclass (`NotFoundError`, `ServiceUnavailableError`, etc.). Caller's `exc.response.content` / `exc.response.text` are accessible | | Network error during initial request | Raises `httpware.NetworkError` from `__aenter__` | | Network error mid-stream | Raises `httpware.NetworkError` from the `async for` line in user code | | Timeout during request or body consumption | Raises `httpware.TimeoutError` | | `InvalidURL` / `CookieConflict` | Raises bare `httpware.TransportError` (NOT `NetworkError`) — non-retryable family | | User code inside `async with` raises | Propagates unchanged; httpx2's context manager cleans up the response | | `asyncio.CancelledError` during stream | Propagates unchanged; httpx2's context manager cleans up the response; no slot or resource leak | +| `Retry` middleware sees a streaming-body request and a retryable error | Re-raises the original error with PEP-678 note: `"httpware: not retrying — request body is a stream that cannot replay across attempts"` | ## Testing -Per `planning/engineering.md §6`. New file `tests/test_client_stream.py`: +Per `planning/engineering.md §6`. Three test files touched/created. + +### New file `tests/test_client_stream.py` - `test_streams_response_body_successfully` — handler returns chunks; assert `async for chunk in response.aiter_bytes()` yields them in order -- `test_does_not_auto_raise_on_4xx` — handler returns 404; assert the context manager yields a Response with `status_code == 404` (no `NotFoundError` raised) -- `test_does_not_auto_raise_on_5xx` — same with 503 -- `test_explicit_raise_for_status_works` — user calls `response.raise_for_status()` inside the block; expect `httpx2.HTTPStatusError` +- `test_auto_raises_on_4xx_with_body_preread` — handler returns 404 with a JSON body; expect `NotFoundError`; assert `exc.response.content` is the JSON body (proves pre-read worked) +- `test_auto_raises_on_5xx_with_body_preread` — same with 503 → `ServiceUnavailableError` +- `test_auto_raises_unknown_4xx_falls_back_to_client_status_error` — 418 → `ClientStatusError` +- `test_auto_raises_unknown_5xx_falls_back_to_server_status_error` — 599 → `ServerStatusError` +- `test_3xx_does_not_raise` — 301 redirect response yielded normally - `test_network_error_during_request_maps_to_network_error` — handler raises `httpx2.ConnectError`; expect `httpware.NetworkError` -- `test_network_error_during_body_consumption_maps_to_network_error` — handler raises `httpx2.ReadError` during the streaming response; expect `httpware.NetworkError` when user iterates body +- `test_network_error_during_body_consumption_maps_to_network_error` — handler streams partial bytes then raises `httpx2.ReadError`; expect `httpware.NetworkError` when user iterates body - `test_timeout_during_stream_maps_to_httpware_timeout` — handler raises `httpx2.ReadTimeout`; expect `httpware.TimeoutError` - `test_invalid_url_maps_to_bare_transport_error` — handler raises `httpx2.InvalidURL`; expect `httpware.TransportError` and NOT `httpware.NetworkError` - `test_cancellation_propagates_cleanly` — outer task cancels while body iteration is in progress; expect `asyncio.CancelledError` propagates and stream is closed @@ -174,6 +275,23 @@ Per `planning/engineering.md §6`. New file `tests/test_client_stream.py`: - `test_bypasses_middleware_chain` — install a recording middleware that increments a counter on `__call__`; do a `stream()` call; assert counter == 0 - `test_forwards_kwargs_to_httpx2` — pass `params`, `headers`, `cookies`, `extensions`; assert they reach the mock transport's recorded request - `test_stream_with_content_kwarg` — `client.stream("POST", url, content=b"bytes")` works; mock transport sees the bytes +- `test_stream_with_async_iterable_content` — `client.stream("POST", url, content=async_gen())` works (streaming request body — confirms the bypass path doesn't accidentally choke) + +### Modified file `tests/test_error_mapping_terminal.py` + +No new tests required; existing tests must still pass after the `_terminal` refactor to use `_httpx2_exception_mapper`. Dispatch behavior is byte-for-byte identical. + +### Modified file `tests/test_retry.py` + +Streaming-body refusal tests: + +- `test_retry_refuses_streamed_body_request` — build a request, set `request.extensions["httpware.streaming_body"] = True`, configure Retry, force a retryable failure; expect the original exception to propagate with the PEP-678 note about not retrying. Assert `_sleep` was NOT called (no retry attempt; no backoff). +- `test_retry_refuses_streamed_body_does_not_consume_budget` — same setup with an explicit `RetryBudget`; after the call, assert no budget token was withdrawn (the streaming-body refusal happens BEFORE `budget.try_withdraw()`). +- `test_client_post_with_async_iterable_content_marks_extensions` — call `client.post(url, content=async_gen())` with a recording mock transport; assert the request reached the transport with `request.extensions["httpware.streaming_body"] is True`. +- `test_client_post_with_bytes_content_does_not_mark_extensions` — `client.post(url, content=b"hi")`; assert the marker is NOT present. +- `test_client_post_with_dict_data_does_not_mark_extensions` — `client.post(url, data={"k": "v"})`; assert the marker is NOT present. +- `test_client_post_with_async_iterable_data_marks_extensions` — `client.post(url, data=async_gen())`; assert marker present. +- `test_client_post_with_async_iterable_files_marks_extensions` — `client.post(url, files=async_gen())`; assert marker present. Coverage target: **100% line coverage** (project standard). @@ -185,20 +303,23 @@ Coverage target: **100% line coverage** (project standard). This PR touches the user-facing docs since we just shipped a docs-sync pass: -- **README.md**: add a brief paragraph (or extend the Quickstart) showing `client.stream()`. Note the no-auto-raise divergence. +- **README.md**: add a brief paragraph (or extend the Quickstart) showing `client.stream()`. Note that Retry refuses streamed-body requests. - **docs/index.md**: mirror the README addition. - **planning/engineering.md §1**: append a sentence mentioning the streaming surface. -- **planning/engineering.md §8** roadmap: mark `4-3` shipped; note Epic 4 closes (with the Retry-streamed-body follow-up still open as deferred work). +- **planning/engineering.md §8** roadmap: mark `4-3` shipped; Epic 4 closes. +- **planning/deferred-work.md**: close the "Retry + streaming bodies (Epic 4 interaction)" item (move from Open to a new "Closed by 0.5.0 streaming" section). - **planning/releases/0.5.0.md**: new release notes file. ## Open questions deferred to implementation - **`httpx2.NetworkError` symbol existence**: confirmed available (used in slice 1 / NetworkError refinement work). No fallback needed. - **Whether `httpx2.ReadError` raised mid-stream actually maps to `httpx2.NetworkError`**: the implementer should verify `isinstance(httpx2.ReadError(...), httpx2.NetworkError)` is True (it should be per httpx convention). If it isn't, fall back to enumerating: `except (httpx2.ConnectError, httpx2.ReadError, httpx2.WriteError, httpx2.CloseError) as exc`. +- **`httpx2.Request.extensions` mutability**: should be a `dict[str, Any]` per httpx convention. Implementer must verify `request.extensions["httpware.streaming_body"] = True` works (does not raise on a freshly-built request). If `request.extensions` could be `None` or read-only, swap to `request.extensions = {**(request.extensions or {}), "httpware.streaming_body": True}`. +- **Whether status-code dispatch should be extracted from `_terminal` to a shared helper**: with both `_terminal` and `stream()` doing the same `HTTPStatus.BAD_REQUEST <= status < 600` check + `STATUS_TO_EXCEPTION.get(...)` fallback, extracting may be worth it. Decide during implementation; if extracted, both paths use the helper. ## References - `planning/engineering.md` §1 (project intent), §3 (protocol seams), §5 (module layout), §8 (roadmap) -- `planning/deferred-work.md` §"Retry + streaming bodies" (the open Epic-4-interaction item — stays open after this PR) -- `planning/deferred-work.md` §"Closed by the v0.2 thin-wrapper pivot" → "`httpx2.StreamError` family escape from the transport's `except httpx2.HTTPError`" — closed by this slice's exception mapping +- `planning/deferred-work.md` §"Retry + streaming bodies" — **closes** with this PR (Retry's streaming-body refusal lands here) +- `planning/deferred-work.md` §"Closed by the v0.2 thin-wrapper pivot" → "`httpx2.StreamError` family escape from the transport's `except httpx2.HTTPError`" — also closed by this slice's exception mapping - httpx streaming docs (convention reference): https://www.python-httpx.org/async/#streaming-responses From 42fcc5d23d1e5022848d2c1fad580b7a159d19dd Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Fri, 5 Jun 2026 19:19:48 +0300 Subject: [PATCH 3/9] docs(planning): implementation plan for AsyncClient.stream() + Retry-refuses-streamed-body (0.5.0) 6 TDD tasks on feat/v0.5-streaming. Task 1 is a pure refactor extracting _httpx2_exception_mapper + _raise_on_status_error helpers so _terminal and the new stream() share dispatch logic. Tasks 2-3 wire the streaming-body marker (in _request_with_body) + Retry's refusal-with- PEP-678-note. Task 4 adds AsyncClient.stream() with auto-raise + body pre-read on 4xx/5xx. Task 5 syncs README/docs/index.md/engineering.md/ deferred-work.md + drafts 0.5.0 release notes. Task 6 verifies + pushes. Co-Authored-By: Claude Opus 4.7 (1M context) --- planning/plans/2026-06-05-streaming-plan.md | 1089 +++++++++++++++++++ 1 file changed, 1089 insertions(+) create mode 100644 planning/plans/2026-06-05-streaming-plan.md diff --git a/planning/plans/2026-06-05-streaming-plan.md b/planning/plans/2026-06-05-streaming-plan.md new file mode 100644 index 0000000..eb055b6 --- /dev/null +++ b/planning/plans/2026-06-05-streaming-plan.md @@ -0,0 +1,1089 @@ +# AsyncClient.stream + Retry-refuses-streamed-body (0.5.0, Epic 4 story 4-3) Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Ship `AsyncClient.stream()` (async context-manager yielding `httpx2.Response`, with 4xx/5xx auto-raise + body pre-read, exception mapping via a new shared helper). Close two deferred-work items by also adding `Retry`'s refusal of streamed-body requests (detected via a `request.extensions["httpware.streaming_body"]` marker set in `_request_with_body` when `content`/`data`/`files` is async-iterable). + +**Architecture:** Two coordinated changes to `client.py` (helper extraction + `stream()` method + streaming-body marker) and one to `retry.py` (refusal). The `_httpx2_exception_mapper` helper and a `_raise_on_status_error` helper are extracted up front so `_terminal` and `stream()` share dispatch logic — no duplication. `stream()` bypasses the middleware chain (v1 decision); auto-raise on 4xx/5xx with body pre-read keeps consistency with `client.get()`/etc. + +**Tech Stack:** Python 3.11+ (`contextlib.asynccontextmanager`, `asyncio`), `httpx2`, `pytest` / `pytest-asyncio` (auto mode), `uv`, `just`, `ruff`, `ty`. + +**Target branch:** `feat/v0.5-streaming`. Create from `main` before Task 1: `git checkout main && git pull && git checkout -b feat/v0.5-streaming`. + +**Source spec:** [`planning/specs/2026-06-05-streaming-design.md`](../specs/2026-06-05-streaming-design.md). Read it before starting — the *why* for each decision lives there. + +--- + +## File structure + +**Modified files:** +- `src/httpware/client.py` — extract `_httpx2_exception_mapper` + `_raise_on_status_error` helpers; refactor `_terminal` to use both; add `_is_streaming_body` helper; add streaming-body marker step in `_request_with_body`; add `stream()` method +- `src/httpware/middleware/resilience/retry.py` — refuse retry when `request.extensions["httpware.streaming_body"]` is True +- `tests/test_retry.py` — add streaming-body refusal tests +- `tests/test_error_mapping_terminal.py` — no test changes; existing tests must still pass after the `_terminal` refactor +- `README.md` — add a streaming snippet to Quickstart +- `docs/index.md` — mirror the README addition +- `planning/engineering.md` — §1 append streaming sentence; §8 mark `4-3` shipped + close Epic 4 +- `planning/deferred-work.md` — close the two now-resolved items + +**New files:** +- `tests/test_client_stream.py` — unit tests for `AsyncClient.stream()` +- `planning/releases/0.5.0.md` — release notes + +**Commit cadence:** one commit per task. Per-task commits keep history reviewable. + +--- + +## Task 1: Branch + extract `_httpx2_exception_mapper` and `_raise_on_status_error` helpers + +**Files:** +- Modify: `src/httpware/client.py` + +This is a pure refactor of `_terminal`. The dispatch behavior is byte-for-byte identical to today; the existing terminal tests (`tests/test_error_mapping_terminal.py`) cover it and must keep passing. Extracting both helpers up front means `stream()` (Task 4) can use them directly without duplication. + +- [ ] **Step 1: Create the branch** + +Run: +```bash +git checkout main && git pull && git checkout -b feat/v0.5-streaming +``` +Expected: switched to a new branch. + +- [ ] **Step 2: Read the current `_terminal` body** + +```bash +sed -n '107,130p' src/httpware/client.py +``` +Confirm `_terminal` has the structure: +- try/except chain mapping `httpx2.TimeoutException` → `TimeoutError`, `(httpx2.InvalidURL, httpx2.CookieConflict)` → `TransportError`, `httpx2.NetworkError` → `NetworkError`, `httpx2.HTTPError` → `TransportError`, plus a `RuntimeError "closed"` check +- status-code block raising `STATUS_TO_EXCEPTION.get(status, ClientStatusError if status < 500 else ServerStatusError)(response)` for 4xx/5xx + +- [ ] **Step 3: Add `contextlib` import** + +Add to the existing import block (around line 3, alongside `typing`): +```python +import contextlib +``` + +Also add `AsyncIterator` to the imports from `collections.abc`. If `collections.abc` is not yet imported (current code uses `Sequence`), make sure both are present: + +```python +from collections.abc import AsyncIterator, Sequence +``` + +- [ ] **Step 4: Add the `_httpx2_exception_mapper` helper at module level** + +Insert this `@asynccontextmanager` function immediately before `class AsyncClient:` (i.e., between `_default_pydantic_decoder` and the class definition): + +```python +@contextlib.asynccontextmanager +async def _httpx2_exception_mapper() -> AsyncIterator[None]: + """Map httpx2 exceptions to httpware exceptions. Shared by AsyncClient._terminal and stream().""" + try: + yield + except httpx2.TimeoutException as exc: + raise TimeoutError(str(exc)) from exc + except (httpx2.InvalidURL, httpx2.CookieConflict) as exc: + raise TransportError(str(exc)) from exc + except httpx2.NetworkError as exc: + raise NetworkError(str(exc)) from exc + except httpx2.HTTPError as exc: + raise TransportError(str(exc)) from exc +``` + +Clause ordering MUST match the current `_terminal` exactly: TimeoutException → InvalidURL/CookieConflict → NetworkError → HTTPError. `RuntimeError "closed"` is NOT included here — it's not an httpx2 error and stays inline in `_terminal`. + +- [ ] **Step 5: Add the `_raise_on_status_error` helper at module level** + +Insert this function immediately after `_httpx2_exception_mapper`: + +```python +def _raise_on_status_error(response: httpx2.Response) -> None: + """Raise the appropriate StatusError subclass for a 4xx/5xx response. No-op for 2xx/3xx.""" + status = response.status_code + if HTTPStatus.BAD_REQUEST <= status < 600: # noqa: PLR2004 — 600 is the synthetic upper bound for 5xx + exc_class = STATUS_TO_EXCEPTION.get( + status, + ClientStatusError if status < HTTPStatus.INTERNAL_SERVER_ERROR else ServerStatusError, + ) + raise exc_class(response) +``` + +- [ ] **Step 6: Refactor `_terminal` to use both helpers** + +Replace the current `_terminal` body (which is the `try`/`except` chain + the status-code block, ~24 lines) with: + +```python +async def _terminal(self, request: httpx2.Request) -> httpx2.Response: + try: + async with _httpx2_exception_mapper(): + response = await self._httpx2_client.send(request) + except RuntimeError as exc: + if "closed" in str(exc): + raise TransportError(str(exc)) from exc + raise + _raise_on_status_error(response) + return response +``` + +The `RuntimeError` check stays in `_terminal` (the mapper doesn't cover it — it's a non-httpx2 error specific to the closed-client edge case). + +- [ ] **Step 7: Run the existing terminal tests** + +```bash +uv run pytest tests/test_error_mapping_terminal.py -v +``` +Expected: all PASS. The refactor must not change observable behavior. + +- [ ] **Step 8: Run lint + full suite** + +```bash +just lint && just test +``` +Expected: clean, 100% coverage maintained (was 209 tests; still 209). + +- [ ] **Step 9: Stage and commit** + +```bash +git add src/httpware/client.py +git commit -m "refactor(client): extract _httpx2_exception_mapper + _raise_on_status_error + +Pure refactor of _terminal. Two module-level helpers (one @asynccontextmanager +for the httpx2 exception dispatch, one function for the 4xx/5xx StatusError +raise). _terminal now reads as: enter the mapper, send, raise on status. + +Sets up Task 4: AsyncClient.stream() will reuse both helpers verbatim +instead of duplicating the dispatch logic. Behavior is byte-for-byte +identical to today; the existing terminal tests cover it." +``` + +--- + +## Task 2: Add `_is_streaming_body` helper + `_request_with_body` streaming-body marker + +**Files:** +- Modify: `src/httpware/client.py` +- Modify: `tests/test_retry.py` (or `tests/test_client_construction.py` — pick whichever currently holds AsyncClient/marker-related tests; the spec puts them in test_retry.py since they're part of the Retry-refusal story) + +The marker is the detection mechanism Retry will use in Task 3. + +- [ ] **Step 1: Write failing tests in `tests/test_retry.py`** + +Append to `tests/test_retry.py`: + +```python +async def test_client_post_with_async_iterable_content_marks_extensions() -> None: + """Posting with an async-iterable body sets the httpware.streaming_body marker on request.extensions.""" + seen_extensions: list[dict[str, object]] = [] + + def handler(request: httpx2.Request) -> httpx2.Response: + seen_extensions.append(dict(request.extensions)) + return httpx2.Response(HTTPStatus.OK, request=request) + + async def streamed_body() -> typing.AsyncIterator[bytes]: + yield b"chunk1" + yield b"chunk2" + + transport = httpx2.MockTransport(handler) + client = AsyncClient(httpx2_client=httpx2.AsyncClient(transport=transport)) + await client.post("https://example.test/upload", content=streamed_body()) + + assert len(seen_extensions) == 1 + assert seen_extensions[0].get("httpware.streaming_body") is True + + +async def test_client_post_with_bytes_content_does_not_mark_extensions() -> None: + seen_extensions: list[dict[str, object]] = [] + + def handler(request: httpx2.Request) -> httpx2.Response: + seen_extensions.append(dict(request.extensions)) + return httpx2.Response(HTTPStatus.OK, request=request) + + transport = httpx2.MockTransport(handler) + client = AsyncClient(httpx2_client=httpx2.AsyncClient(transport=transport)) + await client.post("https://example.test/upload", content=b"hi") + + assert len(seen_extensions) == 1 + assert "httpware.streaming_body" not in seen_extensions[0] + + +async def test_client_post_with_dict_data_does_not_mark_extensions() -> None: + seen_extensions: list[dict[str, object]] = [] + + def handler(request: httpx2.Request) -> httpx2.Response: + seen_extensions.append(dict(request.extensions)) + return httpx2.Response(HTTPStatus.OK, request=request) + + transport = httpx2.MockTransport(handler) + client = AsyncClient(httpx2_client=httpx2.AsyncClient(transport=transport)) + await client.post("https://example.test/upload", data={"k": "v"}) + + assert len(seen_extensions) == 1 + assert "httpware.streaming_body" not in seen_extensions[0] + + +async def test_client_post_with_async_iterable_data_marks_extensions() -> None: + seen_extensions: list[dict[str, object]] = [] + + def handler(request: httpx2.Request) -> httpx2.Response: + seen_extensions.append(dict(request.extensions)) + return httpx2.Response(HTTPStatus.OK, request=request) + + async def streamed_data() -> typing.AsyncIterator[bytes]: + yield b"x" + + transport = httpx2.MockTransport(handler) + client = AsyncClient(httpx2_client=httpx2.AsyncClient(transport=transport)) + await client.post("https://example.test/upload", data=streamed_data()) + + assert len(seen_extensions) == 1 + assert seen_extensions[0].get("httpware.streaming_body") is True + + +async def test_client_post_with_async_iterable_files_marks_extensions() -> None: + seen_extensions: list[dict[str, object]] = [] + + def handler(request: httpx2.Request) -> httpx2.Response: + seen_extensions.append(dict(request.extensions)) + return httpx2.Response(HTTPStatus.OK, request=request) + + async def streamed_files() -> typing.AsyncIterator[bytes]: + yield b"x" + + transport = httpx2.MockTransport(handler) + client = AsyncClient(httpx2_client=httpx2.AsyncClient(transport=transport)) + await client.post("https://example.test/upload", files=streamed_files()) + + assert len(seen_extensions) == 1 + assert seen_extensions[0].get("httpware.streaming_body") is True +``` + +`typing` is already imported at the top of `tests/test_retry.py`; if not, add `import typing` to the top. + +Run: `uv run pytest tests/test_retry.py -v -k "marks_extensions or does_not_mark_extensions"` +Expected: all 5 FAIL — the marker isn't set yet. + +- [ ] **Step 2: Add `_is_streaming_body` helper at module level** + +In `src/httpware/client.py`, immediately after `_raise_on_status_error` (added in Task 1), insert: + +```python +def _is_streaming_body(value: typing.Any) -> bool: + """True if value is an async-iterable that cannot be safely replayed for retry.""" + if value is None: + return False + if isinstance(value, (bytes, bytearray, memoryview, str, dict)): + return False + return hasattr(value, "__aiter__") +``` + +- [ ] **Step 3: Set the streaming-body marker in `_request_with_body`** + +Locate `_request_with_body` (around `client.py:153`). Find the line: +```python +request = self._httpx2_client.build_request(method, url, **kwargs) +return await self.send(request, response_model=response_model) +``` + +Replace with: +```python +request = self._httpx2_client.build_request(method, url, **kwargs) +if _is_streaming_body(content) or _is_streaming_body(data) or _is_streaming_body(files): + request.extensions["httpware.streaming_body"] = True +return await self.send(request, response_model=response_model) +``` + +NOTE: `httpx2.Request.extensions` is a `dict[str, Any]` per httpx convention. If during implementation you find it can be `None` on a freshly-built request, swap to: +```python +extensions_dict = dict(request.extensions or {}) +extensions_dict["httpware.streaming_body"] = True +request.extensions = extensions_dict +``` + +- [ ] **Step 4: Run the new tests** + +```bash +uv run pytest tests/test_retry.py -v -k "marks_extensions or does_not_mark_extensions" +``` +Expected: all 5 PASS. + +- [ ] **Step 5: Lint + full suite** + +```bash +just lint && just test +``` +Expected: clean, 100% coverage. + +- [ ] **Step 6: Stage and commit** + +```bash +git add src/httpware/client.py tests/test_retry.py +git commit -m "feat(client): mark requests with async-iterable bodies via extensions + +Adds a _is_streaming_body helper and a marker step in _request_with_body: +when content / data / files is an async-iterable, set +request.extensions['httpware.streaming_body'] = True before sending. + +Sets up Task 3: Retry will read the marker and refuse to retry streamed-body +requests (they can't replay across attempts). Today the marker has no +consumer; it's harmless metadata." +``` + +--- + +## Task 3: `Retry` refuses streamed-body requests + +**Files:** +- Modify: `src/httpware/middleware/resilience/retry.py` +- Modify: `tests/test_retry.py` + +- [ ] **Step 1: Write failing tests in `tests/test_retry.py`** + +Append: + +```python +async def test_retry_refuses_streamed_body_request() -> None: + """Retry must not replay a request with a streaming body — re-raise with a PEP-678 note.""" + sleeper = _SleepRecorder() + call_count = {"n": 0} + + def handler(request: httpx2.Request) -> httpx2.Response: + call_count["n"] += 1 + return httpx2.Response(HTTPStatus.SERVICE_UNAVAILABLE, request=request) + + async def streamed_body() -> typing.AsyncIterator[bytes]: + yield b"x" + + transport = httpx2.MockTransport(handler) + client = AsyncClient( + httpx2_client=httpx2.AsyncClient(transport=transport), + middleware=[Retry(_sleep=sleeper, base_delay=0.001, max_delay=0.002)], + ) + + with pytest.raises(ServiceUnavailableError) as info: + await client.post("https://example.test/upload", content=streamed_body()) + + assert call_count["n"] == 1 + assert sleeper.calls == [] # no retry attempted + notes = getattr(info.value, "__notes__", []) + assert any("not retrying" in note and "stream" in note for note in notes) + + +async def test_retry_refuses_streamed_body_does_not_consume_budget() -> None: + """When Retry refuses for streaming-body reasons, no budget token is withdrawn.""" + sleeper = _SleepRecorder() + budget = RetryBudget(ttl=10.0, min_retries_per_sec=10.0, percent_can_retry=0.2) + + def handler(request: httpx2.Request) -> httpx2.Response: + return httpx2.Response(HTTPStatus.SERVICE_UNAVAILABLE, request=request) + + async def streamed_body() -> typing.AsyncIterator[bytes]: + yield b"x" + + transport = httpx2.MockTransport(handler) + client = AsyncClient( + httpx2_client=httpx2.AsyncClient(transport=transport), + middleware=[Retry(_sleep=sleeper, budget=budget, base_delay=0.001, max_delay=0.002)], + ) + + with pytest.raises(ServiceUnavailableError): + await client.post("https://example.test/upload", content=streamed_body()) + + # Budget should be untouched: deposits OK (every attempt deposits), but no withdrawals. + # Check via _withdrawn deque emptiness. + assert len(budget._withdrawn) == 0 # noqa: SLF001 — implementation-detail access for invariant +``` + +`Retry`, `RetryBudget`, `_SleepRecorder`, `ServiceUnavailableError`, and `pytest` should already be imported in `tests/test_retry.py`; verify and add any missing imports at the top of the file. + +Run: `uv run pytest tests/test_retry.py -v -k "refuses_streamed_body"` +Expected: FAIL — both attempts will retry (Retry doesn't yet check the marker). + +- [ ] **Step 2: Add the streamed-body check to `Retry.__call__`** + +Read `src/httpware/middleware/resilience/retry.py`. Find the retryable-failure path — specifically, the block just after the `except` clauses and before the `if not self.budget.try_withdraw():` check. + +Insert the refusal block immediately after `# ---- retryable failure path` and BEFORE the `if is_last:` check: + +```python + # ---- retryable failure path + if request.extensions.get("httpware.streaming_body"): + if last_exc is None: # pragma: no cover — invariant from except branch + msg = "Retry: streaming-body refusal reached with no last_exc" + raise AssertionError(msg) + last_exc.add_note( + "httpware: not retrying — request body is a stream that cannot replay across attempts" + ) + raise last_exc + + if is_last: + ... +``` + +The streaming-body check comes FIRST (before is_last + before budget.try_withdraw()) so we don't consume a budget token for a request we won't retry. The PEP-678 note follows the same `add_note` pattern as the max-attempts exhaustion path. + +- [ ] **Step 3: Run the new tests** + +```bash +uv run pytest tests/test_retry.py -v -k "refuses_streamed_body" +``` +Expected: both PASS. + +- [ ] **Step 4: Lint + full suite** + +```bash +just lint && just test +``` +Expected: clean, 100% coverage. + +- [ ] **Step 5: Stage and commit** + +```bash +git add src/httpware/middleware/resilience/retry.py tests/test_retry.py +git commit -m "feat(resilience): Retry refuses requests with streaming bodies + +Closes the deferred-work item 'Retry + streaming bodies'. When the +request was constructed with an async-iterable content/data/files, +_request_with_body marked request.extensions['httpware.streaming_body'] += True. Retry now reads the marker and re-raises the original failure +with a PEP-678 note ('not retrying — request body is a stream that +cannot replay across attempts') instead of retrying with a consumed +iterator. + +Check happens BEFORE budget.try_withdraw() so a refused retry doesn't +consume a budget token." +``` + +--- + +## Task 4: Add `AsyncClient.stream()` context-manager method + +**Files:** +- Modify: `src/httpware/client.py` +- Create: `tests/test_client_stream.py` + +This is the largest task — the method body, its tests, the integration with the helpers from Task 1. + +- [ ] **Step 1: Write failing tests in `tests/test_client_stream.py`** + +Create `tests/test_client_stream.py`: + +```python +"""Tests for AsyncClient.stream() context manager.""" + +import asyncio +import typing +from http import HTTPStatus + +import httpx2 +import pytest + +from httpware import ( + AsyncClient, + ClientStatusError, + NetworkError, + NotFoundError, + ServerStatusError, + ServiceUnavailableError, + TimeoutError as HttpwareTimeoutError, # noqa: A004 + TransportError, +) +from httpware.middleware import Middleware, Next + + +_UNKNOWN_4XX = 418 # I'm a teapot +_UNKNOWN_5XX = 599 +_REDIRECT_3XX = 301 +_NOT_FOUND = 404 +_SERVICE_UNAVAILABLE = 503 + + +def _client(handler: typing.Callable[[httpx2.Request], httpx2.Response]) -> AsyncClient: + transport = httpx2.MockTransport(handler) + return AsyncClient(httpx2_client=httpx2.AsyncClient(transport=transport)) + + +async def test_streams_response_body_successfully() -> None: + def handler(request: httpx2.Request) -> httpx2.Response: + return httpx2.Response(HTTPStatus.OK, request=request, content=b"chunk1chunk2chunk3") + + client = _client(handler) + chunks: list[bytes] = [] + async with client.stream("GET", "https://example.test/x") as response: + assert response.status_code == HTTPStatus.OK + async for chunk in response.aiter_bytes(): + chunks.append(chunk) + assert b"".join(chunks) == b"chunk1chunk2chunk3" + + +async def test_auto_raises_on_4xx_with_body_preread() -> None: + body = b'{"error": "not found"}' + + def handler(request: httpx2.Request) -> httpx2.Response: + return httpx2.Response(_NOT_FOUND, request=request, content=body) + + client = _client(handler) + with pytest.raises(NotFoundError) as info: + async with client.stream("GET", "https://example.test/missing"): + pytest.fail("should have raised before reaching block body") # pragma: no cover + assert info.value.response.status_code == _NOT_FOUND + assert info.value.response.content == body # body was pre-read; accessible + + +async def test_auto_raises_on_5xx_with_body_preread() -> None: + body = b"degraded" + + def handler(request: httpx2.Request) -> httpx2.Response: + return httpx2.Response(_SERVICE_UNAVAILABLE, request=request, content=body) + + client = _client(handler) + with pytest.raises(ServiceUnavailableError) as info: + async with client.stream("GET", "https://example.test/x"): + pytest.fail("unreachable") # pragma: no cover + assert info.value.response.content == body + + +async def test_auto_raises_unknown_4xx_falls_back_to_client_status_error() -> None: + def handler(request: httpx2.Request) -> httpx2.Response: + return httpx2.Response(_UNKNOWN_4XX, request=request) + + client = _client(handler) + with pytest.raises(ClientStatusError) as info: + async with client.stream("GET", "https://example.test/x"): + pytest.fail("unreachable") # pragma: no cover + assert type(info.value) is ClientStatusError + assert info.value.response.status_code == _UNKNOWN_4XX + + +async def test_auto_raises_unknown_5xx_falls_back_to_server_status_error() -> None: + def handler(request: httpx2.Request) -> httpx2.Response: + return httpx2.Response(_UNKNOWN_5XX, request=request) + + client = _client(handler) + with pytest.raises(ServerStatusError) as info: + async with client.stream("GET", "https://example.test/x"): + pytest.fail("unreachable") # pragma: no cover + assert type(info.value) is ServerStatusError + assert info.value.response.status_code == _UNKNOWN_5XX + + +async def test_3xx_does_not_raise() -> None: + def handler(request: httpx2.Request) -> httpx2.Response: + return httpx2.Response(_REDIRECT_3XX, request=request, headers={"location": "/y"}) + + client = _client(handler) + async with client.stream("GET", "https://example.test/x") as response: + assert response.status_code == _REDIRECT_3XX + + +async def test_network_error_during_request_maps_to_network_error() -> None: + def handler(request: httpx2.Request) -> httpx2.Response: # noqa: ARG001 + msg = "connect refused" + raise httpx2.ConnectError(msg) + + client = _client(handler) + with pytest.raises(NetworkError, match="connect refused"): + async with client.stream("GET", "https://example.test/x"): + pytest.fail("unreachable") # pragma: no cover + + +async def test_network_error_during_body_consumption_maps_to_network_error() -> None: + async def streaming_body() -> typing.AsyncIterator[bytes]: + yield b"first chunk" + msg = "read failed mid-stream" + raise httpx2.ReadError(msg) + + def handler(request: httpx2.Request) -> httpx2.Response: + return httpx2.Response(HTTPStatus.OK, request=request, content=streaming_body()) + + client = _client(handler) + with pytest.raises(NetworkError, match="read failed mid-stream"): + async with client.stream("GET", "https://example.test/x") as response: + async for _ in response.aiter_bytes(): + pass + + +async def test_timeout_during_stream_maps_to_httpware_timeout() -> None: + def handler(request: httpx2.Request) -> httpx2.Response: # noqa: ARG001 + msg = "read timeout" + raise httpx2.ReadTimeout(msg) + + client = _client(handler) + with pytest.raises(HttpwareTimeoutError, match="read timeout"): + async with client.stream("GET", "https://example.test/x"): + pytest.fail("unreachable") # pragma: no cover + + +async def test_invalid_url_maps_to_bare_transport_error() -> None: + def handler(request: httpx2.Request) -> httpx2.Response: # noqa: ARG001 + msg = "bad url" + raise httpx2.InvalidURL(msg) + + client = _client(handler) + with pytest.raises(TransportError) as info: + async with client.stream("GET", "https://example.test/x"): + pytest.fail("unreachable") # pragma: no cover + assert not isinstance(info.value, NetworkError) + + +async def test_cancellation_propagates_cleanly() -> None: + async def slow_body() -> typing.AsyncIterator[bytes]: + yield b"first" + await asyncio.sleep(1.0) + yield b"second" # pragma: no cover + + def handler(request: httpx2.Request) -> httpx2.Response: + return httpx2.Response(HTTPStatus.OK, request=request, content=slow_body()) + + client = _client(handler) + + async def consume() -> None: + async with client.stream("GET", "https://example.test/x") as response: + async for _ in response.aiter_bytes(): + pass + + task = asyncio.create_task(consume()) + await asyncio.sleep(0.01) # let body consumption begin + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + + +async def test_user_exception_in_block_propagates_unchanged() -> None: + def handler(request: httpx2.Request) -> httpx2.Response: + return httpx2.Response(HTTPStatus.OK, request=request, content=b"data") + + client = _client(handler) + with pytest.raises(ValueError, match="user explosion"): + async with client.stream("GET", "https://example.test/x"): + msg = "user explosion" + raise ValueError(msg) + + +async def test_bypasses_middleware_chain() -> None: + """stream() must not invoke any middleware in the chain.""" + invocations = {"n": 0} + + class _RecordingMiddleware: + async def __call__(self, request: httpx2.Request, next: Next) -> httpx2.Response: # noqa: A002 + invocations["n"] += 1 + return await next(request) + + def handler(request: httpx2.Request) -> httpx2.Response: + return httpx2.Response(HTTPStatus.OK, request=request, content=b"x") + + transport = httpx2.MockTransport(handler) + middleware: Middleware = _RecordingMiddleware() + client = AsyncClient( + httpx2_client=httpx2.AsyncClient(transport=transport), + middleware=[middleware], + ) + + async with client.stream("GET", "https://example.test/x") as response: + async for _ in response.aiter_bytes(): + pass + + assert invocations["n"] == 0 + + +async def test_forwards_kwargs_to_httpx2() -> None: + seen: list[httpx2.Request] = [] + + def handler(request: httpx2.Request) -> httpx2.Response: + seen.append(request) + return httpx2.Response(HTTPStatus.OK, request=request, content=b"") + + client = _client(handler) + async with client.stream( + "GET", + "https://example.test/x", + params={"q": "value"}, + headers={"X-Custom": "1"}, + cookies={"sid": "abc"}, + ) as response: + async for _ in response.aiter_bytes(): + pass + + request = seen[0] + assert request.url.params["q"] == "value" + assert request.headers["x-custom"] == "1" + assert request.headers["cookie"] == "sid=abc" + + +async def test_stream_with_content_kwarg() -> None: + seen: list[bytes] = [] + + def handler(request: httpx2.Request) -> httpx2.Response: + seen.append(request.content) + return httpx2.Response(HTTPStatus.OK, request=request, content=b"") + + client = _client(handler) + async with client.stream("POST", "https://example.test/upload", content=b"payload") as response: + async for _ in response.aiter_bytes(): + pass + + assert seen[0] == b"payload" + + +async def test_stream_with_async_iterable_content() -> None: + """stream() bypass means async-iterable bodies work without the streaming-body marker mechanism.""" + seen_calls: list[int] = [] + + def handler(request: httpx2.Request) -> httpx2.Response: + seen_calls.append(1) + return httpx2.Response(HTTPStatus.OK, request=request, content=b"") + + async def streamed_body() -> typing.AsyncIterator[bytes]: + yield b"chunk1" + yield b"chunk2" + + client = _client(handler) + async with client.stream("POST", "https://example.test/upload", content=streamed_body()) as response: + async for _ in response.aiter_bytes(): + pass + + assert seen_calls == [1] +``` + +Run: `uv run pytest tests/test_client_stream.py -v` +Expected: all FAIL with `AttributeError: 'AsyncClient' object has no attribute 'stream'`. + +- [ ] **Step 2: Implement `AsyncClient.stream()`** + +Add this method to `AsyncClient` in `src/httpware/client.py`. Place it AFTER the existing `request()` method (the last per-method definition) and BEFORE `__aenter__`: + +```python +@contextlib.asynccontextmanager +async def stream( # noqa: PLR0913 — mirrors httpx2 per-method signatures + self, + method: str, + url: str, + *, + params: typing.Any | None = None, + headers: typing.Any | None = None, + cookies: typing.Any | None = None, + timeout: typing.Any = httpx2.USE_CLIENT_DEFAULT, + extensions: typing.Any | None = None, + json: typing.Any | None = None, + content: typing.Any | None = None, + data: typing.Any | None = None, + files: typing.Any | None = None, +) -> AsyncIterator[httpx2.Response]: + """Stream an HTTP response. Bypasses the middleware chain. + + Yields an httpx2.Response; consume the body via response.aiter_bytes(), + response.aiter_text(), response.aiter_lines(), or response.aiter_raw(). + The body is NOT pre-read for 2xx/3xx (streaming preserved); the response + is closed when the context exits. + + Bypasses the middleware chain (no Retry, no Bulkhead, no user-installed + middleware) for v1 — see planning/specs/2026-06-05-streaming-design.md. + + Auto-raises StatusError subclasses on 4xx/5xx (NotFoundError, + ServiceUnavailableError, etc.) — consistent with client.get()/post()/etc. + On error the response body is pre-read so exc.response.content is + accessible. You lose the streaming property on errors; rare in practice. + + Maps httpx2 exceptions raised during the request OR body consumption to + httpware exceptions via _httpx2_exception_mapper. + """ + kwargs: dict[str, typing.Any] = {} + if params is not None: + kwargs["params"] = params + if headers is not None: + kwargs["headers"] = headers + if cookies is not None: + kwargs["cookies"] = cookies + if timeout is not httpx2.USE_CLIENT_DEFAULT: + kwargs["timeout"] = timeout + if extensions is not None: + kwargs["extensions"] = extensions + if json is not None: + kwargs["json"] = json + if content is not None: + kwargs["content"] = content + if data is not None: + kwargs["data"] = data + if files is not None: + kwargs["files"] = files + + async with _httpx2_exception_mapper(): + async with self._httpx2_client.stream(method, url, **kwargs) as response: + if HTTPStatus.BAD_REQUEST <= response.status_code < 600: # noqa: PLR2004 — 600 is the synthetic upper bound for 5xx + await response.aread() # pre-read body so exc.response.content works + _raise_on_status_error(response) + yield response +``` + +Note: `_raise_on_status_error` actually doesn't need the wrapping `if` check because it's a no-op for non-error status; you could simplify. But the explicit `if` here makes it obvious that `aread()` only runs on errors. Keep both. Alternatively if simpler: + +```python +if HTTPStatus.BAD_REQUEST <= response.status_code < 600: + await response.aread() +_raise_on_status_error(response) # raises iff 4xx/5xx +yield response +``` + +Either form is fine. The explicit version makes the "only pre-read on error" intent obvious; pick that one. + +- [ ] **Step 3: Run the stream tests** + +```bash +uv run pytest tests/test_client_stream.py -v +``` +Expected: all PASS. + +- [ ] **Step 4: Lint + full suite** + +```bash +just lint && just test +``` +Expected: clean, 100% coverage. + +- [ ] **Step 5: Stage and commit** + +```bash +git add src/httpware/client.py tests/test_client_stream.py +git commit -m "feat(client): AsyncClient.stream() context manager + +Adds AsyncClient.stream(method, url, **kwargs) as a +@contextlib.asynccontextmanager method on the client. Mirrors +httpx2.AsyncClient.stream() but auto-raises StatusError subclasses +on 4xx/5xx (consistent with client.get/post/etc.) with body +pre-read so exc.response.content is accessible. + +Bypasses the middleware chain (v1 design decision — revisit if user +feedback warrants). Uses the shared _httpx2_exception_mapper and +_raise_on_status_error helpers extracted in the earlier refactor +commit, so dispatch logic stays in lockstep with _terminal. + +Body consumption errors during 'async for chunk in response.aiter_bytes()' +propagate through the yield and get mapped to httpware exceptions +consistently." +``` + +--- + +## Task 5: Documentation + release notes + +**Files:** +- Modify: `README.md` +- Modify: `docs/index.md` +- Modify: `planning/engineering.md` +- Modify: `planning/deferred-work.md` +- Create: `planning/releases/0.5.0.md` + +- [ ] **Step 1: Add streaming snippet to README.md** + +After the existing "With resilience middleware" subsection and BEFORE the `## Errors` section, insert a new `### Streaming responses` subsection: + +```markdown + +### Streaming responses + +For large responses or server-sent events, stream the body chunk-by-chunk. `stream()` is an async context manager: + +```python +from httpware import AsyncClient + + +async def main() -> None: + async with AsyncClient(base_url="https://api.example.com") as client: + async with client.stream("GET", "/big-file") as response: + async for chunk in response.aiter_bytes(): + process(chunk) +``` + +`stream()` auto-raises `StatusError` subclasses on 4xx/5xx with the response body pre-read, so `exc.response.content` is accessible from the caught exception. + +It does NOT pass through the middleware chain: `Retry`, `Bulkhead`, and any custom middleware are bypassed. (Retry separately refuses to retry any request — stream or non-stream — whose body was an async-iterable, since streams can't replay across attempts.) +``` + +- [ ] **Step 2: Mirror the addition in `docs/index.md`** + +Same content added at the matching position (after the "With resilience middleware" subsection, before the `## Errors` section). Keep the wording verbatim so the README and docs/index.md stay in sync. + +- [ ] **Step 3: Update `planning/engineering.md`** + +In §1 (Project intent), append one sentence to the first paragraph (after the resilience-suite sentence added in the 0.4 docs sync): + +``` + As of 0.5.0, `AsyncClient.stream()` provides a context-manager API for chunked response bodies; it bypasses the middleware chain by design (see planning/specs/2026-06-05-streaming-design.md). +``` + +In §8 (Remaining roadmap), find the Epic 4 entry: +``` +- **Epic 4 — Streaming:** `4-3` `AsyncClient.stream` context manager (forwards to `httpx2.AsyncClient.stream`; no `StreamResponse` type). +``` +Replace with: +``` +- **Epic 4 — Streaming:** SHIPPED in v0.5 (PR #...): `AsyncClient.stream()` context manager + Retry refuses streamed-body requests. See [`planning/specs/2026-06-05-streaming-design.md`](specs/2026-06-05-streaming-design.md) and [`planning/plans/2026-06-05-streaming-plan.md`](plans/2026-06-05-streaming-plan.md). +``` + +(Use the actual PR number once the PR is opened — leave a `#…` placeholder if the PR doesn't exist yet; the implementer fills in during finishing-a-development-branch.) + +- [ ] **Step 4: Close the two deferred-work items** + +Edit `planning/deferred-work.md`. The two items to close: + +1. Under `## Open` → `### Retry + streaming bodies (Epic 4 interaction)`: remove this entire entry. + +2. Under `## Closed by the v0.2 thin-wrapper pivot (2026-06-03)`: the line `- httpx2.StreamError family escape from the transport's except httpx2.HTTPError (mapping logic relocated to AsyncClient's terminal; revisit with Epic 4 streaming work).` — replace the trailing parenthetical with `; closed by 0.5.0 streaming work — exception mapping in _httpx2_exception_mapper covers the StreamError family via httpx2.NetworkError).` + +Then add a NEW section above the "Closed by the v0.2 thin-wrapper pivot" one: + +```markdown +## Closed by the 0.5.0 streaming release (2026-06-05) + +- **`Retry` refuses streamed-body requests.** When `_request_with_body` is called with an async-iterable `content`/`data`/`files`, the request gets `extensions["httpware.streaming_body"] = True`. `Retry.__call__` reads the marker and re-raises with a PEP-678 note on retryable failures instead of retrying with a consumed iterator. Closes the prior Open entry. +- **`httpx2.StreamError` family escape closed.** The new shared `_httpx2_exception_mapper` catches `httpx2.NetworkError` (which is the parent of `ReadError` / `WriteError` / `CloseError`), so stream-specific exceptions raised during body consumption now map to `httpware.NetworkError` consistently. +``` + +- [ ] **Step 5: Create `planning/releases/0.5.0.md`** + +```markdown +# httpware 0.5.0 — Streaming responses + +**0.5.0 is additive. No breaking changes.** Code written against 0.4.0 continues to work unchanged. + +This release closes Epic 4 by adding `AsyncClient.stream()` for chunked response bodies, and closes two longstanding deferred-work items along the way. + +## New features + +- **`AsyncClient.stream(method, url, **kwargs)`** — async context manager that yields an `httpx2.Response` with a non-pre-read body. Consume via `response.aiter_bytes()`, `response.aiter_text()`, `response.aiter_lines()`, or `response.aiter_raw()`. Auto-raises `StatusError` subclasses on 4xx/5xx (with the body pre-read so `exc.response.content` works). Bypasses the middleware chain by design — `Retry`, `Bulkhead`, and user-installed middleware do not see `stream()` calls in v1. +- **`Retry` refuses streamed-body requests.** When you call `client.post(content=async_gen())` (or `data=`, `files=`), the request is marked via `request.extensions["httpware.streaming_body"]`. If `Retry` would otherwise retry on a failure, it re-raises the original exception with a PEP 678 note instead — preventing the "consumed iterator can't replay" footgun. + +## Backwards compatibility + +Subclassing/extensions preserve every existing catch-block: + +- All previously-shipping methods (`get`, `post`, etc.) behave identically. +- The internal refactor that extracted `_httpx2_exception_mapper` from `_terminal` is byte-for-byte equivalent in dispatch behavior. Tests prove this. +- The streaming-body marker (`request.extensions["httpware.streaming_body"]`) only affects requests that genuinely have async-iterable bodies. Existing code passing bytes / dict / files-as-bytes is unaffected. + +## Usage + +```python +from httpware import AsyncClient + + +async def main() -> None: + async with AsyncClient(base_url="https://api.example.com") as client: + async with client.stream("GET", "/big-file") as response: + async for chunk in response.aiter_bytes(): + process(chunk) +``` + +Catch typed status errors on streams the same way as on regular calls: + +```python +from httpware import NotFoundError + +try: + async with client.stream("GET", "/maybe-missing") as response: + ... +except NotFoundError as exc: + body_text = exc.response.text # pre-read; accessible +``` + +## What's still ahead + +- Epic 5 (observability hooks + OTel middleware) is unstarted; logging of retry / bulkhead / stream decisions plumbs through then. +- Whether `stream()` should compose with the middleware chain is deferred to real-user feedback. Adding it later is purely additive (`stream(..., apply_middleware: bool = False)` opt-in). + +## References + +- Spec: [`planning/specs/2026-06-05-streaming-design.md`](../specs/2026-06-05-streaming-design.md) +- Plan: [`planning/plans/2026-06-05-streaming-plan.md`](../plans/2026-06-05-streaming-plan.md) +- Roadmap: [`planning/engineering.md`](../engineering.md) §8 +``` + +- [ ] **Step 6: Lint** + +```bash +just lint +``` +Expected: clean. (eof-fixer + ruff format may normalize the markdown.) + +- [ ] **Step 7: Stage and commit** + +```bash +git add README.md docs/index.md planning/engineering.md planning/deferred-work.md planning/releases/0.5.0.md +git commit -m "docs: 0.5.0 release notes + sync user docs with streaming work + +- README + docs/index.md: add 'Streaming responses' subsection +- planning/engineering.md §1 + §8: mention stream() in project intent; + mark Epic 4 SHIPPED in roadmap +- planning/deferred-work.md: close the 'Retry + streaming bodies' open + item and update the v0.2-pivot StreamError-escape entry; add a new + 'Closed by the 0.5.0 streaming release' section +- planning/releases/0.5.0.md: new release notes" +``` + +--- + +## Task 6: Final verification + push + +**Files:** none modified; verification only. + +- [ ] **Step 1: Full lint** + +```bash +just lint-ci +``` +Expected: clean. + +- [ ] **Step 2: Full test suite** + +```bash +just test +``` +Expected: ALL tests PASS, coverage = 100%. Test count should be 209 (current) + ~5 marker tests + 2 retry-refuse tests + ~16 stream tests = ~232. + +- [ ] **Step 3: Architecture invariants from `CLAUDE.md`** + +```bash +grep -rE 'httpx2\._' src/httpware/ || echo "PASS: no httpx2 private API" +grep -rE 'from __future__ import annotations' src/httpware/ || echo "PASS: no __future__ annotations" +grep -rE '\bprint\(' src/httpware/ || echo "PASS: no print()" +grep -rE 'logging\.(basicConfig|getLogger)\(\)' src/httpware/ || echo "PASS: no global logging" +grep -rE '# (type|mypy): ignore' src/httpware/ || echo "PASS: no type/mypy ignore" +``` +Each should print PASS. + +- [ ] **Step 4: Optional-extras isolation** + +`stream()` is pure stdlib — no new optional deps. +```bash +uv run pytest tests/test_optional_extras_isolation.py -v +``` +Expected: PASS. + +- [ ] **Step 5: mkdocs strict build** + +```bash +uv run --with mkdocs --with mkdocs-material mkdocs build --strict 2>&1 | tail -10 +rm -rf site/ +``` +Expected: 0 warnings. (Previous PR closed all nav-warnings; this one only adds content under `index.md`.) + +- [ ] **Step 6: Push the branch** + +```bash +git push -u origin feat/v0.5-streaming +``` + +DO NOT open the PR yet — leave that to `finishing-a-development-branch`. + +--- + +## Out of scope for this plan (per the spec) + +These items are deliberately deferred. Do NOT do them in this PR: + +- **`stream()` going through the middleware chain.** Stays bypassed for v1. Adding `apply_middleware: bool = False` opt-in later is purely additive. +- **`StreamResponse` wrapper type.** Explicit non-goal from the original story wording. +- **`response_model=` decoding parameter for stream().** Doesn't apply. +- **Bulkhead-during-stream integration.** Bulkhead doesn't see stream() calls. +- **Detection of streaming bodies on manually-constructed `httpx2.Request` objects.** Marker only set in `_request_with_body`; manual constructors accept responsibility. +- **Version bump in `pyproject.toml`.** Tag-driven release; bump not required (see prior pattern: 0.4.0 release notes shipped without a pyproject bump). From be919c21696350864a34b8b9d1e0b56b30b9d779 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Fri, 5 Jun 2026 19:23:18 +0300 Subject: [PATCH 4/9] refactor(client): extract _httpx2_exception_mapper + _raise_on_status_error Pure refactor of _terminal. Two module-level helpers (one @asynccontextmanager for the httpx2 exception dispatch, one function for the 4xx/5xx StatusError raise). _terminal now reads as: enter the mapper, send, raise on status. Sets up Task 4: AsyncClient.stream() will reuse both helpers verbatim instead of duplicating the dispatch logic. Behavior is byte-for-byte identical to today; the existing terminal tests cover it. --- src/httpware/client.py | 48 +++++++++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/src/httpware/client.py b/src/httpware/client.py index 1a355de..f4a98fe 100644 --- a/src/httpware/client.py +++ b/src/httpware/client.py @@ -1,7 +1,8 @@ """AsyncClient — the thin httpx2 wrapper.""" +import contextlib import typing -from collections.abc import Sequence +from collections.abc import AsyncIterator, Sequence from http import HTTPStatus import httpx2 @@ -44,6 +45,32 @@ def _default_pydantic_decoder() -> ResponseDecoder: return PydanticDecoder() +@contextlib.asynccontextmanager +async def _httpx2_exception_mapper() -> AsyncIterator[None]: + """Map httpx2 exceptions to httpware exceptions. Shared by AsyncClient._terminal and stream().""" + try: + yield + except httpx2.TimeoutException as exc: + raise TimeoutError(str(exc)) from exc + except (httpx2.InvalidURL, httpx2.CookieConflict) as exc: + raise TransportError(str(exc)) from exc + except httpx2.NetworkError as exc: + raise NetworkError(str(exc)) from exc + except httpx2.HTTPError as exc: + raise TransportError(str(exc)) from exc + + +def _raise_on_status_error(response: httpx2.Response) -> None: + """Raise the appropriate StatusError subclass for a 4xx/5xx response. No-op for 2xx/3xx.""" + status = response.status_code + if HTTPStatus.BAD_REQUEST <= status < 600: # noqa: PLR2004 — 600 is the synthetic upper bound for 5xx + exc_class = STATUS_TO_EXCEPTION.get( + status, + ClientStatusError if status < HTTPStatus.INTERNAL_SERVER_ERROR else ServerStatusError, + ) + raise exc_class(response) + + class AsyncClient: """Async HTTP client: thin wrapper around httpx2 with typed decoding and middleware.""" @@ -106,26 +133,13 @@ def __init__( # noqa: PLR0913 — wide constructor is the cost of a single-call async def _terminal(self, request: httpx2.Request) -> httpx2.Response: try: - response = await self._httpx2_client.send(request) - except httpx2.TimeoutException as exc: - raise TimeoutError(str(exc)) from exc - except (httpx2.InvalidURL, httpx2.CookieConflict) as exc: - raise TransportError(str(exc)) from exc - except httpx2.NetworkError as exc: - raise NetworkError(str(exc)) from exc - except httpx2.HTTPError as exc: - raise TransportError(str(exc)) from exc + async with _httpx2_exception_mapper(): + response = await self._httpx2_client.send(request) except RuntimeError as exc: if "closed" in str(exc): raise TransportError(str(exc)) from exc raise - status = response.status_code - if HTTPStatus.BAD_REQUEST <= status < 600: # noqa: PLR2004 — 600 is the synthetic upper bound for 5xx - exc_class = STATUS_TO_EXCEPTION.get( - status, - ClientStatusError if status < HTTPStatus.INTERNAL_SERVER_ERROR else ServerStatusError, - ) - raise exc_class(response) + _raise_on_status_error(response) return response @typing.overload From 1555586ddb69c6ea0c9b5656b00efbc31e3d07bd Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Fri, 5 Jun 2026 19:28:59 +0300 Subject: [PATCH 5/9] feat(client): mark requests with async-iterable bodies via extensions Adds a _is_streaming_body helper and a marker step in _request_with_body: when content / data / files is an async-iterable, set request.extensions['httpware.streaming_body'] = True before sending. Sets up Task 3: Retry will read the marker and refuse to retry streamed-body requests (they can't replay across attempts). Today the marker has no consumer; it's harmless metadata. --- src/httpware/client.py | 13 ++++++- tests/test_retry.py | 79 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+), 1 deletion(-) diff --git a/src/httpware/client.py b/src/httpware/client.py index f4a98fe..24c8a1f 100644 --- a/src/httpware/client.py +++ b/src/httpware/client.py @@ -71,6 +71,15 @@ def _raise_on_status_error(response: httpx2.Response) -> None: raise exc_class(response) +def _is_streaming_body(value: typing.Any) -> bool: + """Return True if value is an async-iterable that cannot be safely replayed for retry.""" + if value is None: + return False + if isinstance(value, (bytes, bytearray, memoryview, str, dict)): + return False + return hasattr(value, "__aiter__") + + class AsyncClient: """Async HTTP client: thin wrapper around httpx2 with typed decoding and middleware.""" @@ -164,7 +173,7 @@ def build_request(self, method: str, url: str, **kwargs: typing.Any) -> httpx2.R """Delegate request construction to the wrapped httpx2.AsyncClient.""" return self._httpx2_client.build_request(method, url, **kwargs) - async def _request_with_body( # noqa: PLR0913 — mirrors httpx2 per-method signatures + async def _request_with_body( # noqa: PLR0913, C901 — mirrors httpx2 per-method signatures; kwargs-forwarding complexity is structural self, method: str, url: str, @@ -200,6 +209,8 @@ async def _request_with_body( # noqa: PLR0913 — mirrors httpx2 per-method sig if files is not None: kwargs["files"] = files request = self._httpx2_client.build_request(method, url, **kwargs) + if _is_streaming_body(content) or _is_streaming_body(data) or _is_streaming_body(files): + request.extensions["httpware.streaming_body"] = True return await self.send(request, response_model=response_model) @typing.overload diff --git a/tests/test_retry.py b/tests/test_retry.py index a1a2bb5..4ece1d8 100644 --- a/tests/test_retry.py +++ b/tests/test_retry.py @@ -7,6 +7,7 @@ import asyncio import datetime import email.utils +import typing from collections.abc import Callable from http import HTTPStatus @@ -14,6 +15,7 @@ import pytest from httpware import AsyncClient, NotFoundError, ServiceUnavailableError, TransportError +from httpware.client import _is_streaming_body from httpware.errors import NetworkError, RetryBudgetExhaustedError from httpware.errors import TimeoutError as HttpwareTimeoutError from httpware.middleware.resilience.budget import RetryBudget @@ -441,3 +443,80 @@ async def test_explicit_budget_shared_across_retry_instances() -> None: for _ in range(10): assert shared.try_withdraw() is True assert shared.try_withdraw() is False + + +async def test_client_post_with_async_iterable_content_marks_extensions() -> None: + """Posting with an async-iterable body sets the httpware.streaming_body marker on request.extensions.""" + seen_extensions: list[dict[str, object]] = [] + + def handler(request: httpx2.Request) -> httpx2.Response: + seen_extensions.append(dict(request.extensions)) + return httpx2.Response(HTTPStatus.OK, request=request) + + async def streamed_body() -> typing.AsyncIterator[bytes]: + yield b"chunk1" + yield b"chunk2" + + transport = httpx2.MockTransport(handler) + client = AsyncClient(httpx2_client=httpx2.AsyncClient(transport=transport)) + await client.post("https://example.test/upload", content=streamed_body()) + + assert len(seen_extensions) == 1 + assert seen_extensions[0].get("httpware.streaming_body") is True + + +async def test_client_post_with_bytes_content_does_not_mark_extensions() -> None: + seen_extensions: list[dict[str, object]] = [] + + def handler(request: httpx2.Request) -> httpx2.Response: + seen_extensions.append(dict(request.extensions)) + return httpx2.Response(HTTPStatus.OK, request=request) + + transport = httpx2.MockTransport(handler) + client = AsyncClient(httpx2_client=httpx2.AsyncClient(transport=transport)) + await client.post("https://example.test/upload", content=b"hi") + + assert len(seen_extensions) == 1 + assert "httpware.streaming_body" not in seen_extensions[0] + + +async def test_client_post_with_dict_data_does_not_mark_extensions() -> None: + seen_extensions: list[dict[str, object]] = [] + + def handler(request: httpx2.Request) -> httpx2.Response: + seen_extensions.append(dict(request.extensions)) + return httpx2.Response(HTTPStatus.OK, request=request) + + transport = httpx2.MockTransport(handler) + client = AsyncClient(httpx2_client=httpx2.AsyncClient(transport=transport)) + await client.post("https://example.test/upload", data={"k": "v"}) + + assert len(seen_extensions) == 1 + assert "httpware.streaming_body" not in seen_extensions[0] + + +async def test_client_post_with_async_iterable_data_marks_extensions() -> None: + seen_extensions: list[dict[str, object]] = [] + + def handler(request: httpx2.Request) -> httpx2.Response: + seen_extensions.append(dict(request.extensions)) + return httpx2.Response(HTTPStatus.OK, request=request) + + async def streamed_data() -> typing.AsyncIterator[bytes]: + yield b"x" + + transport = httpx2.MockTransport(handler) + client = AsyncClient(httpx2_client=httpx2.AsyncClient(transport=transport)) + await client.post("https://example.test/upload", data=streamed_data()) + + assert len(seen_extensions) == 1 + assert seen_extensions[0].get("httpware.streaming_body") is True + + +def test_is_streaming_body_true_for_async_iterable_files() -> None: + """_is_streaming_body returns True for an async-iterable, covering the files= path.""" + + async def streamed_files() -> typing.AsyncIterator[bytes]: + yield b"x" # pragma: no cover + + assert _is_streaming_body(streamed_files()) is True From 98681e0fb9abb8fb1de1fbdcc857161eb484d643 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Fri, 5 Jun 2026 19:35:34 +0300 Subject: [PATCH 6/9] feat(resilience): Retry refuses requests with streaming bodies MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the deferred-work item 'Retry + streaming bodies'. When the request was constructed with an async-iterable content/data/files, _request_with_body marked request.extensions['httpware.streaming_body'] = True. Retry now reads the marker and re-raises the original failure with a PEP-678 note ('not retrying — request body is a stream that cannot replay across attempts') instead of retrying with a consumed iterator. Check happens BEFORE budget.try_withdraw() so a refused retry doesn't consume a budget token. Co-Authored-By: Claude Sonnet 4.6 --- src/httpware/middleware/resilience/retry.py | 26 +++- tests/test_retry.py | 130 ++++++++++++++++++++ 2 files changed, 154 insertions(+), 2 deletions(-) diff --git a/src/httpware/middleware/resilience/retry.py b/src/httpware/middleware/resilience/retry.py index 43ef1a2..abb5f64 100644 --- a/src/httpware/middleware/resilience/retry.py +++ b/src/httpware/middleware/resilience/retry.py @@ -90,7 +90,7 @@ def __init__( # noqa: PLR0913 — retry policy has many orthogonal knobs; a dat self.budget = budget if budget is not None else RetryBudget() self._sleep = _sleep - async def __call__(self, request: httpx2.Request, next: Next) -> httpx2.Response: # noqa: A002, C901, PLR0912 — complexity budget: 3 error clauses + idempotency gate + budget gate + Retry-After branch + backoff + async def __call__(self, request: httpx2.Request, next: Next) -> httpx2.Response: # noqa: A002, C901, PLR0912, PLR0915 — complexity budget: 3 error clauses + idempotency gate + streaming-body refusal + budget gate + Retry-After branch + backoff """Process a request through the retry loop. See module docstring.""" method_eligible = request.method.upper() in self.retry_methods last_exc: BaseException | None = None @@ -106,12 +106,21 @@ async def __call__(self, request: httpx2.Request, next: Next) -> httpx2.Response else: return await next(request) except StatusError as exc: - if not method_eligible or exc.response.status_code not in self.retry_status_codes: + retryable_status = exc.response.status_code in self.retry_status_codes + if not method_eligible or not retryable_status: + if retryable_status and request.extensions.get("httpware.streaming_body"): + exc.add_note( + "httpware: not retrying — request body is a stream that cannot replay across attempts" + ) raise last_exc = exc last_response = exc.response except (NetworkError, TimeoutError) as exc: if not method_eligible: + if request.extensions.get("httpware.streaming_body"): + exc.add_note( + "httpware: not retrying — request body is a stream that cannot replay across attempts" + ) raise last_exc = exc last_response = None @@ -119,11 +128,24 @@ async def __call__(self, request: httpx2.Request, next: Next) -> httpx2.Response wrapped = TimeoutError("attempt timed out") wrapped.__cause__ = exc # set now; the retry path (last_exc = wrapped) has no `from` clause if not method_eligible: + if request.extensions.get("httpware.streaming_body"): + wrapped.add_note( + "httpware: not retrying — request body is a stream that cannot replay across attempts" + ) raise wrapped from exc last_exc = wrapped last_response = None # ---- retryable failure path + if request.extensions.get("httpware.streaming_body"): + if last_exc is None: # pragma: no cover — invariant from except branch + msg = "Retry: streaming-body refusal reached with no last_exc" + raise AssertionError(msg) + last_exc.add_note( + "httpware: not retrying — request body is a stream that cannot replay across attempts" + ) + raise last_exc + if is_last: if last_exc is None: # pragma: no cover — structural invariant from except branch msg = "Retry: last_exc unset on final attempt — unreachable" diff --git a/tests/test_retry.py b/tests/test_retry.py index 4ece1d8..e160a36 100644 --- a/tests/test_retry.py +++ b/tests/test_retry.py @@ -520,3 +520,133 @@ async def streamed_files() -> typing.AsyncIterator[bytes]: yield b"x" # pragma: no cover assert _is_streaming_body(streamed_files()) is True + + +async def test_retry_refuses_streamed_body_request() -> None: + """Retry must not replay a request with a streaming body — re-raise with a PEP-678 note.""" + sleeper = _SleepRecorder() + call_count = {"n": 0} + + def handler(request: httpx2.Request) -> httpx2.Response: + call_count["n"] += 1 + return httpx2.Response(HTTPStatus.SERVICE_UNAVAILABLE, request=request) + + async def streamed_body() -> typing.AsyncIterator[bytes]: + yield b"x" + + transport = httpx2.MockTransport(handler) + client = AsyncClient( + httpx2_client=httpx2.AsyncClient(transport=transport), + middleware=[Retry(_sleep=sleeper, base_delay=0.001, max_delay=0.002)], + ) + + with pytest.raises(ServiceUnavailableError) as info: + await client.post("https://example.test/upload", content=streamed_body()) + + assert call_count["n"] == 1 + assert sleeper.calls == [] # no retry attempted + notes = getattr(info.value, "__notes__", []) + assert any("not retrying" in note and "stream" in note for note in notes) + + +async def test_retry_refuses_streamed_body_does_not_consume_budget() -> None: + """When Retry refuses for streaming-body reasons, no budget token is withdrawn.""" + sleeper = _SleepRecorder() + budget = RetryBudget(ttl=10.0, min_retries_per_sec=10.0, percent_can_retry=0.2) + + def handler(request: httpx2.Request) -> httpx2.Response: + return httpx2.Response(HTTPStatus.SERVICE_UNAVAILABLE, request=request) + + async def streamed_body() -> typing.AsyncIterator[bytes]: + yield b"x" + + transport = httpx2.MockTransport(handler) + client = AsyncClient( + httpx2_client=httpx2.AsyncClient(transport=transport), + middleware=[Retry(_sleep=sleeper, budget=budget, base_delay=0.001, max_delay=0.002)], + ) + + with pytest.raises(ServiceUnavailableError): + await client.post("https://example.test/upload", content=streamed_body()) + + # Budget should be untouched: deposits OK (every attempt deposits), but no withdrawals. + # Check via _withdrawn deque emptiness. + assert len(budget._withdrawn) == 0 # noqa: SLF001 — implementation-detail access for invariant + + +async def test_retry_refuses_streamed_body_network_error_non_idempotent() -> None: + """Streaming POST that hits a NetworkError gets the PEP-678 note.""" + sleeper = _SleepRecorder() + + def handler(request: httpx2.Request) -> httpx2.Response: # noqa: ARG001 + msg = "transient" + raise httpx2.ConnectError(msg) + + async def streamed_body() -> typing.AsyncIterator[bytes]: + yield b"x" + + transport = httpx2.MockTransport(handler) + client = AsyncClient( + httpx2_client=httpx2.AsyncClient(transport=transport), + middleware=[Retry(_sleep=sleeper, base_delay=0.001, max_delay=0.002)], + ) + + with pytest.raises(NetworkError) as info: + await client.post("https://example.test/upload", content=streamed_body()) + + assert sleeper.calls == [] # no retry attempted + notes = getattr(info.value, "__notes__", []) + assert any("not retrying" in note and "stream" in note for note in notes) + + +async def test_retry_refuses_streamed_body_attempt_timeout_non_idempotent() -> None: + """Streaming POST that times out per attempt_timeout gets the PEP-678 note.""" + sleeper = _SleepRecorder() + + async def slow_handler(request: httpx2.Request) -> httpx2.Response: # noqa: ARG001 + await asyncio.sleep(1.0) + msg = "should not reach" # pragma: no cover + raise AssertionError(msg) # pragma: no cover + + async def streamed_body() -> typing.AsyncIterator[bytes]: + yield b"x" + + transport = httpx2.MockTransport(slow_handler) + client = AsyncClient( + httpx2_client=httpx2.AsyncClient(transport=transport), + middleware=[Retry(_sleep=sleeper, attempt_timeout=0.05, base_delay=0.001, max_delay=0.002)], + ) + + with pytest.raises(HttpwareTimeoutError) as info: + await client.post("https://example.test/upload", content=streamed_body()) + + assert sleeper.calls == [] # no retry attempted + notes = getattr(info.value, "__notes__", []) + assert any("not retrying" in note and "stream" in note for note in notes) + + +async def test_retry_refuses_streamed_body_idempotent_method() -> None: + """Streaming GET that hits a retryable status gets the PEP-678 note instead of retrying.""" + sleeper = _SleepRecorder() + call_count = {"n": 0} + + def handler(request: httpx2.Request) -> httpx2.Response: + call_count["n"] += 1 + return httpx2.Response(HTTPStatus.SERVICE_UNAVAILABLE, request=request) + + async def streamed_body() -> typing.AsyncIterator[bytes]: + yield b"x" + + transport = httpx2.MockTransport(handler) + client = AsyncClient( + httpx2_client=httpx2.AsyncClient(transport=transport), + middleware=[Retry(_sleep=sleeper, base_delay=0.001, max_delay=0.002)], + ) + + with pytest.raises(ServiceUnavailableError) as info: + await client.put("https://example.test/data", content=streamed_body()) + + assert call_count["n"] == 1 + assert sleeper.calls == [] # no retry attempted + notes = getattr(info.value, "__notes__", []) + assert any("not retrying" in note and "stream" in note for note in notes) From ea5ec1dfdb7035297c2a970f93abddf206dba188 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Fri, 5 Jun 2026 19:44:40 +0300 Subject: [PATCH 7/9] feat(client): AsyncClient.stream() context manager MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds AsyncClient.stream(method, url, **kwargs) as a @contextlib.asynccontextmanager method on the client. Mirrors httpx2.AsyncClient.stream() but auto-raises StatusError subclasses on 4xx/5xx (consistent with client.get/post/etc.) with body pre-read so exc.response.content is accessible. Bypasses the middleware chain (v1 design decision — revisit if user feedback warrants). Uses the shared _httpx2_exception_mapper and _raise_on_status_error helpers extracted in the earlier refactor commit, so dispatch logic stays in lockstep with _terminal. Body consumption errors during 'async for chunk in response.aiter_bytes()' propagate through the yield and get mapped to httpware exceptions consistently. Co-Authored-By: Claude Sonnet 4.6 --- src/httpware/client.py | 60 +++++++ tests/test_client_stream.py | 337 ++++++++++++++++++++++++++++++++++++ 2 files changed, 397 insertions(+) create mode 100644 tests/test_client_stream.py diff --git a/src/httpware/client.py b/src/httpware/client.py index 24c8a1f..2e7069a 100644 --- a/src/httpware/client.py +++ b/src/httpware/client.py @@ -688,6 +688,66 @@ async def request( # noqa: PLR0913 — mirrors httpx2 per-method signatures response_model=response_model, ) + @contextlib.asynccontextmanager + async def stream( # noqa: PLR0913, C901 — mirrors httpx2 per-method signatures; kwargs-forwarding complexity is structural + self, + method: str, + url: str, + *, + params: typing.Any | None = None, + headers: typing.Any | None = None, + cookies: typing.Any | None = None, + timeout: typing.Any = httpx2.USE_CLIENT_DEFAULT, + extensions: typing.Any | None = None, + json: typing.Any | None = None, + content: typing.Any | None = None, + data: typing.Any | None = None, + files: typing.Any | None = None, + ) -> AsyncIterator[httpx2.Response]: + """Stream an HTTP response. Bypasses the middleware chain. + + Yields an httpx2.Response; consume the body via response.aiter_bytes(), + response.aiter_text(), response.aiter_lines(), or response.aiter_raw(). + The body is NOT pre-read for 2xx/3xx (streaming preserved); the response + is closed when the context exits. + + Bypasses the middleware chain (no Retry, no Bulkhead, no user-installed + middleware) for v1 — see planning/specs/2026-06-05-streaming-design.md. + + Auto-raises StatusError subclasses on 4xx/5xx (NotFoundError, + ServiceUnavailableError, etc.) — consistent with client.get()/post()/etc. + On error the response body is pre-read so exc.response.content is + accessible. You lose the streaming property on errors; rare in practice. + + Maps httpx2 exceptions raised during the request OR body consumption to + httpware exceptions via _httpx2_exception_mapper. + """ + kwargs: dict[str, typing.Any] = {} + if params is not None: + kwargs["params"] = params + if headers is not None: + kwargs["headers"] = headers + if cookies is not None: + kwargs["cookies"] = cookies + if timeout is not httpx2.USE_CLIENT_DEFAULT: + kwargs["timeout"] = timeout + if extensions is not None: + kwargs["extensions"] = extensions + if json is not None: + kwargs["json"] = json + if content is not None: + kwargs["content"] = content + if data is not None: + kwargs["data"] = data + if files is not None: + kwargs["files"] = files + + async with _httpx2_exception_mapper(), self._httpx2_client.stream(method, url, **kwargs) as response: + if HTTPStatus.BAD_REQUEST <= response.status_code < 600: # noqa: PLR2004 — 600 is the synthetic upper bound for 5xx + await response.aread() # pre-read body so exc.response.content works + _raise_on_status_error(response) + yield response + async def __aenter__(self) -> typing.Self: """Enter the async context manager; return self.""" return self diff --git a/tests/test_client_stream.py b/tests/test_client_stream.py new file mode 100644 index 0000000..9543f48 --- /dev/null +++ b/tests/test_client_stream.py @@ -0,0 +1,337 @@ +"""Tests for AsyncClient.stream() context manager.""" + +import asyncio +import typing +from http import HTTPStatus + +import httpx2 +import pytest + +from httpware import ( + AsyncClient, + ClientStatusError, + NetworkError, + NotFoundError, + ServerStatusError, + ServiceUnavailableError, + TransportError, +) +from httpware import ( + TimeoutError as HttpwareTimeoutError, +) +from httpware.middleware import Middleware, Next + + +_UNKNOWN_4XX = 418 # I'm a teapot +_UNKNOWN_5XX = 599 +_REDIRECT_3XX = 301 +_NOT_FOUND = 404 +_SERVICE_UNAVAILABLE = 503 + + +def _client(handler: typing.Callable[[httpx2.Request], httpx2.Response]) -> AsyncClient: + transport = httpx2.MockTransport(handler) + return AsyncClient(httpx2_client=httpx2.AsyncClient(transport=transport)) + + +async def test_streams_response_body_successfully() -> None: + def handler(request: httpx2.Request) -> httpx2.Response: + return httpx2.Response(HTTPStatus.OK, request=request, content=b"chunk1chunk2chunk3") + + client = _client(handler) + async with client.stream("GET", "https://example.test/x") as response: + assert response.status_code == HTTPStatus.OK + chunks = [chunk async for chunk in response.aiter_bytes()] + assert b"".join(chunks) == b"chunk1chunk2chunk3" + + +async def test_auto_raises_on_4xx_with_body_preread() -> None: + body = b'{"error": "not found"}' + + def handler(request: httpx2.Request) -> httpx2.Response: + return httpx2.Response(_NOT_FOUND, request=request, content=body) + + client = _client(handler) + with pytest.raises(NotFoundError) as info: + async with client.stream("GET", "https://example.test/missing"): + pytest.fail("should have raised before reaching block body") # pragma: no cover + assert info.value.response.status_code == _NOT_FOUND + assert info.value.response.content == body # body was pre-read; accessible + + +async def test_auto_raises_on_5xx_with_body_preread() -> None: + body = b"degraded" + + def handler(request: httpx2.Request) -> httpx2.Response: + return httpx2.Response(_SERVICE_UNAVAILABLE, request=request, content=body) + + client = _client(handler) + with pytest.raises(ServiceUnavailableError) as info: + async with client.stream("GET", "https://example.test/x"): + pytest.fail("unreachable") # pragma: no cover + assert info.value.response.content == body + + +async def test_auto_raises_unknown_4xx_falls_back_to_client_status_error() -> None: + def handler(request: httpx2.Request) -> httpx2.Response: + return httpx2.Response(_UNKNOWN_4XX, request=request) + + client = _client(handler) + with pytest.raises(ClientStatusError) as info: + async with client.stream("GET", "https://example.test/x"): + pytest.fail("unreachable") # pragma: no cover + assert type(info.value) is ClientStatusError + assert info.value.response.status_code == _UNKNOWN_4XX + + +async def test_auto_raises_unknown_5xx_falls_back_to_server_status_error() -> None: + def handler(request: httpx2.Request) -> httpx2.Response: + return httpx2.Response(_UNKNOWN_5XX, request=request) + + client = _client(handler) + with pytest.raises(ServerStatusError) as info: + async with client.stream("GET", "https://example.test/x"): + pytest.fail("unreachable") # pragma: no cover + assert type(info.value) is ServerStatusError + assert info.value.response.status_code == _UNKNOWN_5XX + + +async def test_3xx_does_not_raise() -> None: + def handler(request: httpx2.Request) -> httpx2.Response: + return httpx2.Response(_REDIRECT_3XX, request=request, headers={"location": "/y"}) + + client = _client(handler) + async with client.stream("GET", "https://example.test/x") as response: + assert response.status_code == _REDIRECT_3XX + + +async def test_network_error_during_request_maps_to_network_error() -> None: + def handler(request: httpx2.Request) -> httpx2.Response: # noqa: ARG001 + msg = "connect refused" + raise httpx2.ConnectError(msg) + + client = _client(handler) + with pytest.raises(NetworkError, match="connect refused"): + async with client.stream("GET", "https://example.test/x"): + pytest.fail("unreachable") # pragma: no cover + + +async def test_network_error_during_body_consumption_maps_to_network_error() -> None: + async def streaming_body() -> typing.AsyncIterator[bytes]: + yield b"first chunk" + msg = "read failed mid-stream" + raise httpx2.ReadError(msg) + + def handler(request: httpx2.Request) -> httpx2.Response: + return httpx2.Response(HTTPStatus.OK, request=request, content=streaming_body()) + + client = _client(handler) + + async def consume() -> None: + async with client.stream("GET", "https://example.test/x") as response: + async for _ in response.aiter_bytes(): + pass + + with pytest.raises(NetworkError, match="read failed mid-stream"): + await consume() + + +async def test_timeout_during_stream_maps_to_httpware_timeout() -> None: + def handler(request: httpx2.Request) -> httpx2.Response: # noqa: ARG001 + msg = "read timeout" + raise httpx2.ReadTimeout(msg) + + client = _client(handler) + with pytest.raises(HttpwareTimeoutError, match="read timeout"): + async with client.stream("GET", "https://example.test/x"): + pytest.fail("unreachable") # pragma: no cover + + +async def test_invalid_url_maps_to_bare_transport_error() -> None: + def handler(request: httpx2.Request) -> httpx2.Response: # noqa: ARG001 + msg = "bad url" + raise httpx2.InvalidURL(msg) + + client = _client(handler) + with pytest.raises(TransportError) as info: + async with client.stream("GET", "https://example.test/x"): + pytest.fail("unreachable") # pragma: no cover + assert not isinstance(info.value, NetworkError) + + +async def test_cancellation_propagates_cleanly() -> None: + async def slow_body() -> typing.AsyncIterator[bytes]: + yield b"first" + await asyncio.sleep(1.0) + yield b"second" # pragma: no cover + + def handler(request: httpx2.Request) -> httpx2.Response: + return httpx2.Response(HTTPStatus.OK, request=request, content=slow_body()) + + client = _client(handler) + + async def consume() -> None: + async with client.stream("GET", "https://example.test/x") as response: + async for _ in response.aiter_bytes(): + pass + + task = asyncio.create_task(consume()) + await asyncio.sleep(0.01) # let body consumption begin + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + + +async def test_user_exception_in_block_propagates_unchanged() -> None: + def handler(request: httpx2.Request) -> httpx2.Response: + return httpx2.Response(HTTPStatus.OK, request=request, content=b"data") + + client = _client(handler) + + async def trigger() -> None: + async with client.stream("GET", "https://example.test/x"): + msg = "user explosion" + raise ValueError(msg) + + with pytest.raises(ValueError, match="user explosion"): + await trigger() + + +async def test_bypasses_middleware_chain() -> None: + """stream() must not invoke any middleware in the chain.""" + invocations = {"n": 0} + + class _RecordingMiddleware: + async def __call__(self, request: httpx2.Request, next: Next) -> httpx2.Response: # noqa: A002 # pragma: no cover + invocations["n"] += 1 + return await next(request) + + def handler(request: httpx2.Request) -> httpx2.Response: + return httpx2.Response(HTTPStatus.OK, request=request, content=b"x") + + transport = httpx2.MockTransport(handler) + middleware: Middleware = _RecordingMiddleware() + client = AsyncClient( + httpx2_client=httpx2.AsyncClient(transport=transport), + middleware=[middleware], + ) + + async with client.stream("GET", "https://example.test/x") as response: + async for _ in response.aiter_bytes(): + pass + + assert invocations["n"] == 0 + + +async def test_forwards_kwargs_to_httpx2() -> None: + seen: list[httpx2.Request] = [] + + def handler(request: httpx2.Request) -> httpx2.Response: + seen.append(request) + return httpx2.Response(HTTPStatus.OK, request=request, content=b"") + + client = _client(handler) + async with client.stream( + "GET", + "https://example.test/x", + params={"q": "value"}, + headers={"X-Custom": "1"}, + cookies={"sid": "abc"}, + ) as response: + _ = [chunk async for chunk in response.aiter_bytes()] + + request = seen[0] + assert request.url.params["q"] == "value" + assert request.headers["x-custom"] == "1" + assert request.headers["cookie"] == "sid=abc" + + +async def test_stream_with_content_kwarg() -> None: + seen: list[bytes] = [] + + def handler(request: httpx2.Request) -> httpx2.Response: + seen.append(request.content) + return httpx2.Response(HTTPStatus.OK, request=request, content=b"") + + client = _client(handler) + async with client.stream("POST", "https://example.test/upload", content=b"payload") as response: + _ = [chunk async for chunk in response.aiter_bytes()] + + assert seen[0] == b"payload" + + +async def test_stream_with_async_iterable_content() -> None: + """stream() bypass means async-iterable bodies work without the streaming-body marker mechanism.""" + seen_calls: list[int] = [] + + def handler(request: httpx2.Request) -> httpx2.Response: + seen_calls.append(1) + return httpx2.Response(HTTPStatus.OK, request=request, content=b"") + + async def streamed_body() -> typing.AsyncIterator[bytes]: + yield b"chunk1" + yield b"chunk2" + + client = _client(handler) + async with client.stream("POST", "https://example.test/upload", content=streamed_body()) as response: + _ = [chunk async for chunk in response.aiter_bytes()] + + assert seen_calls == [1] + + +async def test_stream_with_timeout_kwarg() -> None: + def handler(request: httpx2.Request) -> httpx2.Response: + return httpx2.Response(HTTPStatus.OK, request=request, content=b"ok") + + client = _client(handler) + async with client.stream("GET", "https://example.test/x", timeout=5.0) as response: + _ = [chunk async for chunk in response.aiter_bytes()] + assert response.status_code == HTTPStatus.OK + + +async def test_stream_with_json_kwarg() -> None: + seen: list[bytes] = [] + + def handler(request: httpx2.Request) -> httpx2.Response: + seen.append(request.content) + return httpx2.Response(HTTPStatus.OK, request=request, content=b"ok") + + client = _client(handler) + async with client.stream("POST", "https://example.test/x", json={"key": "value"}) as response: + _ = [chunk async for chunk in response.aiter_bytes()] + assert b"key" in seen[0] + + +async def test_stream_with_data_and_extensions_kwargs() -> None: + seen: list[httpx2.Request] = [] + + def handler(request: httpx2.Request) -> httpx2.Response: + seen.append(request) + return httpx2.Response(HTTPStatus.OK, request=request, content=b"ok") + + client = _client(handler) + async with client.stream( + "POST", + "https://example.test/x", + data={"field": "val"}, + extensions={"timeout": {"connect": 5}}, + ) as response: + _ = [chunk async for chunk in response.aiter_bytes()] + assert seen[0].headers["content-type"].startswith("application/x-www-form-urlencoded") + + +async def test_stream_with_files_kwarg() -> None: + seen: list[httpx2.Request] = [] + + def handler(request: httpx2.Request) -> httpx2.Response: + seen.append(request) + return httpx2.Response(HTTPStatus.OK, request=request, content=b"ok") + + client = _client(handler) + async with client.stream( + "POST", + "https://example.test/x", + files={"upload": ("hello.txt", b"hello", "text/plain")}, + ) as response: + _ = [chunk async for chunk in response.aiter_bytes()] + assert "multipart/form-data" in seen[0].headers["content-type"] From 4c18956cdb089703791f2164f46ea233c18e0bfb Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Fri, 5 Jun 2026 19:50:21 +0300 Subject: [PATCH 8/9] docs: 0.5.0 release notes + sync user docs with streaming work MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - README + docs/index.md: add 'Streaming responses' subsection - planning/engineering.md §1 + §8: mention stream() in project intent; mark Epic 4 SHIPPED in roadmap - planning/deferred-work.md: close the 'Retry + streaming bodies' open item and update the v0.2-pivot StreamError-escape entry; add a new 'Closed by the 0.5.0 streaming release' section - planning/releases/0.5.0.md: new release notes --- README.md | 19 ++++++++++++++ docs/index.md | 19 ++++++++++++++ planning/deferred-work.md | 11 ++++---- planning/engineering.md | 4 +-- planning/releases/0.5.0.md | 54 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 100 insertions(+), 7 deletions(-) create mode 100644 planning/releases/0.5.0.md diff --git a/README.md b/README.md index 5df1d34..3fae5d6 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,25 @@ async def main() -> None: user = await client.get("/users/1", response_model=User) ``` +### Streaming responses + +For large responses or server-sent events, stream the body chunk-by-chunk. `stream()` is an async context manager: + +```python +from httpware import AsyncClient + + +async def main() -> None: + async with AsyncClient(base_url="https://api.example.com") as client: + async with client.stream("GET", "/big-file") as response: + async for chunk in response.aiter_bytes(): + process(chunk) +``` + +`stream()` auto-raises `StatusError` subclasses on 4xx/5xx with the response body pre-read, so `exc.response.content` is accessible from the caught exception. + +It does NOT pass through the middleware chain: `Retry`, `Bulkhead`, and any custom middleware are bypassed. (Retry separately refuses to retry any request — stream or non-stream — whose body was an async-iterable, since streams can't replay across attempts.) + ## Errors All 4xx/5xx responses raise typed exceptions automatically: `NotFoundError`, `ServiceUnavailableError`, `RateLimitedError`, etc. — all subclasses of `httpware.StatusError`. Transport-layer transient failures raise `NetworkError`; the resilience middleware raise `RetryBudgetExhaustedError` and `BulkheadFullError`. Everything inherits `httpware.ClientError`. diff --git a/docs/index.md b/docs/index.md index 70f0e0f..925c381 100644 --- a/docs/index.md +++ b/docs/index.md @@ -59,6 +59,25 @@ async def main() -> None: user = await client.get("/users/1", response_model=User) ``` +### Streaming responses + +For large responses or server-sent events, stream the body chunk-by-chunk. `stream()` is an async context manager: + +```python +from httpware import AsyncClient + + +async def main() -> None: + async with AsyncClient(base_url="https://api.example.com") as client: + async with client.stream("GET", "/big-file") as response: + async for chunk in response.aiter_bytes(): + process(chunk) +``` + +`stream()` auto-raises `StatusError` subclasses on 4xx/5xx with the response body pre-read, so `exc.response.content` is accessible from the caught exception. + +It does NOT pass through the middleware chain: `Retry`, `Bulkhead`, and any custom middleware are bypassed. (Retry separately refuses to retry any request — stream or non-stream — whose body was an async-iterable, since streams can't replay across attempts.) + ## Errors All 4xx/5xx responses raise typed exceptions automatically: `NotFoundError`, `ServiceUnavailableError`, `RateLimitedError`, etc. — all subclasses of `httpware.StatusError`. Transport-layer transient failures raise `NetworkError`; the resilience middleware raise `RetryBudgetExhaustedError` and `BulkheadFullError`. Everything inherits `httpware.ClientError`. diff --git a/planning/deferred-work.md b/planning/deferred-work.md index 240ac79..93c1806 100644 --- a/planning/deferred-work.md +++ b/planning/deferred-work.md @@ -4,10 +4,6 @@ Items raised in reviews that are real but not actionable now. ## Open -### Retry + streaming bodies (Epic 4 interaction) - -- **`Retry` re-invokes `next(request)` with the same `httpx2.Request` on each attempt.** Safe for in-memory bytes/JSON bodies; unsafe for streaming/async-iterable bodies (consumed iterator can't replay). When Epic 4 ships `AsyncClient.stream` (`4-3`), Retry needs to refuse to retry streamed-body requests (or document that callers supply a body factory). Spec: `planning/specs/2026-06-05-retry-and-retry-budget-design.md` §"Open questions". - ### Decoder-side - **`_get_adapter` `lru_cache` is module-global, not per-decoder instance** — keyed by `model` only; two `PydanticDecoder()` instances with different configurations (none today) would share adapters, and the cache survives across tests unless explicitly cleared. Revisit if/when a configurable `PydanticDecoder(mode=..., strict=...)` lands. (`src/httpware/decoders/pydantic.py:12-14`) @@ -19,13 +15,18 @@ PR #21 (`feat/v0.3-pydantic-optional`) shipped 0.3.0 with pydantic moved to `[pr - **`pydantic` import not guarded the way `msgspec` is** — closed. `decoders/pydantic.py` now guards via `import_checker.is_pydantic_installed`; `PydanticDecoder.__init__` raises `ImportError` with the install hint; `AsyncClient(decoder=None)` fail-fast in `_default_pydantic_decoder()`. - **Empty/malformed payload tests** — closed. `tests/test_decoders_pydantic.py::test_malformed_payload_raises_validation_error` is a 7-case parametrized test pinning current pydantic-core behavior for `b""`, `b"null"`, `b"{}"`, malformed JSON, and invalid UTF-8. +## Closed by the 0.5.0 streaming release (2026-06-05) + +- **`Retry` refuses streamed-body requests.** When `_request_with_body` is called with an async-iterable `content`/`data`/`files`, the request gets `extensions["httpware.streaming_body"] = True`. `Retry.__call__` reads the marker and re-raises with a PEP-678 note on retryable failures instead of retrying with a consumed iterator. Closes the prior Open entry. +- **`httpx2.StreamError` family escape closed.** The new shared `_httpx2_exception_mapper` catches `httpx2.NetworkError` (which is the parent of `ReadError` / `WriteError` / `CloseError`), so stream-specific exceptions raised during body consumption now map to `httpware.NetworkError` consistently. + ## Closed by the v0.2 thin-wrapper pivot (2026-06-03) The pivot retired Request/Response/Httpx2Transport/RecordedTransport. The following deferred items are no longer applicable because their host code has been removed or because the responsibility shifted to `httpx2`: - `extensions=dict(request.extensions)` opaque forwarding (host module removed). - Unbounded error body size on `StatusError.body` (the `body` field no longer exists; callers reach into `exc.response.content` themselves). -- `httpx2.StreamError` family escape from the transport's `except httpx2.HTTPError` (mapping logic relocated to AsyncClient's terminal; revisit with Epic 4 streaming work). +- `httpx2.StreamError` family escape from the transport's `except httpx2.HTTPError` (mapping logic relocated to AsyncClient's terminal; closed by 0.5.0 streaming work — exception mapping in _httpx2_exception_mapper covers the StreamError family via httpx2.NetworkError). - Header CRLF / log-injection at the transport seam (host module removed; httpx2 validates). - Userinfo on `StatusError.request_url` raw field (the field no longer exists; `__repr__` and summary still sanitize). - Concurrent `aclose()` ↔ `__call__` races on `Httpx2Transport` (host class removed; lifecycle is `httpx2`'s concern). diff --git a/planning/engineering.md b/planning/engineering.md index 444da81..cb4fe79 100644 --- a/planning/engineering.md +++ b/planning/engineering.md @@ -4,7 +4,7 @@ This doc is the single distilled reference for `httpware` design rationale, prot ## 1. Project intent -`httpware` is a thin opinionated wrapper around `httpx2`. It re-exports `httpx2.Request` and `httpx2.Response` as the public request/response surface and adds three things on top: typed response decoding (via a `ResponseDecoder` protocol; pydantic and msgspec are both opt-in extras as of 0.3.0), a middleware chain composed at client construction, and a status-keyed exception tree raised automatically on 4xx and 5xx. `AsyncClient(decoder=None)` defaults to constructing a `PydanticDecoder` and so requires the `pydantic` extra; callers can supply an explicit `decoder=` argument to escape the default. As of 0.4.0, the package ships a small resilience suite under `httpware.middleware.resilience` — a `Retry` middleware with a Finagle-style `RetryBudget`, plus a `Bulkhead` concurrency limiter — composed via the standard middleware chain. +`httpware` is a thin opinionated wrapper around `httpx2`. It re-exports `httpx2.Request` and `httpx2.Response` as the public request/response surface and adds three things on top: typed response decoding (via a `ResponseDecoder` protocol; pydantic and msgspec are both opt-in extras as of 0.3.0), a middleware chain composed at client construction, and a status-keyed exception tree raised automatically on 4xx and 5xx. `AsyncClient(decoder=None)` defaults to constructing a `PydanticDecoder` and so requires the `pydantic` extra; callers can supply an explicit `decoder=` argument to escape the default. As of 0.4.0, the package ships a small resilience suite under `httpware.middleware.resilience` — a `Retry` middleware with a Finagle-style `RetryBudget`, plus a `Bulkhead` concurrency limiter — composed via the standard middleware chain. As of 0.5.0, `AsyncClient.stream()` provides a context-manager API for chunked response bodies; it bypasses the middleware chain by design (see planning/specs/2026-06-05-streaming-design.md). The 0.1.0 release attempted to own a full abstraction over the underlying HTTP client. v0.2 walks that back: `httpx2` is part of the public surface. @@ -132,7 +132,7 @@ Post-pivot, the roadmap has three categories. Topic slugs in `planning/specs/` a - **Shipped in v0.4 slice 1:** `Retry` middleware + Finagle-style `RetryBudget` token bucket + `attempt_timeout=` parameter (folded-in 3-1). See [`planning/specs/2026-06-05-retry-and-retry-budget-design.md`](specs/2026-06-05-retry-and-retry-budget-design.md) and [`planning/plans/2026-06-05-retry-and-retry-budget-plan.md`](plans/2026-06-05-retry-and-retry-budget-plan.md). - **Shipped in v0.4 slice 2:** `Bulkhead` middleware (concurrency limiter via `asyncio.Semaphore` with bounded acquire wait). See [`planning/specs/2026-06-05-bulkhead-design.md`](specs/2026-06-05-bulkhead-design.md) and [`planning/plans/2026-06-05-bulkhead-plan.md`](plans/2026-06-05-bulkhead-plan.md). - **Remaining:** `3-6` extension-slot docs. -- **Epic 4 — Streaming:** `4-3` `AsyncClient.stream` context manager (forwards to `httpx2.AsyncClient.stream`; no `StreamResponse` type). +- **Epic 4 — Streaming:** SHIPPED in v0.5 (PR #…): `AsyncClient.stream()` context manager + Retry refuses streamed-body requests. See [`planning/specs/2026-06-05-streaming-design.md`](specs/2026-06-05-streaming-design.md) and [`planning/plans/2026-06-05-streaming-plan.md`](plans/2026-06-05-streaming-plan.md). - **Epic 5 — Observability:** `5-1` Layer 1 middleware hooks, `5-2` wire into resilience middlewares, `5-4` OpenTelemetry middleware (will declare the `otel` extra at the same time the code lands), `5-5` logging policy CI grep. - **Epic 6 — Ship v1.0:** `6-2` docs site (`mkdocs`), `6-3` benchmarks, `6-5` release flow (Trusted Publishers + Sigstore). - **Carry-forward decoder:** `1-6` msgspec decoder via extras — second `ResponseDecoder` adapter, already implemented; verified surviving in the pivot. diff --git a/planning/releases/0.5.0.md b/planning/releases/0.5.0.md new file mode 100644 index 0000000..4cedd76 --- /dev/null +++ b/planning/releases/0.5.0.md @@ -0,0 +1,54 @@ +# httpware 0.5.0 — Streaming responses + +**0.5.0 is additive. No breaking changes.** Code written against 0.4.0 continues to work unchanged. + +This release closes Epic 4 by adding `AsyncClient.stream()` for chunked response bodies, and closes two longstanding deferred-work items along the way. + +## New features + +- **`AsyncClient.stream(method, url, **kwargs)`** — async context manager that yields an `httpx2.Response` with a non-pre-read body. Consume via `response.aiter_bytes()`, `response.aiter_text()`, `response.aiter_lines()`, or `response.aiter_raw()`. Auto-raises `StatusError` subclasses on 4xx/5xx (with the body pre-read so `exc.response.content` works). Bypasses the middleware chain by design — `Retry`, `Bulkhead`, and user-installed middleware do not see `stream()` calls in v1. +- **`Retry` refuses streamed-body requests.** When you call `client.post(content=async_gen())` (or `data=`, `files=`), the request is marked via `request.extensions["httpware.streaming_body"]`. If `Retry` would otherwise retry on a failure, it re-raises the original exception with a PEP 678 note instead — preventing the "consumed iterator can't replay" footgun. + +## Backwards compatibility + +Subclassing/extensions preserve every existing catch-block: + +- All previously-shipping methods (`get`, `post`, etc.) behave identically. +- The internal refactor that extracted `_httpx2_exception_mapper` from `_terminal` is byte-for-byte equivalent in dispatch behavior. Tests prove this. +- The streaming-body marker (`request.extensions["httpware.streaming_body"]`) only affects requests that genuinely have async-iterable bodies. Existing code passing bytes / dict / files-as-bytes is unaffected. + +## Usage + +```python +from httpware import AsyncClient + + +async def main() -> None: + async with AsyncClient(base_url="https://api.example.com") as client: + async with client.stream("GET", "/big-file") as response: + async for chunk in response.aiter_bytes(): + process(chunk) +``` + +Catch typed status errors on streams the same way as on regular calls: + +```python +from httpware import NotFoundError + +try: + async with client.stream("GET", "/maybe-missing") as response: + ... +except NotFoundError as exc: + body_text = exc.response.text # pre-read; accessible +``` + +## What's still ahead + +- Epic 5 (observability hooks + OTel middleware) is unstarted; logging of retry / bulkhead / stream decisions plumbs through then. +- Whether `stream()` should compose with the middleware chain is deferred to real-user feedback. Adding it later is purely additive (`stream(..., apply_middleware: bool = False)` opt-in). + +## References + +- Spec: [`planning/specs/2026-06-05-streaming-design.md`](../specs/2026-06-05-streaming-design.md) +- Plan: [`planning/plans/2026-06-05-streaming-plan.md`](../plans/2026-06-05-streaming-plan.md) +- Roadmap: [`planning/engineering.md`](../engineering.md) §8 From 820addc17adbecd75b3b937e36e29aa13159cd8b Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Fri, 5 Jun 2026 20:00:41 +0300 Subject: [PATCH 9/9] refactor: hoist STREAMING_BODY_MARKER + _STREAMING_BODY_REFUSAL_NOTE to module constants Final-review feedback: the "httpware.streaming_body" marker key was duplicated at 5 sites (1 write in client.py + 4 reads in retry.py) and the PEP-678 refusal note was duplicated at 4 sites in retry.py. Per project convention (module-level UPPER_CASE constants over inlined string literals; same pattern as _MAX_ATTEMPTS_INVALID, _MAX_CONCURRENT_INVALID, _DEFAULT_DECODER_MISSING_MESSAGE), hoist both: - STREAMING_BODY_MARKER in client.py (public so retry.py can import it; this IS the contract between the two modules) - _STREAMING_BODY_REFUSAL_NOTE in retry.py (private; only used there) retry.py now imports STREAMING_BODY_MARKER from client. No circular import: client doesn't import retry transitively. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/httpware/client.py | 9 ++++++- src/httpware/middleware/resilience/retry.py | 26 ++++++++------------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/src/httpware/client.py b/src/httpware/client.py index 2e7069a..2984ed9 100644 --- a/src/httpware/client.py +++ b/src/httpware/client.py @@ -71,6 +71,13 @@ def _raise_on_status_error(response: httpx2.Response) -> None: raise exc_class(response) +STREAMING_BODY_MARKER = "httpware.streaming_body" +"""Key set on ``httpx2.Request.extensions`` by ``_request_with_body`` when content/data/files is an async-iterable. + +``Retry.__call__`` reads this marker to refuse retrying a streamed-body request +(the consumed iterator cannot replay across attempts).""" + + def _is_streaming_body(value: typing.Any) -> bool: """Return True if value is an async-iterable that cannot be safely replayed for retry.""" if value is None: @@ -210,7 +217,7 @@ async def _request_with_body( # noqa: PLR0913, C901 — mirrors httpx2 per-meth kwargs["files"] = files request = self._httpx2_client.build_request(method, url, **kwargs) if _is_streaming_body(content) or _is_streaming_body(data) or _is_streaming_body(files): - request.extensions["httpware.streaming_body"] = True + request.extensions[STREAMING_BODY_MARKER] = True return await self.send(request, response_model=response_model) @typing.overload diff --git a/src/httpware/middleware/resilience/retry.py b/src/httpware/middleware/resilience/retry.py index abb5f64..d72e2de 100644 --- a/src/httpware/middleware/resilience/retry.py +++ b/src/httpware/middleware/resilience/retry.py @@ -16,6 +16,7 @@ import httpx2 +from httpware.client import STREAMING_BODY_MARKER from httpware.errors import NetworkError, RetryBudgetExhaustedError, StatusError, TimeoutError # noqa: A004 from httpware.middleware import Next from httpware.middleware.resilience._backoff import full_jitter_delay @@ -43,6 +44,7 @@ ) _MAX_ATTEMPTS_INVALID = "max_attempts must be >= 1" +_STREAMING_BODY_REFUSAL_NOTE = "httpware: not retrying — request body is a stream that cannot replay across attempts" def _parse_retry_after(value: str) -> float | None: @@ -108,19 +110,15 @@ async def __call__(self, request: httpx2.Request, next: Next) -> httpx2.Response except StatusError as exc: retryable_status = exc.response.status_code in self.retry_status_codes if not method_eligible or not retryable_status: - if retryable_status and request.extensions.get("httpware.streaming_body"): - exc.add_note( - "httpware: not retrying — request body is a stream that cannot replay across attempts" - ) + if retryable_status and request.extensions.get(STREAMING_BODY_MARKER): + exc.add_note(_STREAMING_BODY_REFUSAL_NOTE) raise last_exc = exc last_response = exc.response except (NetworkError, TimeoutError) as exc: if not method_eligible: - if request.extensions.get("httpware.streaming_body"): - exc.add_note( - "httpware: not retrying — request body is a stream that cannot replay across attempts" - ) + if request.extensions.get(STREAMING_BODY_MARKER): + exc.add_note(_STREAMING_BODY_REFUSAL_NOTE) raise last_exc = exc last_response = None @@ -128,22 +126,18 @@ async def __call__(self, request: httpx2.Request, next: Next) -> httpx2.Response wrapped = TimeoutError("attempt timed out") wrapped.__cause__ = exc # set now; the retry path (last_exc = wrapped) has no `from` clause if not method_eligible: - if request.extensions.get("httpware.streaming_body"): - wrapped.add_note( - "httpware: not retrying — request body is a stream that cannot replay across attempts" - ) + if request.extensions.get(STREAMING_BODY_MARKER): + wrapped.add_note(_STREAMING_BODY_REFUSAL_NOTE) raise wrapped from exc last_exc = wrapped last_response = None # ---- retryable failure path - if request.extensions.get("httpware.streaming_body"): + if request.extensions.get(STREAMING_BODY_MARKER): if last_exc is None: # pragma: no cover — invariant from except branch msg = "Retry: streaming-body refusal reached with no last_exc" raise AssertionError(msg) - last_exc.add_note( - "httpware: not retrying — request body is a stream that cannot replay across attempts" - ) + last_exc.add_note(_STREAMING_BODY_REFUSAL_NOTE) raise last_exc if is_last: