From 96d1b7cec583975fa91961c49a96cf189ca1aff9 Mon Sep 17 00:00:00 2001 From: kpwtxt Date: Fri, 29 May 2026 14:48:01 -0400 Subject: [PATCH 1/3] Add JSON Patch streaming support --- README.md | 61 +++++ docs/cli.md | 14 ++ docs/client.md | 82 ++++++- examples/stream_early_routing.py | 94 +++++++ examples/stream_fanout.py | 105 ++++++++ examples/stream_field_printer.py | 45 ++++ examples/stream_hitl_approval.py | 96 ++++++++ examples/stream_reconstruct.py | 70 ++++++ src/dottxt/__init__.py | 10 +- src/dottxt/cli.py | 119 ++++++++- src/dottxt/client.py | 101 +++++--- src/dottxt/schemas.py | 49 ++++ src/dottxt/streaming.py | 166 +++++++++++++ tests/test_cli.py | 195 +++++++++++++++ tests/test_streaming.py | 403 +++++++++++++++++++++++++++++++ 15 files changed, 1577 insertions(+), 33 deletions(-) create mode 100644 examples/stream_early_routing.py create mode 100644 examples/stream_fanout.py create mode 100644 examples/stream_field_printer.py create mode 100644 examples/stream_hitl_approval.py create mode 100644 examples/stream_reconstruct.py create mode 100644 src/dottxt/streaming.py create mode 100644 tests/test_streaming.py diff --git a/README.md b/README.md index a6cd8ec..e28ff92 100644 --- a/README.md +++ b/README.md @@ -163,6 +163,62 @@ For direct `chat.completions.create(...)`, pass the wrapped OpenAI-style Use `DotTxt.models.list()` and `AsyncDotTxt.models.list()` for model listing. +## Streaming Fields + +`AsyncDotTxt.stream(...)` yields `PatchEvent` objects as the model fills in a +schema-constrained response. The wire format is the gateway's `stream: "patch"` +mode (RFC 6902 JSON Patch over NDJSON). + +Each event carries the raw op (`event.op`) and an independent deep copy of the +document so far (`event.snapshot`). For the common case of reacting to one +field at a time, use the demux properties: `event.is_leaf` skips structural +ops (root seed, empty-container init), `event.field` is the JSON Pointer with +the leading `/` stripped (`"intent"`, `"steps/0"`, `"address/city"`), and +`event.value` is the op's value. + +```python +import asyncio +from typing import Literal + +from pydantic import BaseModel + +from dottxt import AsyncDotTxt + + +class SupportTicket(BaseModel): + # Field order = arrival order. Put what unblocks downstream work first. + intent: Literal["billing", "technical", "account"] + urgency: Literal["low", "medium", "high", "critical"] + reply: str + + +async def main() -> None: + client = AsyncDotTxt() + stream = client.stream( + model="openai/gpt-oss-20b", + response_format=SupportTicket, + input="I was charged twice this month, please refund the duplicate.", + ) + async for event in stream: + if not event.is_leaf: + continue + match event.field: + case "intent": + print(f"dispatching to {event.value} queue") + case "urgency" if event.value == "critical": + print("paging oncall") + case "reply": + print(f"reply: {event.value}") + + +asyncio.run(main()) +``` + +The routing decision can fire tens of milliseconds into generation while +`reply` continues to stream. See +[docs/client.md](docs/client.md#streaming-fields-patch-stream) for the full +reference. + ## OpenAI-Compatible Usage Use `DotTxt` when you want an OpenAI-style client surface with @@ -224,3 +280,8 @@ The compatibility surface expects the wrapped OpenAI-style - [Use a Genson schema builder to generate](examples/generate_genson.py) - [List available models](examples/list_models.py) - [OpenAI-Compatible chat completions](examples/openai_chat_completions.py) +- [Stream fields as they arrive](examples/stream_field_printer.py) +- [Route on /intent before /reply finishes](examples/stream_early_routing.py) +- [Mid-stream human approval](examples/stream_hitl_approval.py) +- [Fan out research on each /steps/N](examples/stream_fanout.py) +- [Reconstruct the document from raw patch ops](examples/stream_reconstruct.py) diff --git a/docs/cli.md b/docs/cli.md index a5a8701..5cf3195 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -74,6 +74,7 @@ Output rules: targeted error with guidance to run `dottxt models` and set `DOTTXT_MODEL` or pass `--model` +<<<<<<< HEAD ### `dottxt schema check` Validate a schema file as JSON Schema. @@ -82,3 +83,16 @@ Validate a schema file as JSON Schema. - ``: JSON file path to validate - `--json`: emits structured payload including `status` and `schema_file` - Errors follow the shared `--json` error envelope when enabled +======= +### `dottxt stream` + +Stream one generation as it is produced using JSON Patch RFC 6902. + +- `-m, --model TEXT`: model id (required unless `DOTTXT_MODEL` is set) +- `-s, --schema FILE`: schema file path (required) +- `[PROMPT]`: literal prompt text (falls back to stdin, same rules as `generate`) + +Output rules: + +- stdout: one RFC 6902 `add` op per line (NDJSON), in arrival order. +>>>>>>> d5cc04a (Add JSON Patch streaming support) diff --git a/docs/client.md b/docs/client.md index d5e2438..23021e3 100644 --- a/docs/client.md +++ b/docs/client.md @@ -30,7 +30,7 @@ Requires Python 3.10+. The client takes two arguments. Each is read from the constructor first, then from the environment. -- `api_key` (`str | None`): falls back to `DOTTXT_API_KEY`. Required — the +- `api_key` (`str | None`): falls back to `DOTTXT_API_KEY`. Required, the constructor raises `ValueError` if neither is set. - `base_url` (`str | None`): falls back to `DOTTXT_BASE_URL`, then to `https://api.dottxt.ai/v1`. @@ -209,11 +209,81 @@ result = client.generate( print(result) # {'severity': 'high', 'team': 'checkout'} ``` +## Streaming Fields (Patch Stream) + +`AsyncDotTxt.stream(...)` yields `PatchEvent` objects as the model fills in a +schema-constrained response. It is built on the gateway's `stream: "patch"` +mode, which emits RFC 6902 JSON Patch operations in schema order, so +downstream work can start the moment a field arrives, without waiting for +the closing brace. + +Parameters mirror `generate(...)`: + +- `model` (`str`) +- `input` (`str | list[dict]`) +- `response_format` (`Any`) — any schema input accepted by `generate(...)` +- `temperature`, `max_tokens`, `seed`, `timeout` — optional +- `extra` (`dict | None`) — extra chat-completions body fields + +Each `PatchEvent` carries: + +- `event.op` — the raw RFC 6902 operation (`{"op": "add", "path": ..., "value": ...}`) +- `event.snapshot` — an independent deep copy of the JSON object built up to + and including this op +- `event.is_leaf` / `event.field` / `event.value` convenience demux for + the common case of reacting to one field at a time. `is_leaf` is `True` + for non-structural adds (skipping the root seed and empty-container init + ops); `field` is the JSON Pointer with the leading `/` stripped + (`"intent"`, `"steps/0"`, `"address/city"`). + +```python +import asyncio +from typing import Literal +from pydantic import BaseModel +from dottxt import AsyncDotTxt + +class SupportTicket(BaseModel): + # Field order = arrival order. Put what unblocks downstream work first. + intent: Literal["billing", "technical", "account"] + urgency: Literal["low", "medium", "high", "critical"] + reply: str + +async def main(): + client = AsyncDotTxt() + stream = client.stream( + model="openai/gpt-oss-20b", + response_format=SupportTicket, + input="I was charged twice this month, please refund the duplicate.", + ) + async for event in stream: + if not event.is_leaf: + continue + match event.field: + case "intent": + asyncio.create_task(dispatch_to_queue(event.value)) + case "urgency" if event.value == "critical": + asyncio.create_task(page_oncall()) + case "reply": + await send(event.value) + +asyncio.run(main()) +``` + +The routing decision fires the moment `intent` arrives, typically tens of +milliseconds in while `reply` continues to stream. If you need the full +object so far (e.g. to log progress or hand a partial object to another +service), use `event.snapshot`. + +Errors: + +- `dottxt.PatchStreamError`: raised when the gateway returns a non-200 + status. Exposes `status_code` and `body`. + ## OpenAI-Compatible Text Generation If you prefer the standard OpenAI SDK surface, you can call `chat.completions.create(...)` directly. The client passes the call through -unchanged and returns the raw chat completion object — parsing and +unchanged and returns the raw chat completion object, parsing and validation are up to the caller. For structured output, pass the wrapped OpenAI-style `response_format` @@ -268,3 +338,11 @@ Runnable examples live in the [`examples/`](../examples) directory: - [`list_models.py`](../examples/list_models.py): list available models - [`openai_chat_completions.py`](../examples/openai_chat_completions.py): use the OpenAI-compatible `chat.completions.create` surface +- [`stream_field_printer.py`](../examples/stream_field_printer.py): minimal + `stream` demo — print each leaf field and value as it lands +- [`stream_early_routing.py`](../examples/stream_early_routing.py): route on + `/intent` while `/reply` is still streaming +- [`stream_hitl_approval.py`](../examples/stream_hitl_approval.py): approve a + proposed action mid-stream and discard the reply if the operator declines +- [`stream_fanout.py`](../examples/stream_fanout.py): fan research tasks out + on each `/steps/N` as the planner emits them diff --git a/examples/stream_early_routing.py b/examples/stream_early_routing.py new file mode 100644 index 0000000..5aebf44 --- /dev/null +++ b/examples/stream_early_routing.py @@ -0,0 +1,94 @@ +"""Route on /intent before /reply finishes. + +The schema is ordered ``intent`` → ``urgency`` → ``reply``. Because dottxt +streams fields in schema order, the routing decision fires the moment +``intent`` arrives, typically tens of milliseconds in, while the model +continues generating the (much longer) ``reply``. + +What to watch in the output: the ``-> dispatched ...`` and +``-> paged oncall ...`` lines arrive well before the final reply line. +The elapsed-time prefix on the reply line is the punchline, how much later +the full message lands compared to when routing was already settled. + +Usage: + DOTTXT_API_KEY=sk-... python examples/stream_early_routing.py +""" + +import asyncio +import time +from typing import Literal + +from pydantic import BaseModel, Field + +from dottxt import AsyncDotTxt + + +class SupportTicket(BaseModel): + """A triaged support reply. + + Field order is significant: earlier fields arrive first and unblock + downstream work that does not depend on later fields. + """ + + intent: Literal["billing", "technical", "account", "feedback"] + urgency: Literal["low", "medium", "high", "critical"] + reply: str = Field(max_length=400) + + +async def route_to_billing(ticket_id: str) -> None: + """Dispatch the ticket to the billing queue (stub).""" + print(f" -> dispatched {ticket_id} to billing queue") + + +async def route_to_technical(ticket_id: str) -> None: + """Dispatch the ticket to the technical queue (stub).""" + print(f" -> dispatched {ticket_id} to technical queue") + + +async def page_oncall(ticket_id: str) -> None: + """Page the on-call engineer (stub).""" + print(f" -> paged oncall for {ticket_id}") + + +async def main() -> None: + """Run the example.""" + ticket_id = "TKT-8821" + user_message = ( + "I was charged twice for my subscription this month and the second " + "charge doesn't appear in my invoice list. Please refund the duplicate." + ) + + client = AsyncDotTxt() + started = time.monotonic() + try: + stream = client.stream( + model="openai/gpt-oss-20b", + response_format=SupportTicket, + input=[ + { + "role": "system", + "content": "Triage support tickets and draft a reply.", + }, + {"role": "user", "content": user_message}, + ], + max_tokens=400, + ) + async for event in stream: + match event.field: + # Fire-and-forget: routing kicks off while /reply is still + # streaming. + case "intent" if event.value == "billing": + asyncio.create_task(route_to_billing(ticket_id)) + case "intent" if event.value == "technical": + asyncio.create_task(route_to_technical(ticket_id)) + case "urgency" if event.value == "critical": + asyncio.create_task(page_oncall(ticket_id)) + case "reply": + elapsed_ms = int((time.monotonic() - started) * 1000) + print(f"reply ({elapsed_ms}ms): {event.value}") + finally: + await client.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/stream_fanout.py b/examples/stream_fanout.py new file mode 100644 index 0000000..34316e8 --- /dev/null +++ b/examples/stream_fanout.py @@ -0,0 +1,105 @@ +"""Fan out research tasks as a planner emits each step. + +The planner schema has a top-level ``steps`` array. Each item streams in as +a separate field (``steps/0``, ``steps/1``, ...). We launch a research +coroutine the moment each step arrives, so step 0's research is already +underway while step 1 is still being generated. + +What to watch in the output +--------------------------- + +The interesting comparison is **total wall-clock time vs. the sum of the +per-step research times**. Each step's stub research takes ~1.0-1.8s +(``1.0 + 0.2 * index``); if research ran serially after generation +completed, you'd pay generation_time + Σ research_time. Because each +``research()`` task is scheduled on arrival and the planner keeps +emitting while they run, total time is closer to +``max(generation_time, generation_time_until_last_step + slowest_research)`` + (i.e. roughly one research interval longer than generation, not the sum). + +Look for: + +- ``[t+...ms] received step N`` lines arriving while earlier + ``[step N-1] started`` lines are still mid-flight (overlap is visible). +- The final ``all N steps researched in Xms total`` line: ``X`` should be + noticeably less than ``generation_ms + Σ (1000 + 200·i)`` for emitted + steps. With 4 steps that sum is ~4.6s of research; total tends to land + near generation_time + ~1.6s rather than generation_time + ~4.6s. + +Usage: + DOTTXT_API_KEY=sk-... python examples/stream_fanout.py +""" + +import asyncio +import time +from typing import Any + +from pydantic import BaseModel, Field + +from dottxt import AsyncDotTxt + + +class Plan(BaseModel): + """An ordered research plan.""" + + topic: str = Field(max_length=80) + steps: list[str] = Field(min_length=3, max_length=5) + + +async def research(step_index: int, step: str) -> dict[str, Any]: + """Pretend to research a single step (sleep + return).""" + started = time.monotonic() + print(f" [step {step_index}] started: {step!r}") + # Simulate a real lookup. Different durations make the overlap visible. + await asyncio.sleep(1.0 + 0.2 * step_index) + elapsed_ms = int((time.monotonic() - started) * 1000) + print(f" [step {step_index}] done in {elapsed_ms}ms") + return {"step": step, "elapsed_ms": elapsed_ms} + + +async def main() -> None: + """Run the example.""" + client = AsyncDotTxt() + tasks: list[asyncio.Task[dict[str, Any]]] = [] + started = time.monotonic() + + try: + stream = client.stream( + model="openai/gpt-oss-20b", + response_format=Plan, + input=( + "Plan three to five research steps to answer the question: " + "'What are the trade-offs between RAG and fine-tuning for " + "domain-specific assistants?' Each step should be a short " + "imperative sentence." + ), + max_tokens=400, + ) + async for event in stream: + if not event.is_leaf: + continue + elapsed_ms = int((time.monotonic() - started) * 1000) + if event.field.startswith("steps/"): + index = int(event.field.split("/", 1)[1]) + print(f"[t+{elapsed_ms:>5}ms] received step {index}") + tasks.append(asyncio.create_task(research(index, event.value))) + elif event.field == "topic": + print(f"[t+{elapsed_ms:>5}ms] topic: {event.value}") + + results = await asyncio.gather(*tasks) + finally: + await client.close() + + total_ms = int((time.monotonic() - started) * 1000) + sum_research_ms = sum(r["elapsed_ms"] for r in results) + print() + print(f"all {len(results)} steps researched in {total_ms}ms total") + print( + f"sum of per-step research times: {sum_research_ms}ms " + f"(serial would take at least this long; overlap saved " + f"{max(0, sum_research_ms - total_ms)}ms)" + ) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/stream_field_printer.py b/examples/stream_field_printer.py new file mode 100644 index 0000000..24c02df --- /dev/null +++ b/examples/stream_field_printer.py @@ -0,0 +1,45 @@ +"""Print each leaf field and value as it arrives from the model. + +Smallest possible demo of the patch-stream interface: define a schema, +iterate, print. No buffering, no closing brace. + +Usage: + DOTTXT_API_KEY=sk-... python examples/stream_field_printer.py +""" + +import asyncio +from typing import Literal + +from pydantic import BaseModel, Field + +from dottxt import AsyncDotTxt + + +class Engineer(BaseModel): + """A software engineer profile.""" + + name: str = Field(max_length=32) + role: Literal["backend", "frontend", "ml", "infra"] + years_experience: int = Field(ge=0, le=50) + favorite_languages: list[str] = Field(min_length=1, max_length=4) + + +async def main() -> None: + """Run the example.""" + client = AsyncDotTxt() + try: + stream = client.stream( + model="openai/gpt-oss-20b", + response_format=Engineer, + input="Generate a profile for a senior backend engineer.", + ) + async for event in stream: + if not event.is_leaf: + continue + print(f"{event.field:>24} = {event.value!r}") + finally: + await client.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/stream_hitl_approval.py b/examples/stream_hitl_approval.py new file mode 100644 index 0000000..7608eb0 --- /dev/null +++ b/examples/stream_hitl_approval.py @@ -0,0 +1,96 @@ +"""Mid-stream human approval, no checkpointer, no resume. + +The proposed action arrives as a fact (``action``) before the reply is +generated. We prompt the operator between receiving the fact and acting on +it. If the operator declines, the rest of the stream is consumed but the +``reply`` is never sent. + +Usage: + DOTTXT_API_KEY=sk-... python examples/stream_hitl_approval.py +""" + +import asyncio +from typing import Literal + +from pydantic import BaseModel, Field + +from dottxt import AsyncDotTxt + + +class AgentDecision(BaseModel): + """An agent's proposed action and customer-facing reply. + + ``action`` precedes ``reply`` so the operator can approve or reject + while the reply text is still streaming. + """ + + action: Literal[ + "answer_only", + "open_ticket", + "issue_refund", + "delete_account", + ] + reply: str = Field(max_length=300) + + +HIGH_RISK_ACTIONS = {"issue_refund", "delete_account"} + + +async def ask_human(question: str) -> bool: + """Prompt the operator on stdin; return True if they approve.""" + # input() is blocking; run it off the event loop so other tasks + # (e.g. background dispatching) keep running while we wait. + answer = await asyncio.to_thread(input, f"{question} [y/N]: ") + return answer.strip().lower() in {"y", "yes"} + + +async def send_reply(reply: str) -> None: + """Send the customer-facing reply (stub).""" + print(f"sent reply: {reply}") + + +async def main() -> None: + """Run the example.""" + client = AsyncDotTxt() + user_message = "Please close my account permanently. I am leaving." + + approved = True + proposed_action: str | None = None + + try: + stream = client.stream( + model="openai/gpt-oss-20b", + response_format=AgentDecision, + input=[ + { + "role": "system", + "content": ( + "You are a customer support agent. Decide on an " + "action and draft a reply." + ), + }, + {"role": "user", "content": user_message}, + ], + max_tokens=300, + ) + async for event in stream: + if not event.is_leaf: + continue + match event.field: + case "action": + proposed_action = event.value + print(f"proposed action: {event.value}") + if event.value in HIGH_RISK_ACTIONS: + approved = await ask_human(f"Approve action '{event.value}'?") + if not approved: + print("operator declined — reply will not be sent") + case "reply" if approved: + await send_reply(event.value) + case "reply": + print(f"discarded reply (action '{proposed_action}' declined)") + finally: + await client.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/stream_reconstruct.py b/examples/stream_reconstruct.py new file mode 100644 index 0000000..7420502 --- /dev/null +++ b/examples/stream_reconstruct.py @@ -0,0 +1,70 @@ +"""Rebuild the full document from raw patch ops as they arrive. + +The SDK already hands you ``event.snapshot`` — an independent deep copy of +the document so far — so most consumers never touch the raw ops. This +example folds ``event.op`` into a document of its own, re-rendering it as it +grows. Use this as a template when you need to drive your own state from the +ops. + +The gateway only ever emits RFC 6902 ``add`` ops, so we reuse the SDK's own +``apply_add`` helper rather than pulling in a full JSON Patch library. + +Usage: + DOTTXT_API_KEY=sk-... python examples/stream_reconstruct.py +""" + +import asyncio +import json +from typing import Any + +from pydantic import BaseModel, Field + +from dottxt import AsyncDotTxt, apply_add + + +class Address(BaseModel): + """A postal address.""" + + city: str = Field(max_length=32) + country: str = Field(max_length=32) + + +class Person(BaseModel): + """A person with a nested address and a list of skills. + + Nesting and an array mean ops arrive for ``/address/city`` and + ``/skills/0`` as well as for top-level scalars. + """ + + name: str = Field(max_length=32) + age: int = Field(ge=0, le=120) + address: Address + skills: list[str] = Field(min_length=1, max_length=4) + + +async def main() -> None: + """Run the example.""" + client = AsyncDotTxt() + # Start from an empty object; the stream's first op is the root seed. + doc: Any = {} + try: + stream = client.stream( + model="openai/gpt-oss-20b", + response_format=Person, + input="Generate a profile for a backend engineer based in Paris.", + ) + async for event in stream: + doc = apply_add(doc, event.op["path"], event.op["value"]) + label = "(root)" if event.field == "" else event.field + print(f"+ {label}") + print(json.dumps(doc, indent=2)) + print("-" * 40) + finally: + await client.close() + + print("final document:") + print(json.dumps(doc, indent=2)) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/dottxt/__init__.py b/src/dottxt/__init__.py index 6ff6981..6965625 100644 --- a/src/dottxt/__init__.py +++ b/src/dottxt/__init__.py @@ -3,10 +3,18 @@ from importlib.metadata import PackageNotFoundError, version from dottxt.client import AsyncDotTxt, DotTxt, InvalidOutputError +from dottxt.streaming import PatchEvent, PatchStreamError, apply_add try: # pragma: no cover __version__ = version("dottxt") except PackageNotFoundError: # pragma: no cover __version__ = "0.0.0" -__all__ = ["DotTxt", "AsyncDotTxt", "InvalidOutputError"] +__all__ = [ + "DotTxt", + "AsyncDotTxt", + "InvalidOutputError", + "PatchEvent", + "PatchStreamError", + "apply_add", +] diff --git a/src/dottxt/cli.py b/src/dottxt/cli.py index fdf3143..0153957 100644 --- a/src/dottxt/cli.py +++ b/src/dottxt/cli.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio import json import os import sys @@ -21,7 +22,8 @@ ) from dottxt import __version__ -from dottxt.client import DotTxt +from dottxt.client import AsyncDotTxt, DotTxt +from dottxt.streaming import PatchStreamError def _credentials_path() -> Path: @@ -617,3 +619,118 @@ def generate( _emit(response, json_mode=json_mode) return _emit(response["result"], json_mode=False) + + +async def _run_stream( + *, + api_key: str, + model: str, + schema_text: str, + prompt: str, + json_mode: bool, +) -> None: + """Stream patch ops to stdout as NDJSON. + + One RFC 6902 op per line on stdout (machine-readable, pipeable). + """ + client = AsyncDotTxt(api_key=api_key) + try: + stream = client.stream( + model=model, + response_format=schema_text, + input=prompt, + ) + async for event in stream: + click.echo(json.dumps(event.op, separators=(",", ":"))) + except PatchStreamError as exc: + if _is_model_unavailable_error(exc): + _fail( + ( + f"Model '{model}' is not available for this API key. " + "Run 'dottxt models' to choose an available model, then set " + "DOTTXT_MODEL or pass --model." + ), + json_mode=json_mode, + ) + detail = exc.body.strip() + message = f"Stream failed with API status {exc.status_code}." + _fail(f"{message} {detail[:200]}".strip(), json_mode=json_mode) + except Exception as exc: + _fail(f"Stream failed: {exc}", json_mode=json_mode) + finally: + await client.close() + + +@main.command(name="stream") +@click.option( + "-m", + "--model", + envvar="DOTTXT_MODEL", + required=True, + show_envvar=True, + help="Model to use.", +) +@click.option( + "-s", + "--schema", + "schema_file", + type=click.Path(dir_okay=False, path_type=Path), + required=True, + help="JSON Schema file.", +) +@click.argument("prompt_arg", required=False, metavar="[PROMPT]") +@click.pass_context +def stream( + ctx: click.Context, + model: str, + schema_file: Path, + prompt_arg: str | None, +) -> None: + """Stream a structured response as NDJSON patch ops. + + Emits one RFC 6902 add op per line on stdout as the model fills in the + schema. PROMPT is literal text and is required unless stdin is piped. + The model resolves from --model, then DOTTXT_MODEL. + """ + json_mode = bool(ctx.obj["json_mode"]) + if not schema_file.exists() or not schema_file.is_file(): + _fail(f"Schema file not found: {schema_file}", json_mode=json_mode) + + schema_text = schema_file.read_text(encoding="utf-8") + try: + json.loads(schema_text) + except json.JSONDecodeError as exc: + _fail(f"Schema file is not valid JSON: {exc.msg}", json_mode=json_mode) + + if prompt_arg is not None: + final_prompt = prompt_arg + else: + final_prompt = _read_stdin_prompt() + if not final_prompt: + _fail("No prompt provided. Use PROMPT or pipe stdin.", json_mode=json_mode) + + resolved_api_key = _resolve_api_key() + if not resolved_api_key: + _fail( + "No API key available. Run 'dottxt login' or set DOTTXT_API_KEY.", + json_mode=json_mode, + ) + + _emit_verbose( + ctx, + "Starting stream.", + data={ + "model": model, + "schema_file": str(schema_file), + "prompt_length": len(final_prompt), + }, + ) + asyncio.run( + _run_stream( + api_key=resolved_api_key, + model=model, + schema_text=schema_text, + prompt=final_prompt, + json_mode=json_mode, + ) + ) diff --git a/src/dottxt/client.py b/src/dottxt/client.py index c4759cc..dc887b1 100644 --- a/src/dottxt/client.py +++ b/src/dottxt/client.py @@ -2,13 +2,16 @@ import json import os +from collections.abc import AsyncIterator from typing import Any from openai import AsyncOpenAI as AsyncOpenAISDK from openai import OpenAI as OpenAISDK from pydantic import BaseModel, ValidationError -from dottxt.schemas import SchemaInput, build_response_format +from dottxt.schemas import SchemaInput, build_chat_payload +from dottxt.streaming import PatchEvent +from dottxt.streaming import stream as _stream DEFAULT_BASE_URL = "https://api.dottxt.ai/v1" @@ -118,20 +121,15 @@ def generate( Returns: A parsed Pydantic model or decoded JSON object. """ - if isinstance(input, str): - input = [{"role": "user", "content": input}] - payload: dict[str, Any] = { - **extra, - "model": model, - "messages": input, - "response_format": build_response_format(response_format), - } - if temperature is not None: - payload["temperature"] = temperature - if max_tokens is not None: - payload["max_tokens"] = max_tokens - if seed is not None: - payload["seed"] = seed + payload = build_chat_payload( + model=model, + response_format=response_format, + input=input, + temperature=temperature, + max_tokens=max_tokens, + seed=seed, + extra=extra, + ) completion = self.chat.completions.create(**payload) completion_text = _completion_text(completion) try: @@ -213,20 +211,15 @@ async def generate( Returns: A parsed Pydantic model or decoded JSON object. """ - if isinstance(input, str): - input = [{"role": "user", "content": input}] - payload: dict[str, Any] = { - **extra, - "model": model, - "messages": input, - "response_format": build_response_format(response_format), - } - if temperature is not None: - payload["temperature"] = temperature - if max_tokens is not None: - payload["max_tokens"] = max_tokens - if seed is not None: - payload["seed"] = seed + payload = build_chat_payload( + model=model, + response_format=response_format, + input=input, + temperature=temperature, + max_tokens=max_tokens, + seed=seed, + extra=extra, + ) completion = await self.chat.completions.create(**payload) completion_text = _completion_text(completion) try: @@ -239,6 +232,56 @@ async def generate( original_error=exc, ) from exc + async def stream( + self, + *, + model: str, + response_format: SchemaInput, + input: str | list[dict[str, Any]], + temperature: float | None = None, + max_tokens: int | None = None, + seed: int | None = None, + timeout: float = 60.0, + extra: dict[str, Any] | None = None, + ) -> AsyncIterator[PatchEvent]: + """Stream ``PatchEvent``\\ s as the model fills in a structured response. + + The dottxt gateway emits one RFC 6902 ``add`` op per JSON token. Each + event carries the raw op (``event.op``) and an independent snapshot + of the document built up to and including that op (``event.snapshot``). + + Args: + model: Model identifier. + response_format: Schema input accepted by ``build_response_format``. + input: A prompt string or a list of chat messages. + temperature: Optional temperature value. + max_tokens: Optional max output tokens. + seed: Optional deterministic seed. + timeout: HTTP timeout in seconds. + extra: Additional chat-completions parameters merged into the body. + + Yields: + ``PatchEvent`` objects in the order the gateway produced them. + + Raises: + PatchStreamError: If the upstream returns a non-200 status. + """ + base_url = str(self._client.base_url) + api_key = self._client.api_key + async for event in _stream( + base_url=base_url, + api_key=api_key, + model=model, + response_format=response_format, + input=input, + temperature=temperature, + max_tokens=max_tokens, + seed=seed, + timeout=timeout, + extra=extra, + ): + yield event + async def close(self) -> None: """Close the underlying SDK client.""" await self._client.close() diff --git a/src/dottxt/schemas.py b/src/dottxt/schemas.py index 6c92bfd..bdb8f4a 100644 --- a/src/dottxt/schemas.py +++ b/src/dottxt/schemas.py @@ -124,3 +124,52 @@ def build_response_format( "schema": normalize_schema(schema), }, } + + +def build_chat_payload( + *, + model: str, + response_format: SchemaInput, + input: str | list[dict[str, Any]], + temperature: float | None = None, + max_tokens: int | None = None, + seed: int | None = None, + extra: dict[str, Any] | None = None, +) -> dict[str, Any]: + """Assemble a chat-completions request body. + + Shared by ``generate()`` (sync + async) and the patch-stream consumer so + input normalization, ``response_format`` wrapping, and conditional + generation params live in one place. Callers add transport-specific keys + (e.g. ``stream: "patch"``) to the returned dict. + + Args: + model: Model identifier. + response_format: Schema input accepted by ``build_response_format``. + input: A prompt string (sent as a single user message) or a list of + chat messages. + temperature: Optional temperature value. + max_tokens: Optional max output tokens. + seed: Optional deterministic seed. + extra: Additional chat-completions parameters. Merged first so the + assembled keys (``model``, ``messages``, ``response_format``) + take precedence over caller-provided duplicates. + + Returns: + A mutable dict suitable to pass as the request body. + """ + if isinstance(input, str): + input = [{"role": "user", "content": input}] + payload: dict[str, Any] = { + **(extra or {}), + "model": model, + "messages": input, + "response_format": build_response_format(response_format), + } + if temperature is not None: + payload["temperature"] = temperature + if max_tokens is not None: + payload["max_tokens"] = max_tokens + if seed is not None: + payload["seed"] = seed + return payload diff --git a/src/dottxt/streaming.py b/src/dottxt/streaming.py new file mode 100644 index 0000000..bd9b5e8 --- /dev/null +++ b/src/dottxt/streaming.py @@ -0,0 +1,166 @@ +"""Patch-stream consumer for the dottxt ``stream: "patch"`` endpoint. + +The gateway emits one RFC 6902 ``add`` op per JSON token as the model +generates a structured response. This module yields ``PatchEvent`` objects +that carry the raw op together with a snapshot of the document built up to +and including that op. +""" + +from __future__ import annotations + +import copy +import json +from collections.abc import AsyncIterator +from dataclasses import dataclass +from typing import Any + +import httpx + +from dottxt.schemas import SchemaInput, build_chat_payload + + +class PatchStreamError(RuntimeError): + """Raised when the upstream patch stream returns a non-200 status.""" + + def __init__(self, *, status_code: int, body: str) -> None: + super().__init__(f"patch stream failed: {status_code} {body[:500]}") + self.status_code = status_code + self.body = body + + +@dataclass(frozen=True) +class PatchEvent: + """One wire op plus the document reconstructed up to and including it. + + ``op`` is the raw RFC 6902 operation as received, currently always an + ``add``. ``snapshot`` is the document after the op has been applied; it + is an independent deep copy, so callers may stash events without later + ops mutating earlier snapshots. + + The ``is_leaf`` / ``field`` / ``value`` properties demux the op for the + common pattern of reacting to one structured-output field at a time. + """ + + op: dict[str, Any] + snapshot: dict[str, Any] | list[Any] + + @property + def is_leaf(self) -> bool: + """True iff this op contributes a single leaf value. + + False for the root seed (``path == ""``), for empty-container init + ops (``value`` is ``{}`` or ``[]``), and for any op that is not an + ``add``. + """ + return ( + self.op.get("op") == "add" + and self.op.get("path", "") != "" + and self.op.get("value") not in ({}, []) + ) + + @property + def field(self) -> str: + """JSON Pointer for this op with the leading ``/`` stripped. + + Top-level keys read as ``"intent"``, array items as ``"steps/0"``, + nested fields as ``"address/city"``. Returns ``""`` for the root op. + """ + path = self.op.get("path", "") + return path[1:] if path.startswith("/") else path + + @property + def value(self) -> Any: + """The op's ``value`` (a leaf for leaf ops, a container for seeds).""" + return self.op.get("value") + + +def apply_add(doc: Any, path: str, value: Any) -> Any: + """Apply an RFC 6902 ``add`` op in place and return the (possibly new) root. + + Supports the ``path == ""`` root replacement and ``-`` for array append; + numeric path segments index into arrays, everything else keys into + objects. Mutates ``doc`` for non-root paths. + """ + if path == "": + return value + parts = path[1:].split("/") + cur = doc + for p in parts[:-1]: + cur = cur[int(p)] if isinstance(cur, list) else cur[p] + last = parts[-1] + if isinstance(cur, list): + idx = len(cur) if last == "-" else int(last) + cur.insert(idx, value) + else: + cur[last] = value + return doc + + +async def stream( + *, + base_url: str, + api_key: str, + model: str, + response_format: SchemaInput, + input: str | list[dict[str, Any]], + temperature: float | None = None, + max_tokens: int | None = None, + seed: int | None = None, + timeout: float = 60.0, + extra: dict[str, Any] | None = None, +) -> AsyncIterator[PatchEvent]: + """Yield ``PatchEvent``\\ s from a patch-streamed chat completion. + + Sends ``stream: "patch"`` to ``{base_url}/chat/completions`` and reads + the NDJSON response, yielding one event per op as it arrives. Each + event carries the raw op plus an independent snapshot of the document + after the op has been applied. + """ + body = build_chat_payload( + model=model, + response_format=response_format, + input=input, + temperature=temperature, + max_tokens=max_tokens, + seed=seed, + extra=extra, + ) + body["stream"] = "patch" + + url = f"{base_url.rstrip('/')}/chat/completions" + headers = { + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json", + } + async with httpx.AsyncClient(timeout=timeout) as client: + async with client.stream("POST", url, json=body, headers=headers) as resp: + if resp.status_code != 200: + detail = (await resp.aread()).decode("utf-8", errors="replace") + raise PatchStreamError(status_code=resp.status_code, body=detail) + doc: Any = None + buf = "" + async for chunk in resp.aiter_text(): + buf += chunk + while "\n" in buf: + line, buf = buf.split("\n", 1) + line = line.strip() + if not line: + continue + doc, event = _handle_line(line, doc) + yield event + if buf.strip(): + doc, event = _handle_line(buf, doc) + yield event + + +def _handle_line(line: str, doc: Any) -> tuple[Any, PatchEvent]: + """Parse one NDJSON line, apply the op, and build the PatchEvent. + + Returns the updated root and an event whose ``op`` and ``snapshot`` are + both independent of the live doc, so subsequent ops mutating the doc + cannot retroactively change a yielded event. + """ + op = json.loads(line) + if op.get("op") == "add": + doc = apply_add(doc, op["path"], op["value"]) + return doc, PatchEvent(op=copy.deepcopy(op), snapshot=copy.deepcopy(doc)) diff --git a/tests/test_cli.py b/tests/test_cli.py index c22cb67..31d4ea9 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -12,6 +12,7 @@ from dottxt import cli as cli_module from dottxt.cli import main +from dottxt.streaming import PatchEvent, PatchStreamError VALID_SCHEMA = '{"type":"object"}' STUB_RESULT = {"status": "stub", "message": "SDK call not wired yet."} @@ -80,6 +81,57 @@ def close(self) -> None: self.__class__.close_calls += 1 +STREAM_EVENTS = [ + PatchEvent(op={"op": "add", "path": "", "value": {}}, snapshot={}), + PatchEvent( + op={"op": "add", "path": "/name", "value": "Ada"}, + snapshot={"name": "Ada"}, + ), + PatchEvent( + op={"op": "add", "path": "/age", "value": 35}, + snapshot={"name": "Ada", "age": 35}, + ), +] + + +class FakeAsyncDotTxt: + """Stub AsyncDotTxt client used by CLI streaming tests.""" + + init_api_keys: list[str] = [] + stream_calls: list[dict[str, object]] = [] + close_calls: int = 0 + events: list[PatchEvent] = list(STREAM_EVENTS) + raise_error: Exception | None = None + + def __init__(self, *, api_key: str) -> None: + """Initialize fake async client with provided key.""" + self.__class__.init_api_keys.append(api_key) + + def stream( + self, + *, + model: str, + response_format: str, + input: str, + ) -> object: + """Record stream arguments and return an async iterator of events.""" + self.__class__.stream_calls.append( + {"model": model, "response_format": response_format, "input": input} + ) + return self._iter_events() + + async def _iter_events(self) -> object: + """Yield configured events, or raise the configured error.""" + if self.__class__.raise_error is not None: + raise self.__class__.raise_error + for event in self.__class__.events: + yield event + + async def close(self) -> None: + """Record close calls.""" + self.__class__.close_calls += 1 + + @pytest.fixture def runner() -> CliRunner: """Return a CLI test runner.""" @@ -93,8 +145,14 @@ def patch_dotxt_client(monkeypatch: pytest.MonkeyPatch) -> None: FakeDotTxt.generate_calls.clear() FakeDotTxt.models_calls = 0 FakeDotTxt.close_calls = 0 + FakeAsyncDotTxt.init_api_keys.clear() + FakeAsyncDotTxt.stream_calls.clear() + FakeAsyncDotTxt.close_calls = 0 + FakeAsyncDotTxt.events = list(STREAM_EVENTS) + FakeAsyncDotTxt.raise_error = None monkeypatch.delenv("DOTTXT_API_KEY", raising=False) monkeypatch.setattr(cli_module, "DotTxt", FakeDotTxt) + monkeypatch.setattr(cli_module, "AsyncDotTxt", FakeAsyncDotTxt) monkeypatch.setattr(cli_module, "_resolve_api_key", lambda: "test-key") @@ -930,3 +988,140 @@ def test_emit_verbose_non_dict_data_is_compact_json( captured = capsys.readouterr() assert captured.err.strip() == "[verbose] payload [1,2,3]" + + +def test_stream_emits_one_ndjson_op_per_line( + runner: CliRunner, + schema_file: Path, +) -> None: + """Stream should write one raw RFC 6902 op per line on stdout.""" + result = _invoke( + runner, + ["stream", "-m", "openai/gpt-oss-20b", "-s", str(schema_file), "profile"], + ) + + assert result.exit_code == 0 + lines = result.stdout.strip().splitlines() + ops = [json.loads(line) for line in lines] + assert ops == [event.op for event in STREAM_EVENTS] + assert FakeAsyncDotTxt.stream_calls[-1]["model"] == "openai/gpt-oss-20b" + assert FakeAsyncDotTxt.stream_calls[-1]["input"] == "profile" + assert FakeAsyncDotTxt.close_calls == 1 + + +def test_stream_requires_a_prompt( + runner: CliRunner, + schema_file: Path, +) -> None: + """Stream should fail when neither positional prompt nor stdin is provided.""" + result = _invoke( + runner, + ["stream", "-m", "openai/gpt-oss-20b", "-s", str(schema_file)], + ) + + assert result.exit_code != 0 + assert FakeAsyncDotTxt.stream_calls == [] + + +def test_stream_surfaces_patch_stream_error( + runner: CliRunner, + schema_file: Path, +) -> None: + """An upstream non-200 should surface as a failure mentioning the status.""" + FakeAsyncDotTxt.raise_error = PatchStreamError( + status_code=503, body="upstream unavailable" + ) + + result = _invoke( + runner, + ["stream", "-m", "openai/gpt-oss-20b", "-s", str(schema_file), "profile"], + ) + + assert result.exit_code != 0 + assert "503" in result.output + result.stderr + # The client is still closed on the error path. + assert FakeAsyncDotTxt.close_calls == 1 + + +def test_stream_model_unavailable_gives_targeted_guidance( + runner: CliRunner, + schema_file: Path, +) -> None: + """A model-unavailable upstream error should reuse the generate guidance.""" + FakeAsyncDotTxt.raise_error = PatchStreamError( + status_code=404, body="model not found" + ) + + result = _invoke( + runner, + ["stream", "-m", "missing/model", "-s", str(schema_file), "profile"], + ) + + assert result.exit_code != 0 + assert "not available for this API key" in result.output + result.stderr + + +def test_stream_unexpected_error_surfaces( + runner: CliRunner, + schema_file: Path, +) -> None: + """A non-API error should surface as a generic stream failure.""" + FakeAsyncDotTxt.raise_error = RuntimeError("boom") + + result = _invoke( + runner, + ["stream", "-m", "openai/gpt-oss-20b", "-s", str(schema_file), "profile"], + ) + + assert result.exit_code != 0 + assert "Stream failed: boom" in result.output + result.stderr + assert FakeAsyncDotTxt.close_calls == 1 + + +def test_stream_missing_schema_file_fails( + runner: CliRunner, + tmp_path: Path, +) -> None: + """Stream should fail when the schema file does not exist.""" + missing = tmp_path / "nope.json" + result = _invoke( + runner, + ["stream", "-m", "openai/gpt-oss-20b", "-s", str(missing), "profile"], + ) + + assert result.exit_code != 0 + assert "Schema file not found" in result.output + result.stderr + assert FakeAsyncDotTxt.stream_calls == [] + + +def test_stream_invalid_schema_json_fails( + runner: CliRunner, + tmp_path: Path, +) -> None: + """Stream should fail when the schema file is not valid JSON.""" + bad = _create_schema(tmp_path, content="{not json", name="bad.json") + result = _invoke( + runner, + ["stream", "-m", "openai/gpt-oss-20b", "-s", str(bad), "profile"], + ) + + assert result.exit_code != 0 + assert "not valid JSON" in result.output + result.stderr + assert FakeAsyncDotTxt.stream_calls == [] + + +def test_stream_without_api_key_fails( + runner: CliRunner, + schema_file: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Stream should fail when no API key can be resolved.""" + monkeypatch.setattr(cli_module, "_resolve_api_key", lambda: None) + result = _invoke( + runner, + ["stream", "-m", "openai/gpt-oss-20b", "-s", str(schema_file), "profile"], + ) + + assert result.exit_code != 0 + assert "No API key available" in result.output + result.stderr + assert FakeAsyncDotTxt.stream_calls == [] diff --git a/tests/test_streaming.py b/tests/test_streaming.py new file mode 100644 index 0000000..43c06cc --- /dev/null +++ b/tests/test_streaming.py @@ -0,0 +1,403 @@ +"""Tests for the patch-stream consumer.""" + +from __future__ import annotations + +import json +from typing import Any + +import httpx +import pytest + +from dottxt.streaming import PatchEvent, PatchStreamError, apply_add, stream + + +def test_patch_event_is_leaf_for_top_level_leaf() -> None: + """A non-empty add at a non-root path is a leaf.""" + event = PatchEvent( + op={"op": "add", "path": "/intent", "value": "billing"}, + snapshot={"intent": "billing"}, + ) + assert event.is_leaf is True + assert event.field == "intent" + assert event.value == "billing" + + +def test_patch_event_is_leaf_false_for_root_seed() -> None: + """The root seed op is structural, not a leaf.""" + event = PatchEvent(op={"op": "add", "path": "", "value": {}}, snapshot={}) + assert event.is_leaf is False + assert event.field == "" + + +def test_patch_event_is_leaf_false_for_empty_container() -> None: + """Empty-object and empty-array seed ops are structural.""" + obj_seed = PatchEvent( + op={"op": "add", "path": "/address", "value": {}}, snapshot={"address": {}} + ) + arr_seed = PatchEvent( + op={"op": "add", "path": "/steps", "value": []}, snapshot={"steps": []} + ) + assert obj_seed.is_leaf is False + assert arr_seed.is_leaf is False + + +def test_patch_event_field_for_array_index_and_nested_path() -> None: + """Array indices and nested object paths keep their joined segments.""" + arr = PatchEvent( + op={"op": "add", "path": "/steps/0", "value": "verify"}, + snapshot={"steps": ["verify"]}, + ) + nested = PatchEvent( + op={"op": "add", "path": "/address/city", "value": "Paris"}, + snapshot={"address": {"city": "Paris"}}, + ) + assert arr.field == "steps/0" + assert nested.field == "address/city" + + +def test_patch_event_is_leaf_handles_falsy_primitives() -> None: + """Falsy primitives (0, "", False) are still leaves, only {} / [] are structural.""" + for value in (0, "", False, None): + event = PatchEvent( + op={"op": "add", "path": "/x", "value": value}, snapshot={"x": value} + ) + assert event.is_leaf is True, value + + +def test_apply_add_replaces_root_for_empty_path() -> None: + """An op with ``path == ""`` replaces the document root.""" + assert apply_add(None, "", {}) == {} + assert apply_add({"old": 1}, "", []) == [] + + +def test_apply_add_sets_top_level_key() -> None: + """Top-level keys are set on the root object in place.""" + doc: dict[str, Any] = {} + apply_add(doc, "/intent", "billing") + assert doc == {"intent": "billing"} + + +def test_apply_add_inserts_at_array_index() -> None: + """Numeric path segments insert into arrays at that index.""" + doc: dict[str, list[str]] = {"steps": []} + apply_add(doc, "/steps/0", "verify") + apply_add(doc, "/steps/1", "refund") + assert doc == {"steps": ["verify", "refund"]} + + +def test_apply_add_appends_with_dash() -> None: + """The ``-`` path segment appends to an array.""" + doc: dict[str, list[int]] = {"xs": [1, 2]} + apply_add(doc, "/xs/-", 3) + assert doc == {"xs": [1, 2, 3]} + + +def test_apply_add_nested_object() -> None: + """Nested object keys keep their segments joined by ``/``.""" + doc: dict[str, dict[str, str]] = {"address": {}} + apply_add(doc, "/address/city", "Paris") + assert doc == {"address": {"city": "Paris"}} + + +def _ndjson_response(ops: list[dict[str, Any]]) -> httpx.Response: + """Build a fake NDJSON streaming response.""" + body = "".join(json.dumps(op) + "\n" for op in ops).encode() + return httpx.Response( + 200, + content=body, + headers={"content-type": "application/x-ndjson"}, + ) + + +def _install_mock_transport(monkeypatch: Any, handler: Any) -> None: + """Route httpx.AsyncClient through a MockTransport for the duration of a test.""" + real_async_client = httpx.AsyncClient + + def fake_async_client(*args: Any, **kwargs: Any) -> httpx.AsyncClient: + kwargs["transport"] = httpx.MockTransport(handler) + return real_async_client(*args, **kwargs) + + monkeypatch.setattr("dottxt.streaming.httpx.AsyncClient", fake_async_client) + + +@pytest.mark.asyncio +async def test_stream_yields_expected_events(monkeypatch: Any) -> None: + """End-to-end: a canned NDJSON stream produces ops + growing snapshots.""" + captured: dict[str, Any] = {} + + def handler(request: httpx.Request) -> httpx.Response: + captured["url"] = str(request.url) + captured["body"] = json.loads(request.content) + captured["auth"] = request.headers.get("authorization") + return _ndjson_response( + [ + {"op": "add", "path": "", "value": {}}, + {"op": "add", "path": "/intent", "value": "billing"}, + {"op": "add", "path": "/urgency", "value": "high"}, + {"op": "add", "path": "/steps", "value": []}, + {"op": "add", "path": "/steps/0", "value": "verify"}, + {"op": "add", "path": "/steps/1", "value": "refund"}, + {"op": "add", "path": "/reply", "value": "Done."}, + ] + ) + + _install_mock_transport(monkeypatch, handler) + + schema = { + "type": "object", + "properties": { + "intent": {"type": "string"}, + "urgency": {"type": "string"}, + "steps": {"type": "array", "items": {"type": "string"}}, + "reply": {"type": "string"}, + }, + } + events = [ + event + async for event in stream( + base_url="https://api.example.com/v1", + api_key="sk-test", + model="openai/gpt-oss-20b", + response_format=schema, + input="go", + ) + ] + + # One event per wire op — including structural ops (root seed, empty + # containers). + assert [e.op["path"] for e in events] == [ + "", + "/intent", + "/urgency", + "/steps", + "/steps/0", + "/steps/1", + "/reply", + ] + # Snapshot grows monotonically and reflects each op's effect. + assert events[0].snapshot == {} + assert events[1].snapshot == {"intent": "billing"} + assert events[3].snapshot == { + "intent": "billing", + "urgency": "high", + "steps": [], + } + assert events[5].snapshot == { + "intent": "billing", + "urgency": "high", + "steps": ["verify", "refund"], + } + assert events[-1].snapshot == { + "intent": "billing", + "urgency": "high", + "steps": ["verify", "refund"], + "reply": "Done.", + } + assert captured["url"] == "https://api.example.com/v1/chat/completions" + assert captured["body"]["stream"] == "patch" + assert captured["body"]["model"] == "openai/gpt-oss-20b" + assert captured["auth"] == "Bearer sk-test" + + +@pytest.mark.asyncio +async def test_stream_snapshots_are_independent(monkeypatch: Any) -> None: + """Each event's snapshot is a deep copy — later ops don't mutate earlier ones.""" + + def handler(request: httpx.Request) -> httpx.Response: + return _ndjson_response( + [ + {"op": "add", "path": "", "value": {}}, + {"op": "add", "path": "/steps", "value": []}, + {"op": "add", "path": "/steps/0", "value": "a"}, + ] + ) + + _install_mock_transport(monkeypatch, handler) + + events = [ + e + async for e in stream( + base_url="https://api.example.com/v1", + api_key="sk-test", + model="m", + response_format={"type": "object"}, + input="go", + ) + ] + # The /steps event captured the empty list before /steps/0 was added. + assert events[1].snapshot == {"steps": []} + # And the op carried in event 0 (root seed) still shows the seed value, + # not the final document state. + assert events[0].op == {"op": "add", "path": "", "value": {}} + + +@pytest.mark.asyncio +async def test_stream_raises_on_non_200(monkeypatch: Any) -> None: + """A non-200 response surfaces as PatchStreamError with the body.""" + + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response( + 403, + content=b'{"error":"Forbidden","message":"no access"}', + headers={"content-type": "application/json"}, + ) + + _install_mock_transport(monkeypatch, handler) + + with pytest.raises(PatchStreamError) as info: + async for _ in stream( + base_url="https://api.example.com/v1", + api_key="sk-test", + model="m", + response_format={"type": "object"}, + input="go", + ): + pass + assert info.value.status_code == 403 + assert "Forbidden" in info.value.body + + +@pytest.mark.asyncio +async def test_stream_passes_list_input_unchanged(monkeypatch: Any) -> None: + """When ``input`` is already a list of messages, it is forwarded as-is.""" + captured: dict[str, Any] = {} + + def handler(request: httpx.Request) -> httpx.Response: + captured["body"] = json.loads(request.content) + return _ndjson_response( + [ + {"op": "add", "path": "", "value": {}}, + {"op": "add", "path": "/x", "value": 1}, + ] + ) + + _install_mock_transport(monkeypatch, handler) + + messages = [ + {"role": "system", "content": "be brief"}, + {"role": "user", "content": "go"}, + ] + events = [ + e + async for e in stream( + base_url="https://api.example.com/v1", + api_key="sk-test", + model="m", + response_format={"type": "object"}, + input=messages, + ) + ] + assert events[-1].snapshot == {"x": 1} + assert captured["body"]["messages"] == messages + + +@pytest.mark.asyncio +async def test_stream_passes_generation_params(monkeypatch: Any) -> None: + """temperature / max_tokens / seed land in the request body.""" + captured: dict[str, Any] = {} + + def handler(request: httpx.Request) -> httpx.Response: + captured["body"] = json.loads(request.content) + return _ndjson_response([{"op": "add", "path": "", "value": {}}]) + + _install_mock_transport(monkeypatch, handler) + + _ = [ + e + async for e in stream( + base_url="https://api.example.com/v1", + api_key="sk-test", + model="m", + response_format={"type": "object"}, + input="go", + temperature=0.2, + max_tokens=128, + seed=7, + extra={"top_p": 0.9}, + ) + ] + body = captured["body"] + assert body["temperature"] == 0.2 + assert body["max_tokens"] == 128 + assert body["seed"] == 7 + assert body["top_p"] == 0.9 + + +@pytest.mark.asyncio +async def test_stream_tolerates_blank_lines_and_trailing_op( + monkeypatch: Any, +) -> None: + """Blank lines are skipped; a trailing op without a newline is flushed.""" + + def handler(request: httpx.Request) -> httpx.Response: + body = ( + b'{"op":"add","path":"","value":{}}\n' + b'{"op":"add","path":"/a","value":1}\n' + b"\n" + b"\n" + b'{"op":"add","path":"/b","value":2}\n' + b'{"op":"add","path":"/c","value":3}' + ) + return httpx.Response( + 200, + content=body, + headers={"content-type": "application/x-ndjson"}, + ) + + _install_mock_transport(monkeypatch, handler) + + events = [ + e + async for e in stream( + base_url="https://api.example.com/v1", + api_key="sk-test", + model="m", + response_format={"type": "object"}, + input="go", + ) + ] + assert [e.op["path"] for e in events] == ["", "/a", "/b", "/c"] + assert events[-1].snapshot == {"a": 1, "b": 2, "c": 3} + + +@pytest.mark.asyncio +async def test_async_dottxt_stream_yields_patch_events( + monkeypatch: Any, +) -> None: + """AsyncDotTxt.stream forwards base_url + api_key and yields PatchEvents.""" + from dottxt import AsyncDotTxt + + captured: dict[str, Any] = {} + + def handler(request: httpx.Request) -> httpx.Response: + captured["url"] = str(request.url) + captured["auth"] = request.headers.get("authorization") + return _ndjson_response( + [ + {"op": "add", "path": "", "value": {}}, + {"op": "add", "path": "/intent", "value": "billing"}, + ] + ) + + _install_mock_transport(monkeypatch, handler) + + client = AsyncDotTxt( + api_key="sk-async-test", + base_url="https://api.example.com/v1", + ) + try: + events = [ + e + async for e in client.stream( + model="m", + response_format={"type": "object"}, + input="go", + ) + ] + finally: + await client.close() + + assert all(isinstance(e, PatchEvent) for e in events) + assert events[-1].snapshot == {"intent": "billing"} + assert captured["url"] == "https://api.example.com/v1/chat/completions" + assert captured["auth"] == "Bearer sk-async-test" From 1df904d3713a69dff6144224e7de3d2b84a18866 Mon Sep 17 00:00:00 2001 From: kpwtxt Date: Fri, 29 May 2026 15:07:31 -0400 Subject: [PATCH 2/3] Add apply_add() method to docs --- docs/client.md | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/docs/client.md b/docs/client.md index 23021e3..1312160 100644 --- a/docs/client.md +++ b/docs/client.md @@ -227,7 +227,9 @@ Parameters mirror `generate(...)`: Each `PatchEvent` carries: -- `event.op` — the raw RFC 6902 operation (`{"op": "add", "path": ..., "value": ...}`) +- `event.op` — the raw RFC 6902 operation (`{"op": "add", "path": ..., "value": ...}`). + The dottxt API only ever emits `add` ops; `dottxt.apply_add(doc, path, value)` + folds one into a object in place, returning the (possibly new) root. - `event.snapshot` — an independent deep copy of the JSON object built up to and including this op - `event.is_leaf` / `event.field` / `event.value` convenience demux for @@ -274,6 +276,26 @@ milliseconds in while `reply` continues to stream. If you need the full object so far (e.g. to log progress or hand a partial object to another service), use `event.snapshot`. +To drive your own state from the raw ops instead of the snapshot (e.g. to +mirror the object into a store of your own) fold each `event.op` in with +`apply_add`: + +```python +from dottxt import AsyncDotTxt, apply_add + +async def main(): + client = AsyncDotTxt() + doc = {} # the stream's first op is the root seed + stream = client.stream( + model="openai/gpt-oss-20b", + response_format=SupportTicket, + input="I was charged twice this month, please refund the duplicate.", + ) + async for event in stream: + doc = apply_add(doc, event.op["path"], event.op["value"]) + print(doc) +``` + Errors: - `dottxt.PatchStreamError`: raised when the gateway returns a non-200 From 9cbe664b645c41d47011cc088c64ca85a8a4b37d Mon Sep 17 00:00:00 2001 From: kpwtxt Date: Tue, 2 Jun 2026 10:25:21 -0400 Subject: [PATCH 3/3] Drop is_leaf property --- README.md | 5 +--- docs/client.md | 10 +++----- examples/stream_fanout.py | 2 +- examples/stream_field_printer.py | 2 +- examples/stream_hitl_approval.py | 2 -- src/dottxt/streaming.py | 16 +------------ tests/test_streaming.py | 39 -------------------------------- 7 files changed, 7 insertions(+), 69 deletions(-) diff --git a/README.md b/README.md index e28ff92..eaf0292 100644 --- a/README.md +++ b/README.md @@ -171,8 +171,7 @@ mode (RFC 6902 JSON Patch over NDJSON). Each event carries the raw op (`event.op`) and an independent deep copy of the document so far (`event.snapshot`). For the common case of reacting to one -field at a time, use the demux properties: `event.is_leaf` skips structural -ops (root seed, empty-container init), `event.field` is the JSON Pointer with +field at a time, use the demux properties: `event.field` is the JSON Pointer with the leading `/` stripped (`"intent"`, `"steps/0"`, `"address/city"`), and `event.value` is the op's value. @@ -200,8 +199,6 @@ async def main() -> None: input="I was charged twice this month, please refund the duplicate.", ) async for event in stream: - if not event.is_leaf: - continue match event.field: case "intent": print(f"dispatching to {event.value} queue") diff --git a/docs/client.md b/docs/client.md index 1312160..b69aa1a 100644 --- a/docs/client.md +++ b/docs/client.md @@ -232,11 +232,9 @@ Each `PatchEvent` carries: folds one into a object in place, returning the (possibly new) root. - `event.snapshot` — an independent deep copy of the JSON object built up to and including this op -- `event.is_leaf` / `event.field` / `event.value` convenience demux for - the common case of reacting to one field at a time. `is_leaf` is `True` - for non-structural adds (skipping the root seed and empty-container init - ops); `field` is the JSON Pointer with the leading `/` stripped - (`"intent"`, `"steps/0"`, `"address/city"`). +- `event.field` / `event.value` — `field` is the JSON Pointer with the leading `/` stripped + (`"intent"`, `"steps/0"`, `"address/city"`). `value` contains the current field content, + including empty lists `[]` or dictionary `{}` values. ```python import asyncio @@ -258,8 +256,6 @@ async def main(): input="I was charged twice this month, please refund the duplicate.", ) async for event in stream: - if not event.is_leaf: - continue match event.field: case "intent": asyncio.create_task(dispatch_to_queue(event.value)) diff --git a/examples/stream_fanout.py b/examples/stream_fanout.py index 34316e8..b15e4b8 100644 --- a/examples/stream_fanout.py +++ b/examples/stream_fanout.py @@ -76,7 +76,7 @@ async def main() -> None: max_tokens=400, ) async for event in stream: - if not event.is_leaf: + if not event.value: continue elapsed_ms = int((time.monotonic() - started) * 1000) if event.field.startswith("steps/"): diff --git a/examples/stream_field_printer.py b/examples/stream_field_printer.py index 24c02df..5bb477a 100644 --- a/examples/stream_field_printer.py +++ b/examples/stream_field_printer.py @@ -34,7 +34,7 @@ async def main() -> None: input="Generate a profile for a senior backend engineer.", ) async for event in stream: - if not event.is_leaf: + if not event.value: continue print(f"{event.field:>24} = {event.value!r}") finally: diff --git a/examples/stream_hitl_approval.py b/examples/stream_hitl_approval.py index 7608eb0..c58e1d7 100644 --- a/examples/stream_hitl_approval.py +++ b/examples/stream_hitl_approval.py @@ -74,8 +74,6 @@ async def main() -> None: max_tokens=300, ) async for event in stream: - if not event.is_leaf: - continue match event.field: case "action": proposed_action = event.value diff --git a/src/dottxt/streaming.py b/src/dottxt/streaming.py index bd9b5e8..0bdb6f8 100644 --- a/src/dottxt/streaming.py +++ b/src/dottxt/streaming.py @@ -37,27 +37,13 @@ class PatchEvent: is an independent deep copy, so callers may stash events without later ops mutating earlier snapshots. - The ``is_leaf`` / ``field`` / ``value`` properties demux the op for the + The ``field`` / ``value`` properties demux the op for the common pattern of reacting to one structured-output field at a time. """ op: dict[str, Any] snapshot: dict[str, Any] | list[Any] - @property - def is_leaf(self) -> bool: - """True iff this op contributes a single leaf value. - - False for the root seed (``path == ""``), for empty-container init - ops (``value`` is ``{}`` or ``[]``), and for any op that is not an - ``add``. - """ - return ( - self.op.get("op") == "add" - and self.op.get("path", "") != "" - and self.op.get("value") not in ({}, []) - ) - @property def field(self) -> str: """JSON Pointer for this op with the leading ``/`` stripped. diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 43c06cc..199d173 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -11,36 +11,6 @@ from dottxt.streaming import PatchEvent, PatchStreamError, apply_add, stream -def test_patch_event_is_leaf_for_top_level_leaf() -> None: - """A non-empty add at a non-root path is a leaf.""" - event = PatchEvent( - op={"op": "add", "path": "/intent", "value": "billing"}, - snapshot={"intent": "billing"}, - ) - assert event.is_leaf is True - assert event.field == "intent" - assert event.value == "billing" - - -def test_patch_event_is_leaf_false_for_root_seed() -> None: - """The root seed op is structural, not a leaf.""" - event = PatchEvent(op={"op": "add", "path": "", "value": {}}, snapshot={}) - assert event.is_leaf is False - assert event.field == "" - - -def test_patch_event_is_leaf_false_for_empty_container() -> None: - """Empty-object and empty-array seed ops are structural.""" - obj_seed = PatchEvent( - op={"op": "add", "path": "/address", "value": {}}, snapshot={"address": {}} - ) - arr_seed = PatchEvent( - op={"op": "add", "path": "/steps", "value": []}, snapshot={"steps": []} - ) - assert obj_seed.is_leaf is False - assert arr_seed.is_leaf is False - - def test_patch_event_field_for_array_index_and_nested_path() -> None: """Array indices and nested object paths keep their joined segments.""" arr = PatchEvent( @@ -55,15 +25,6 @@ def test_patch_event_field_for_array_index_and_nested_path() -> None: assert nested.field == "address/city" -def test_patch_event_is_leaf_handles_falsy_primitives() -> None: - """Falsy primitives (0, "", False) are still leaves, only {} / [] are structural.""" - for value in (0, "", False, None): - event = PatchEvent( - op={"op": "add", "path": "/x", "value": value}, snapshot={"x": value} - ) - assert event.is_leaf is True, value - - def test_apply_add_replaces_root_for_empty_path() -> None: """An op with ``path == ""`` replaces the document root.""" assert apply_add(None, "", {}) == {}