Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
.env
.claude
.kiro
.sessions
.converter

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ async def main():
asyncio.run(main())
```

Event types: `AGENT_START`, `TOKEN`, `REASONING`, `TOOL_START`, `TOOL_END`, `INTERRUPT`, `NODE_START`, `NODE_STOP`, `HANDOFF`, `COMPLETE`, `MULTIAGENT_START`, `MULTIAGENT_COMPLETE`, `ERROR` — each carrying `{type, agent_name, timestamp, data}`. Enough for a real-time frontend, a log aggregator, or a debugging dashboard. The `AnsiRenderer` gives you coloured terminal output out of the box — agent names, tool calls, reasoning traces, all streaming live.
Event types: `AGENT_START`, `TOKEN`, `REASONING`, `TOOL_START`, `TOOL_END`, `INTERRUPT`, `NODE_START`, `NODE_STOP`, `HANDOFF`, `COMPLETE`, `MULTIAGENT_START`, `MULTIAGENT_COMPLETE`, `ERROR`, `SESSION_START`, `SESSION_END` — each carrying `{type, agent_name, timestamp, data}`. Enough for a real-time frontend, a log aggregator, or a debugging dashboard. The `AnsiRenderer` gives you coloured terminal output out of the box — agent names, tool calls, reasoning traces, all streaming live.

---

Expand Down
39 changes: 37 additions & 2 deletions docs/configuration/Chapter_15.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,21 @@ asyncio.run(main())

## Event Types

Every event is a `StreamEvent` dataclass with four fields: `type`, `agent_name`, `timestamp`, and `data`.

### Session lifecycle events

These two events bracket every invocation. They are produced by the queue layer, not by individual agents.

| Event Type | Description | `data` payload |
|------------|-------------|----------------|
| `SESSION_START` | First event on the queue — emitted before any agent activity | Serialised `SessionManifest`: agents, orchestrations, entry point, model info, session manager locations |
| `SESSION_END` | Last typed event before the stream closes | `{"session_id": "<id or null>"}` |

The `SESSION_START` payload is the full wired topology at invocation time. Use it to restore conversation history, render an architecture diagram, or audit which models are in use — before any agent has run.

### Per-agent events

| Event Type | Description |
|------------|-------------|
| `AGENT_START` | Agent begins processing |
Expand All @@ -42,6 +57,11 @@ asyncio.run(main())
| `INTERRUPT` | Agent pauses for human input |
| `COMPLETE` | Agent finishes (with usage metrics) |
| `ERROR` | Model or execution error |

### Multi-agent events

| Event Type | Description |
|------------|-------------|
| `NODE_START` | Graph/swarm node begins |
| `NODE_STOP` | Graph/swarm node completes |
| `HANDOFF` | Swarm agent hands off to another |
Expand All @@ -62,15 +82,30 @@ while (event := await queue.get()) is not None:
# Send to websocket, log to file, push to metrics system...
```

A typical consumer pattern that handles the session lifecycle:

```python
while (event := await queue.get()) is not None:
if event.type == "session_start":
manifest = event.data # full topology snapshot
entry = manifest["entry"] # {"name": "...", "kind": "agent|orchestration"}
elif event.type == "session_end":
session_id = event.data.get("session_id")
else:
# per-agent or multi-agent event
process(event)
```

## Configuring the Queue in YAML

Event streaming is configured in Python, not YAML — it's a runtime concern. But the **hooks** it installs (`EventPublisher`) listen to the same lifecycle events as your YAML-defined hooks. They coexist peacefully.

> **Tips & Tricks**
>
> - Call `wire_event_queue()` only **once** per `ResolvedConfig` — it mutates the agents by adding hooks.
> - Call `queue.flush()` between requests to clear stale events from a previous invocation.
> - Call `wire_event_queue()` only **once** per `ResolvedConfig` — it mutates agents and orchestrators by adding hooks. Calling it twice would double-attach publishers.
> - Call `queue.flush()` between requests to clear stale events from a previous invocation. This also resets the `SESSION_START` / `SESSION_END` guards so the next cycle can re-emit them.
> - The queue has a max size of 10,000. If your agent generates more events than the consumer processes, events are dropped with a warning.
> - `SESSION_START` is emitted synchronously by `wire_event_queue()` before any agent runs. `SESSION_END` is emitted by `queue.close()` — always call it in a `finally` block.

---

Expand Down
43 changes: 27 additions & 16 deletions examples/12_streaming/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

## What this shows

- `wire_event_queue()` — wire all agents to a single async queue that emits `StreamEvent`s
- `wire_event_queue()` — wire all agents and orchestrators to a single async queue that emits `StreamEvent`s
- `AnsiRenderer` — built-in terminal renderer that prints events with colours as they arrive
- How strands-compose turns agent lifecycle events into a consumable stream — the same
mechanism that powers SSE endpoints, WebSocket feeds, and audit logs
Expand All @@ -18,9 +18,9 @@ resolved = load("config.yaml")
queue = resolved.wire_event_queue()
```

`resolved.wire_event_queue()` installs an `EventPublisher` hook on every agent. As the agent runs,
the hook converts lifecycle events (tokens, tool calls, completions) into `StreamEvent`
objects and pushes them to the queue. Your consumer loop is simple:
`resolved.wire_event_queue()` installs an `EventPublisher` hook on every agent and orchestrator.
As the session runs, hooks convert lifecycle events (tokens, tool calls, completions) into
`StreamEvent` objects and push them to the queue. Your consumer loop is simple:

```python
renderer = AnsiRenderer()
Expand All @@ -31,17 +31,25 @@ renderer.flush()

### Event types

| Type | When it fires |
|------|---------------|
| `agent_start` | Agent begins processing |
| `token` | Streaming text chunk |
| `reasoning` | Streaming reasoning chunk |
| `tool_start` | Tool call begins |
| `tool_end` | Tool call finished |
| `interrupt` | Agent pauses for human input |
| `complete` | Agent finished (includes token usage) |
| `node_start` / `node_stop` | Swarm / Graph enters/leaves a node |
| `handoff` | Swarm transfers control |
Every invocation produces a `SESSION_START` as the first event and `SESSION_END` as the last,
bracketing all per-agent activity.

| Type | When it fires | `data` |
|------|---------------|--------|
| `session_start` | Before any agent runs — first event on the queue | Serialised `SessionManifest` (agents, orchestrations, entry, model info) |
| `agent_start` | Agent begins processing | — |
| `token` | Streaming text chunk | `{"text": "..."}` |
| `reasoning` | Streaming reasoning chunk | `{"text": "..."}` |
| `tool_start` | Tool call begins | tool name, input |
| `tool_end` | Tool call finished | tool name, status, result |
| `interrupt` | Agent pauses for human input | interrupt id, reason |
| `complete` | Agent finished (includes token usage) | usage metrics |
| `error` | Model or execution error | exception type, message |
| `node_start` / `node_stop` | Swarm / Graph enters/leaves a node | node id |
| `handoff` | Swarm transfers control | from/to node ids |
| `multiagent_start` | Multi-agent orchestration begins | — |
| `multiagent_complete` | Multi-agent orchestration completes | — |
| `session_end` | After all agent events — last typed event | `{"session_id": "<id or null>"}` |

## Good to know

Expand All @@ -53,7 +61,10 @@ consume the queue and convert events to SSE chunks (see `OpenAIStreamConverter`)
NDJSON (`RawStreamConverter`).

**`queue.flush()`** resets the queue between turns so events from one invocation
don't leak into the next.
don't leak into the next. It also resets the `session_start` / `session_end` guards.

**`queue.close()`** emits `session_end` then signals end-of-stream. Always call it in
a `finally` block so `session_end` is guaranteed even when an exception occurs.

## Prerequisites

Expand Down
2 changes: 0 additions & 2 deletions examples/12_streaming/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

async def _stream(prompt: str, entry, queue):
"""Invoke the entry agent and render the event stream."""
queue.flush()
result = None

async def _invoke() -> None:
Expand All @@ -46,7 +45,6 @@ async def _main() -> None:
resolved = load(CONFIG)
entry = resolved.entry
queue = resolved.wire_event_queue()

print(f"\n{52 * '-'}")
print(f"Try: {STARTER}\n")
print("researcher -> analyst -> coordinator (with live streaming)")
Expand Down
32 changes: 26 additions & 6 deletions src/strands_compose/config/resolvers/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dataclasses import dataclass, field
from typing import TYPE_CHECKING

from ...manifest import build_manifest, first_session_id
from ...mcp.lifecycle import MCPLifecycle
from ...wire import make_event_queue
from .mcp import resolve_mcp_client, resolve_mcp_server
Expand Down Expand Up @@ -47,9 +48,19 @@ def wire_event_queue(
) -> EventQueue:
"""Wire all agents and orchestrators for event streaming.

This is the recommended way to set up event streaming. It calls
:func:`~strands_compose.wire.make_event_queue` with this
config's agents and orchestrators.
This is the recommended way to set up event streaming. It:

1. Builds a :class:`~strands_compose.types.SessionManifest` from the
resolved runtime objects.
2. Wires every agent (and orchestrator) with an
:class:`~strands_compose.hooks.EventPublisher` via
:func:`~strands_compose.wire.make_event_queue`.
3. Emits a SESSION_START event carrying the manifest as the first
event on the queue.

The effective session id is the first non-``None`` ``session_id``
found in the manifest (agents first, then orchestrations); it is
included in the SESSION_END event payload.

.. warning::

Expand All @@ -58,16 +69,25 @@ def wire_event_queue(
Call it only once per ``ResolvedConfig`` instance.

Args:
tool_labels: Optional tool name -> display label mapping.
tool_labels: Optional tool name display label mapping.

Returns:
A ready-to-use :class:`~strands_compose.wire.EventQueue`.
A ready-to-use :class:`~strands_compose.wire.EventQueue` with
SESSION_START already on it.

Raises:
ValueError: If the entry node cannot be resolved by object identity.
"""
return make_event_queue(
manifest = build_manifest(self.agents, self.orchestrators, self.entry)
event_queue = make_event_queue(
self.agents,
orchestrators=self.orchestrators,
tool_labels=tool_labels,
entry_name=manifest.entry.name,
session_id=first_session_id(manifest),
)
event_queue.emit_session_start(manifest)
return event_queue


@dataclass
Expand Down
Loading
Loading