diff --git a/AGENTS.md b/AGENTS.md index d20d029..9030ff2 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -7,10 +7,10 @@ Guide for AI coding assistants working in this repository. Read this before maki ## Build and test ```bash +./build.sh # full build + test + bin output (Linux/macOS) +.\build.ps1 # full build + test + bin output (Windows) dotnet build # build only -dotnet test # build + run all tests (323 tests, ~1s) -./build.sh # full build + bin output (Linux/macOS) -.\build.ps1 # full build + bin output (Windows) +dotnet test # build + run all tests (681 tests, ~1s) ``` All tests must pass before committing. There are no integration tests that require a live LLM — everything is unit-testable with fakes. @@ -154,6 +154,23 @@ Validators must not call LLMs or external services. Violations collapse the dete --- +## Context shaping + +Two mechanisms reduce lost-in-the-middle effects for long agent contexts: + +**Task Reminder** (`ContextAssembler`): When the assembled context exceeds 2 000 characters and the task string is longer than 50 characters, `ContextAssembler.AssembleAsync` appends a `[Task Reminder]` `ChatRole.User` message (up to 200 chars of the task) at the recency end of the context list. This exploits the primacy+recency sandwich — the task appears both at the top (system prompt) and at the bottom (reminder). + +**Context Manifest** (`ToolResultWindowTrimmer` + `AgentOrchestrator`): When `MaxToolResultTokens` is exceeded, `ToolResultWindowTrimmer.ApplyWithManifest` tombstones old results and returns a manifest string listing active vs. superseded tool results. `AgentOrchestrator` appends this manifest as a final `ChatRole.User` message so the agent knows which reads are still in context and which must be re-issued with targeted ranges. + +Tombstones now include the evicted tool's name, a key argument label, and up to 300 characters of the original content as a preview: +``` +[tool result — evicted: read_file(src/Foo.cs). Preview: "using System;…". Re-read with targeted ranges if needed.] +``` + +`ToolResultWindowTrimmer.Apply` is still the zero-manifest entry point used by callers that don't need the manifest. Both delegate to the private `ApplyCore`. + +--- + ## Shared history invariant The system maintains two views of history: @@ -236,5 +253,7 @@ When adding a new `FailureAction` or `FailureType` value, update: | How does AgentFile loading work? | `src/Cli/OrchestratorBuilder.cs` → `ResolveAgentFiles` | | How does compaction work? | `src/Orchestration/ConversationCompactor.cs` | | How does change tracking work? | `src/Orchestration/ChangeTracker.cs` | +| How is agent context assembled? | `src/Orchestration/ContextAssemblyPipeline.cs` (main entry point, stages 1–6); `src/Orchestration/ContextAssembler.cs` (per-agent assembled contexts) | +| How are tool results trimmed / tombstoned? | `src/Orchestration/ToolResultWindowTrimmer.cs` | | Full architecture decisions | `docs/design.md` | | Hardening configs against hallucination | `docs/harness-engineering.md` | diff --git a/config/examples/cross-system-flow-analyzer.yaml b/config/examples/cross-system-flow-analyzer.yaml index 41e1751..66c8759 100644 --- a/config/examples/cross-system-flow-analyzer.yaml +++ b/config/examples/cross-system-flow-analyzer.yaml @@ -69,7 +69,7 @@ Orchestration: Models: heavy: - ModelId: claude-opus-4-7 + ModelId: claude-opus-4-8 scout: ModelId: claude-haiku-4-5-20251001 @@ -94,12 +94,15 @@ Orchestration: CutoverAt: 110000 # compact early; artifact handoffs eliminate the need for large live contexts FailureHandling: - RetryCount: 2 - OnAgentFailure: - - checkpoint - - compact - - retry - - escalate + MissingEvidence: + Action: Reinstruct + Threshold: 3 + ConflictingEvidence: + Action: Reinstruct + Threshold: 2 + NoProgress: + Action: Abort + Threshold: 3 Events: Path: .fuseraft/logs/events.jsonl diff --git a/config/examples/playwright-mcp.yaml b/config/examples/playwright-mcp.yaml new file mode 100644 index 0000000..e409812 --- /dev/null +++ b/config/examples/playwright-mcp.yaml @@ -0,0 +1,61 @@ +## Playwright MCP example: a single browser-automation agent backed by the Playwright MCP server. +## Prerequisites: +## 1. Install the correct Chromium build for the MCP server's playwright-core version: +## node $(npx --yes @playwright/mcp@latest node -e "process.exit(0)" 2>/dev/null; \ +## find ~/.npm/_npx -name "cli.js" -path "*/playwright-core/*" | head -1) install chromium +## Or more simply, find the cli.js path and run: node install chromium +## 2. Ensure XAI_API_KEY is set in your environment. +## Run: fuseraft run --config config/examples/orchestration.yaml "Navigate to https://example.com and take a screenshot" +## Validate: fuseraft validate config/examples/orchestration.yaml + +Orchestration: + Name: PlaywrightExample + Description: >- + Single-agent setup that drives a browser via the Playwright MCP server. + The agent can navigate pages, click elements, fill forms, and capture screenshots. + + McpServers: + - Name: playwright + Transport: stdio + Command: npx + Args: + - "@playwright/mcp@latest" + - "--browser" + - "chromium" # must match the browser installed via playwright-core's cli.js + + Agents: + - Name: BrowserAgent + Description: Automates browser interactions using Playwright tools. + Instructions: | + You are a browser automation agent with access to Playwright tools. + + Use the playwright MCP tools to complete the requested task: + - Navigate to URLs with browser_navigate + - Click elements with browser_click + - Fill forms with browser_fill + - Take screenshots with browser_screenshot + - Read page content with browser_snapshot + + Be concise. Report what you did and what you observed. + Model: + ModelId: grok-4.3 + Endpoint: https://api.x.ai/v1 + ApiKeyEnvVar: XAI_API_KEY + MaxTokens: 4096 + Plugins: + - playwright + + Selection: + Type: roundrobin + + Termination: + Type: composite + MaxIterations: 10 + Strategies: + - Type: regex + Pattern: "(?i)\\bdone\\b" + AgentNames: + - BrowserAgent + + Events: + Path: .fuseraft/events.jsonl diff --git a/docs/context-management.md b/docs/context-management.md index 3548d72..02f0a43 100644 --- a/docs/context-management.md +++ b/docs/context-management.md @@ -18,6 +18,7 @@ Each agent turn — ContextAssemblyPipeline (always on) └─ Context window filter → per-agent history slice (ContextWindow config) └─ Session context injection → session summary prepended (if present) └─ Artifact offloading → tool results > 40k chars stored to disk; stub replaces inline (always on) + └─ Task Reminder → task repeated at recency end when context > 2 000 chars (primacy+recency sandwich) History too long └─ Compaction → replace old turns with a summary + tool-call trace @@ -582,12 +583,14 @@ ContextBudget: InTurnToolWindow: 20 # always retain at least the last 20 results verbatim ``` -When the cumulative estimated token cost of all tool-result messages in the context slice exceeds `MaxToolResultTokens`, the oldest results beyond the last `InTurnToolWindow` are replaced with one-line tombstones of the form: +When the cumulative estimated token cost of all tool-result messages in the context slice exceeds `MaxToolResultTokens`, the oldest results beyond the last `InTurnToolWindow` are replaced with enriched tombstones that include the tool name, a key argument label, and up to 300 characters of the original content as a preview: ``` -[tool result — evicted after tool window exceeded] +[tool result — evicted: read_file(src/LargeService.cs). Preview: "using System;…". Re-read with targeted ranges if needed.] ``` +When evictions occur, a `[Context Manifest]` message is also appended at the end of the context slice listing active tool results still in context alongside the superseded (evicted) ones, so the agent knows which reads are still available and which must be re-issued with targeted ranges. + **Key difference from `MaxInTurnToolPairs`:** `MaxInTurnToolPairs` is an agent-level count-based cap applied unconditionally before every inner LLM call. `MaxToolResultTokens` is a session-level token-budget cap applied at the `ContextBudget` layer — it only fires when the total tool-result token footprint actually exceeds the threshold, preserving full context for turns with few or small results. **Audit trail:** the full tool results remain in the shared conversation history and on-disk artifacts. Only the slice passed to the model is trimmed — compaction and session replay are unaffected. @@ -743,12 +746,13 @@ Here is the full sequence from session start through a long-running session: │ └─ SanitizeToolPairs — strip orphaned assistant tool-call frames (strict providers) ├─ Session context injection → context_summary.md prepended when present ├─ Knowledge artifact appended as [Pipeline Knowledge] user message + ├─ Task Reminder appended when context > 2 000 chars — primacy+recency sandwich reduces lost-in-the-middle drift └─ Assembled context → sent to LLM ├─ Session read cache — read_file returns hint instead of full content if file unchanged since last read/write this session ├─ Tool-result artifact offloading — results > 40k chars stored to disk; stub replaces inline content ├─ MaxInTurnToolPairs — sliding window: keep only last N tool pairs per inner call ├─ MaxInTurnContextTokens — budget-reactive: trim oldest pairs when over budget - ├─ MaxToolResultTokens / InTurnToolWindow — tombstone oldest tool results beyond token budget + ├─ MaxToolResultTokens / InTurnToolWindow — tombstone oldest results with label+preview; append [Context Manifest] when evictions occur └─ On context/413 error → adaptive trim retry (up to 3 stages) Post-turn diff --git a/docs/sessions.md b/docs/sessions.md index 157dab2..6f449ce 100644 --- a/docs/sessions.md +++ b/docs/sessions.md @@ -139,6 +139,97 @@ All REPL events are tagged with the session ID (`session` field in the JSONL), s --- +**Orchestration event types** emitted to `events.jsonl` by `fuseraft run`: + +*Session / turn lifecycle* + +| Event type | When emitted | +|------------|-------------| +| `session_start` | Session begins | +| `session_end` | Session completes successfully | +| `session_error` | Unrecoverable session error | +| `session_recovered` | Session resumed from a prior checkpoint | +| `session_aborted` | Session stopped before completion | +| `session_summary` | Post-run summary written | +| `turn_start` | Agent turn begins | +| `turn_end` | Agent turn completes | +| `turn_timeout` | Agent turn exceeded its time limit | + +*Checkpointing / resume* + +| Event type | When emitted | +|------------|-------------| +| `checkpoint_created` | Seed checkpoint written for a new session | +| `checkpoint_loaded` | Existing checkpoint loaded for a resume | +| `resume_started` | Resumed session is about to begin streaming | +| `resume_completed` | Resumed session ran to successful completion | +| `event_replay_start` | Prior message history is being replayed as context | +| `event_replay_complete` | Message history replay finished | +| `event_corruption_detected` | A session file failed to deserialise — payload: `session`, `source`, `error` | + +*Agent execution* + +| Event type | When emitted | +|------------|-------------| +| `agent_start` | Individual agent begins its turn | +| `agent_end` | Individual agent turn completes | +| `agent_error` | Agent threw an unhandled error | +| `agent_timeout` | Agent exceeded its time limit | +| `agent_routed` | Routing selected the next agent | +| `agent_blocked` | Agent declared an unrecoverable blocker | + +*Model invocation* + +| Event type | When emitted | Key payload fields | +|------------|-------------|-------------------| +| `model_call` | LLM HTTP request is about to be sent — payload: `model`, `attempt`, `message_count`, `call_seq` | correlates with `inner_call_context` via `call_seq` | +| `model_response` | LLM response received — payload: `model`, `finish_reason`, `input_tokens`, `output_tokens`, `call_seq` | | +| `model_error` | LLM call failed (non-timeout) — payload: `model`, `attempt`, `call_seq`, `error` | includes context-limit exhaustion | +| `model_timeout` | LLM call or streaming response timed out — payload: `model`, `attempt`, `message` | | + +*Tool use* + +| Event type | When emitted | +|------------|-------------| +| `tool_call` | Tool invoked by an agent | +| `tool_result` | Tool result returned | +| `tool_blocked` | Tool call denied by governance | +| `tool_error` | Tool threw an exception | +| `tool_timeout` | Tool execution timed out | + +*Validation / governance* + +| Event type | When emitted | +|------------|-------------| +| `validation_fail` | Validator rejected an agent response | +| `hitl_escalation` | Human-in-the-loop intervention required | +| `hitl_approved` | HITL operator approved continuation | +| `hitl_rejected` | HITL operator rejected continuation | +| `circuit_breaker_open` | Circuit breaker tripped on consecutive LLM failures | +| `retry_scheduled` | Retry attempt queued after a recoverable failure | +| `retry_exhausted` | All retry attempts consumed | +| `max_turns_exceeded` | Session hit the `MaxIterations` cap | +| `termination_satisfied` | Termination condition met naturally | +| `termination_forced` | Session forcibly stopped (budget, cap, etc.) | + +*Cancellation* + +| Event type | When emitted | +|------------|-------------| +| `cancellation_requested` | `OperationCanceledException` caught mid-turn (Ctrl+C during streaming) | +| `cancellation_observed` | Cancellation token checked between turns and loop is stopping cleanly | + +*Compaction* + +| Event type | When emitted | +|------------|-------------| +| `compaction` | Compaction applied to reduce history size | +| `compaction_resume_candidate` | Session paused to await resume after compaction | + +All orchestration events include `ts` (ISO 8601 timestamp), `session` (8-char hex ID), `agent`, and `turn` fields alongside the `event_type` and `payload`. Use `fuseraft log` to view them in a formatted table. + +--- + ## Orchestration sessions (`fuseraft run`) ## How sessions work diff --git a/src/Cli/Commands/Eval/EvalCommand.cs b/src/Cli/Commands/Eval/EvalCommand.cs index ff92ca6..527e3bd 100644 --- a/src/Cli/Commands/Eval/EvalCommand.cs +++ b/src/Cli/Commands/Eval/EvalCommand.cs @@ -247,7 +247,7 @@ internal static EvalCaseResult Score(EvalCase evalCase, SessionResult result, st failures.Add($"session did not succeed: {result.ErrorMessage ?? "unknown"}"); var finalContent = result.Messages - .LastOrDefault(m => m.Role == "assistant")?.Content ?? string.Empty; + .LastOrDefault(m => m.Role == MessageRole.Assistant)?.Content ?? string.Empty; foreach (var kw in evalCase.ExpectKeywords) if (!finalContent.Contains(kw, StringComparison.OrdinalIgnoreCase)) diff --git a/src/Cli/Commands/Log/EventLogViewer.cs b/src/Cli/Commands/Log/EventLogViewer.cs index aa64846..49efd9c 100644 --- a/src/Cli/Commands/Log/EventLogViewer.cs +++ b/src/Cli/Commands/Log/EventLogViewer.cs @@ -1,6 +1,7 @@ using System.Text.Json; using System.Text.Json.Serialization; using Spectre.Console; +using fuseraft.Orchestration; namespace fuseraft.Cli.Commands.Log; @@ -123,18 +124,18 @@ internal static async Task RenderAsync( private static string ColorizeEvent(string eventType) => eventType switch { - "session_start" => "[cyan]session_start[/]", - "session_end" => "[cyan]session_end[/]", - "session_error" => "[red]session_error[/]", - "circuit_breaker_open" => "[red]circuit_breaker_open[/]", - "tool_blocked" => "[yellow]tool_blocked[/]", - "validation_fail" => "[yellow]validation_fail[/]", - "hitl_escalation" => "[yellow]hitl_escalation[/]", - "skill_curation_complete" => "[green]skill_curation_complete[/]", - "skill_curation_start" => "[dim]skill_curation_start[/]", - "turn_start" or "turn_end" => $"[dim]{Markup.Escape(eventType)}[/]", - "command" => "[dim]command[/]", - _ => Markup.Escape(eventType), + EventTypes.SessionStart => $"[cyan]{EventTypes.SessionStart}[/]", + EventTypes.SessionEnd => $"[cyan]{EventTypes.SessionEnd}[/]", + EventTypes.SessionError => $"[red]{EventTypes.SessionError}[/]", + EventTypes.CircuitBreakerOpen => $"[red]{EventTypes.CircuitBreakerOpen}[/]", + EventTypes.ToolBlocked => $"[yellow]{EventTypes.ToolBlocked}[/]", + EventTypes.ValidationFail => $"[yellow]{EventTypes.ValidationFail}[/]", + EventTypes.HitlEscalation => $"[yellow]{EventTypes.HitlEscalation}[/]", + EventTypes.SkillCurationComplete => $"[green]{EventTypes.SkillCurationComplete}[/]", + EventTypes.SkillCurationStart => $"[dim]{EventTypes.SkillCurationStart}[/]", + EventTypes.TurnStart or EventTypes.TurnEnd => $"[dim]{Markup.Escape(eventType)}[/]", + EventTypes.Command => $"[dim]{EventTypes.Command}[/]", + _ => Markup.Escape(eventType), }; private static string SummarizePayload(string? eventType, JsonElement? payload) @@ -145,39 +146,39 @@ private static string SummarizePayload(string? eventType, JsonElement? payload) { return eventType switch { - "command" => + EventTypes.Command => Get(p, "command") is { } cmd ? $"[dim]{Markup.Escape(Truncate(cmd, 60))}[/]" : string.Empty, - "skill_curation_complete" => + EventTypes.SkillCurationComplete => (Get(p, "outcome"), Get(p, "slug")) is ({ } outcome, { } slug) ? $"[dim]{Markup.Escape(outcome)} {Markup.Escape(slug)}[/]" : Get(p, "outcome") is { } o ? $"[dim]{Markup.Escape(o)}[/]" : string.Empty, - "session_error" => + EventTypes.SessionError => Get(p, "error") is { } err ? $"[dim red]{Markup.Escape(Truncate(err, 80))}[/]" : string.Empty, - "tool_blocked" => + EventTypes.ToolBlocked => Get(p, "tool") is { } tool ? $"[dim]{Markup.Escape(tool)}[/]" : string.Empty, - "validation_fail" => + EventTypes.ValidationFail => Get(p, "validator") is { } v ? $"[dim]{Markup.Escape(v)}[/]" : string.Empty, - "session_start" => + EventTypes.SessionStart => Get(p, "model") is { } model ? $"[dim]{Markup.Escape(Truncate(model, 30))}[/]" : string.Empty, - "turn_end" => + EventTypes.TurnEnd => Get(p, "agent") is { } agent ? $"[dim]{Markup.Escape(agent)}[/]" : string.Empty, diff --git a/src/Cli/Commands/Repl/ReplCommand.cs b/src/Cli/Commands/Repl/ReplCommand.cs index 0aba1c6..4ed1a07 100644 --- a/src/Cli/Commands/Repl/ReplCommand.cs +++ b/src/Cli/Commands/Repl/ReplCommand.cs @@ -199,23 +199,23 @@ protected override async Task ExecuteAsync( toolsByCategory["Session"] = PluginRegistry.GetFunctionsFromObject(replSessionPlugin).ToList(); } + using var emitter = new EventEmitter(eventsPath); + emitter.SetSessionId(sessionId); + // Wrap every tool category with the artifact offload filter so oversized results are // stored to disk instead of accumulating verbatim in the conversation history. var toolArtifactsDir = Path.Combine(cwd, FuseraftPaths.ExpandSessionId(FuseraftPaths.LocalSessionToolArtifacts, sessionId)); - var toolArtifactStore = new ToolResultArtifactStore(toolArtifactsDir); + var toolArtifactStore = new ToolResultArtifactStore(toolArtifactsDir, emitter); foreach (var key in toolsByCategory.Keys.ToList()) toolsByCategory[key] = toolsByCategory[key] .Select(f => (AIFunction)new ToolResultOffloadFilter(f, toolArtifactStore)) .ToList(); - using var emitter = new EventEmitter(eventsPath); - emitter.SetSessionId(sessionId); - if (explorerTools is not null) subAgent = new SubAgentPlugin(factory.Create(modelConfig), explorerTools, eventEmitter: emitter, parentAgentName: "repl"); - await emitter.EmitAsync("session_start", payload: new + await emitter.EmitAsync(EventTypes.SessionStart, payload: new { model = modelId, cwd, @@ -339,7 +339,7 @@ protected override async Task ExecuteAsync( await ReplTurn.RunAsync(ctx, cancellationToken); - await emitter.EmitAsync("session_end", payload: new { turns = ctx.TurnIndex }); + await emitter.EmitAsync(EventTypes.SessionEnd, payload: new { turns = ctx.TurnIndex }); await ReplTurn.ExtractMemoriesOnExitAsync(ctx); // Post-session skill curation (best-effort — never fails the session). @@ -432,7 +432,7 @@ private static async Task RunSkillCurationAsync( { try { - await ctx.Emitter.EmitAsync("skill_curation_start", + await ctx.Emitter.EmitAsync(EventTypes.SkillCurationStart, payload: new { session = ctx.SessionId, source = "repl" }); // Convert ChatMessage history to AgentMessage list (assistant turns only). @@ -440,7 +440,7 @@ await ctx.Emitter.EmitAsync("skill_curation_start", .Where(m => m.Role == ChatRole.Assistant && !string.IsNullOrWhiteSpace(m.Text)) .Select((m, i) => new AgentMessage { - AgentName = "Assistant", + AgentName = AgentNames.Assistant, Content = m.Text!, Role = "assistant", TurnIndex = i, @@ -473,7 +473,7 @@ await ctx.Emitter.EmitAsync("skill_curation_start", var result = await curator.RunAsync(checkpoint, messages, CancellationToken.None, source: "repl"); - await ctx.Emitter.EmitAsync("skill_curation_complete", + await ctx.Emitter.EmitAsync(EventTypes.SkillCurationComplete, payload: new { session = ctx.SessionId, @@ -501,7 +501,7 @@ await ctx.Emitter.EmitAsync("skill_curation_complete", // Curation is best-effort — log but never surface as an error. try { - await ctx.Emitter.EmitAsync("skill_curation_complete", + await ctx.Emitter.EmitAsync(EventTypes.SkillCurationComplete, payload: new { session = ctx.SessionId, source = "repl", outcome = "failed", failure_reason = ex.Message }); } catch (Exception emitEx) { loggerFactory.CreateLogger().LogWarning(emitEx, "[SkillCuration] emitter failed: {Message}", emitEx.Message); } diff --git a/src/Cli/Commands/Repl/ReplCommands.cs b/src/Cli/Commands/Repl/ReplCommands.cs index a93c9e8..4f54632 100644 --- a/src/Cli/Commands/Repl/ReplCommands.cs +++ b/src/Cli/Commands/Repl/ReplCommands.cs @@ -7,6 +7,7 @@ using fuseraft.Core; using fuseraft.Core.Models; using fuseraft.Infrastructure; +using fuseraft.Orchestration; namespace fuseraft.Cli.Commands.Repl; @@ -73,7 +74,7 @@ private static async Task CmdClearAsync(ReplSessionContext ctx) ctx.ContextWarningShown = false; ctx.ResetPlanState(); AnsiConsole.MarkupLine("[dim]History cleared.[/]"); - await ctx.Emitter.EmitAsync("command", payload: new { command = "/clear" }); + await ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/clear" }); return CommandResult.Continue; } @@ -92,7 +93,7 @@ private static CommandResult CmdSystem(ReplSessionContext ctx, string arg) ctx.History.RemoveAll(m => m.Role == ChatRole.System); ctx.History.Insert(0, new ChatMessage(ChatRole.System, updated)); AnsiConsole.MarkupLine("[dim]System prompt updated.[/]"); - _ = ctx.Emitter.EmitAsync("command", payload: new { command = "/system", prompt = arg }); + _ = ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/system", prompt = arg }); } return CommandResult.Continue; } @@ -144,14 +145,14 @@ private static async Task CmdToolsAsync(ReplSessionContext ctx, s // from ChatOptions at call time, so Client/StepClient don't need rebuilding. ctx.ChatOptions = ctx.BuildChatOptions(); AnsiConsole.MarkupLine($"[dim]{Markup.Escape(match)} tools disabled.[/]"); - await ctx.Emitter.EmitAsync("command", payload: new { command = "/tools disable", category = match }); + await ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/tools disable", category = match }); } else { ctx.DisabledCategories.Remove(match); ctx.ChatOptions = ctx.BuildChatOptions(); AnsiConsole.MarkupLine($"[dim]{Markup.Escape(match)} tools enabled.[/]"); - await ctx.Emitter.EmitAsync("command", payload: new { command = "/tools enable", category = match }); + await ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/tools enable", category = match }); } } else @@ -198,7 +199,7 @@ private static async Task CmdSaveAsync(ReplSessionContext ctx, st : arg; SaveTranscript(ctx.History, ctx.ModelId, path); AnsiConsole.MarkupLine($"[dim]Transcript saved to[/] {Markup.Escape(path)}"); - await ctx.Emitter.EmitAsync("command", payload: new { command = "/save", path }); + await ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/save", path }); return CommandResult.Continue; } @@ -279,7 +280,7 @@ private static async Task CmdContextAsync(ReplSessionContext ctx) } Console.Write(sb.ToString()); ctx.PrevCtxEstimate = total; - await ctx.Emitter.EmitAsync("command", payload: new + await ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/context", estimated_tokens = total, @@ -329,7 +330,7 @@ private static async Task CmdContextAsync(ReplSessionContext ctx) } ctx.PrevCtxEstimate = total; - await ctx.Emitter.EmitAsync("command", payload: new + await ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/context", estimated_tokens = total, @@ -399,7 +400,7 @@ private static async Task CmdProviderAsync(ReplSessionContext ctx AnsiConsole.MarkupLine($"[dim]Settings saved to[/] [bold]{Markup.Escape(UserConfigStore.ConfigPath)}[/]"); AnsiConsole.MarkupLine($"[dim]API key stored in[/] [bold]{Markup.Escape(ctx.KeyStore.StoreName)}[/]"); AnsiConsole.MarkupLine($"[dim]Model:[/] [bold]{Markup.Escape(ctx.ModelId)}[/] [dim](history cleared)[/]"); - await ctx.Emitter.EmitAsync("command", payload: new { command = "/provider setup", model = ctx.ModelId }); + await ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/provider setup", model = ctx.ModelId }); return CommandResult.Continue; } @@ -436,7 +437,7 @@ private static async Task CmdPlanAsync(ReplSessionContext ctx, st $"Focus on intentful actions only — no defensive steps like verifying CWD or reading files back." + $"\n\nTask: {arg}"; - await ctx.Emitter.EmitAsync("command", payload: new { command = "/plan", task = arg }); + await ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/plan", task = arg }); return CommandResult.Send(planPrompt, capturePlan: true); } @@ -457,7 +458,7 @@ private static async Task CmdExecuteAsync(ReplSessionContext ctx) AnsiConsole.MarkupLine($"[dim]Executing {total}-step plan…[/]"); AnsiConsole.WriteLine(); - await ctx.Emitter.EmitAsync("command", payload: new { command = "/execute", steps = total }); + await ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/execute", steps = total }); return CommandResult.Continue; } @@ -586,7 +587,7 @@ private static async Task CmdEventsAsync(ReplSessionContext ctx, string arg) turnSet.Add(tEl.GetInt32()); } - if (et == "tool_call" && + if (et == EventTypes.ToolCall && root.TryGetProperty("payload", out var pl) && pl.TryGetProperty("tool_name", out var tn)) { @@ -637,7 +638,7 @@ private static async Task CmdEventsAsync(ReplSessionContext ctx, string arg) AnsiConsole.MarkupLine($" [dim]·[/] {Markup.Escape(name)} [dim]{cnt}x[/]"); } - await ctx.Emitter.EmitAsync("command", payload: new { command = "/events stats" }); + await ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/events stats" }); } private static async Task CmdSafeModeAsync(ReplSessionContext ctx, string arg) @@ -665,7 +666,7 @@ private static async Task CmdSafeModeAsync(ReplSessionContext ctx ctx.ChatOptions = ctx.BuildChatOptions(); ctx.SafeMode = true; AnsiConsole.MarkupLine("[dim]Safe mode[/] [green]on[/][dim]: Shell, Git, Http tools disabled.[/]"); - await ctx.Emitter.EmitAsync("command", payload: new { command = "/safe-mode on" }); + await ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/safe-mode on" }); } } else if (arg.Equals("off", StringComparison.OrdinalIgnoreCase)) @@ -683,7 +684,7 @@ private static async Task CmdSafeModeAsync(ReplSessionContext ctx ctx.ChatOptions = ctx.BuildChatOptions(); ctx.SafeMode = false; AnsiConsole.MarkupLine("[dim]Safe mode[/] [dim]off[/][dim]: tool categories restored.[/]"); - await ctx.Emitter.EmitAsync("command", payload: new { command = "/safe-mode off" }); + await ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/safe-mode off" }); } } else @@ -716,13 +717,13 @@ private static CommandResult CmdAdversarial(ReplSessionContext ctx, string arg) } ctx.AdversarialMode = true; AnsiConsole.MarkupLine("[dim]Adversarial mode[/] [green]on[/][dim]: critic agent will review each /execute step.[/]"); - _ = ctx.Emitter.EmitAsync("command", payload: new { command = "/adversarial on" }); + _ = ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/adversarial on" }); } else if (arg.Equals("off", StringComparison.OrdinalIgnoreCase)) { ctx.AdversarialMode = false; AnsiConsole.MarkupLine("[dim]Adversarial mode[/] [dim]off[/][dim].[/]"); - _ = ctx.Emitter.EmitAsync("command", payload: new { command = "/adversarial off" }); + _ = ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/adversarial off" }); } else { @@ -772,7 +773,7 @@ private static async Task CmdAssistAsync( AnsiConsole.WriteLine(correction); AnsiConsole.WriteLine(); } - await ctx.Emitter.EmitAsync("command", payload: new { command = "/assist" }); + await ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/assist" }); return CommandResult.Send(correction); } catch (OperationCanceledException) @@ -842,7 +843,7 @@ private static async Task CmdMemoryAsync( AnsiConsole.MarkupLine(deleted ? $"[dim]Deleted memory '{Markup.Escape(memArg)}'.[/]" : $"[yellow]No memory named '{Markup.Escape(memArg)}'.[/]"); - await ctx.Emitter.EmitAsync("command", payload: new { command = "/memory delete", name = memArg }); + await ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/memory delete", name = memArg }); } } else if (sub == "save") @@ -868,7 +869,7 @@ private static async Task CmdMemoryAsync( ? $"[dim]{saved.Count} memor{(saved.Count == 1 ? "y" : "ies")} saved.[/]" : "[dim]Nothing worth saving found.[/]"); ctx.LastExtractedTurnIndex = ctx.TurnIndex; - await ctx.Emitter.EmitAsync("command", payload: new + await ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/memory save", saved = saved.Count, parseFailed }); } catch (Exception ex) @@ -924,7 +925,7 @@ private static async Task CmdCompactAsync( ReplJsonBridge.Emit(new { type = "compacted" }); else AnsiConsole.MarkupLine("[dim]Session compacted — history replaced with handoff summary.[/]"); - await ctx.Emitter.EmitAsync("command", payload: new { command = "/compact", arg }); + await ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/compact", arg }); return CommandResult.Continue; } @@ -976,7 +977,7 @@ private static async Task CmdCompactAsync( ctx.ResetPlanState(); var afterEst = ctx.EstimateTokens(); - await ctx.Emitter.EmitAsync("compaction", payload: new + await ctx.Emitter.EmitAsync(EventTypes.Compaction, payload: new { source = "manual", before_tokens = beforeEst, @@ -1034,7 +1035,7 @@ await ctx.SubAgent.ExploreStreamingAsync(arg, await StopSpinner(); if (headerPrinted) { if (!ctx.JsonMode) AnsiConsole.WriteLine(); } else AnsiConsole.MarkupLine("[dim](no output)[/]"); - await ctx.Emitter.EmitAsync("command", payload: new { command = "/explore", query = arg }); + await ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/explore", query = arg }); } catch (OperationCanceledException) { @@ -1098,7 +1099,7 @@ await ctx.SubAgent.LocateStreamingAsync(arg, await StopSpinner(); if (gotOutput) { if (!ctx.JsonMode) AnsiConsole.WriteLine(); } else AnsiConsole.MarkupLine("[dim](not found)[/]"); - await ctx.Emitter.EmitAsync("command", payload: new { command = "/locate", target = arg }); + await ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/locate", target = arg }); } catch (OperationCanceledException) { @@ -1243,7 +1244,7 @@ private static async Task CmdSwitchAsync( $"[yellow] ⚠ Plan halted at step {ctx.HaltedAt.Value.Step.Step} of {ctx.HaltedAt.Value.Total}. Run /recover or /resume.[/]"); } - await ctx.Emitter.EmitAsync("command", payload: new + await ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/switch", target_id = snapshot.SessionId, @@ -1424,7 +1425,7 @@ private static async Task CmdRewindAsync( : $"[dim]Rewound to after turn {targetTurn} — {removed} turn{(removed == 1 ? "" : "s")} removed.[/]"); } - await ctx.Emitter.EmitAsync("command", payload: new + await ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/rewind", target = targetTurn, removed, total_was = totalTurns }); return CommandResult.Continue; } @@ -1505,7 +1506,7 @@ private static async Task CmdForkAsync( AnsiConsole.MarkupLine($"[dim]Switched to fork:[/] [bold cyan]{Markup.Escape(forkId)}[/] [dim](was {Markup.Escape(prevId)})[/]"); } - await ctx.Emitter.EmitAsync("command", payload: new + await ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/fork switch", fork_id = forkId, prev_id = prevId, turns = ctx.TurnIndex }); } else @@ -1525,7 +1526,7 @@ private static async Task CmdForkAsync( AnsiConsole.MarkupLine($"[dim]Or:[/] [bold]/fork switch[/] [dim]to branch and continue as the fork right now.[/]"); } - await ctx.Emitter.EmitAsync("command", payload: new + await ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/fork", fork_id = forkId, turns = ctx.TurnIndex }); } @@ -1595,7 +1596,7 @@ private static async Task CmdModelAsync(ReplSessionContext ctx, s AnsiConsole.MarkupLine( $"[dim]Model:[/] [bold]{Markup.Escape(prevModel)}[/] [dim]→[/] [bold]{Markup.Escape(newModelId)}[/]{effortSuffix} " + $"[dim](history preserved)[/]"); - await ctx.Emitter.EmitAsync("command", payload: new { command = "/model", model = newModelId, prev = prevModel, reasoning_effort = newEffort }); + await ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/model", model = newModelId, prev = prevModel, reasoning_effort = newEffort }); return CommandResult.Continue; } @@ -1641,7 +1642,7 @@ private static async Task CmdReasoningAsync(ReplSessionContext ct var prevDisplay = prev ?? "(none)"; AnsiConsole.MarkupLine($"[dim]Reasoning:[/] [bold]{Markup.Escape(prevDisplay)}[/] [dim]→[/] [bold]{Markup.Escape(effort)}[/]"); - await ctx.Emitter.EmitAsync("command", payload: new { command = "/reasoning", reasoning_effort = effort, prev = prevDisplay, model = ctx.ModelId }); + await ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/reasoning", reasoning_effort = effort, prev = prevDisplay, model = ctx.ModelId }); return CommandResult.Continue; } @@ -1667,7 +1668,7 @@ private static CommandResult CmdRetry(ReplSessionContext ctx) else AnsiConsole.MarkupLine("[dim]Retrying last message…[/]"); - _ = ctx.Emitter.EmitAsync("command", payload: new { command = "/retry" }); + _ = ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/retry" }); return CommandResult.Send(lastUserText); } @@ -2024,7 +2025,7 @@ private static async Task CmdRunAsync( InjectRunContext(ctx, task, configPath, succeeded, exitCode, sw.Elapsed, output); - await ctx.Emitter.EmitAsync("command", payload: new + await ctx.Emitter.EmitAsync(EventTypes.Command, payload: new { command = "/run", config = configPath, diff --git a/src/Cli/Commands/Repl/ReplTurn.cs b/src/Cli/Commands/Repl/ReplTurn.cs index 9f2be1f..d093324 100644 --- a/src/Cli/Commands/Repl/ReplTurn.cs +++ b/src/Cli/Commands/Repl/ReplTurn.cs @@ -6,6 +6,7 @@ using fuseraft.Cli.Display; using fuseraft.Core.Models; using fuseraft.Infrastructure; +using fuseraft.Orchestration; namespace fuseraft.Cli.Commands.Repl; @@ -260,9 +261,9 @@ internal static async Task ExecuteAsync( bool isCorrectionTurn = false) { ctx.Emitter.SetTurn(ctx.TurnIndex); - await ctx.Emitter.EmitAsync("user_input", turn: ctx.TurnIndex, payload: new { content = input }); + await ctx.Emitter.EmitAsync(EventTypes.UserInput, turn: ctx.TurnIndex, payload: new { content = input }); ctx.History.Add(new ChatMessage(ChatRole.User, input)); - await ctx.Emitter.EmitAsync("turn_start", turn: ctx.TurnIndex, payload: new { is_step = isStepRequest, is_correction = isCorrectionTurn }); + await ctx.Emitter.EmitAsync(EventTypes.TurnStart, turn: ctx.TurnIndex, payload: new { is_step = isStepRequest, is_correction = isCorrectionTurn }); // Preserve the user's input before the LLM call so a crash mid-turn still // leaves a recoverable snapshot with the typed text. @@ -382,7 +383,7 @@ async Task StopSpinnerAsync() { await StopSpinnerAsync(); spinCts.Dispose(); - await ctx.Emitter.EmitAsync("cancelled", turn: ctx.TurnIndex); + await ctx.Emitter.EmitAsync(EventTypes.Cancelled, turn: ctx.TurnIndex); if (ctx.JsonMode) ReplJsonBridge.Emit(new { type = "cancelled" }); else @@ -402,7 +403,7 @@ async Task StopSpinnerAsync() await StopSpinnerAsync(); spinCts.Dispose(); - await ctx.Emitter.EmitAsync("repl_error", turn: ctx.TurnIndex, payload: new + await ctx.Emitter.EmitAsync(EventTypes.ReplError, turn: ctx.TurnIndex, payload: new { exception_type = ex.GetType().Name, message = ex.Message, @@ -438,7 +439,7 @@ async Task StopSpinnerAsync() await StopSpinnerAsync(); spinCts.Dispose(); - await ctx.Emitter.EmitAsync("repl_error", turn: ctx.TurnIndex, payload: new + await ctx.Emitter.EmitAsync(EventTypes.ReplError, turn: ctx.TurnIndex, payload: new { exception_type = ex.GetType().Name, message = ex.Message, @@ -486,7 +487,7 @@ async Task StopSpinnerAsync() else AnsiConsole.MarkupLine("[dim] ↯ empty response — the model returned no content. Try again.[/]"); - await ctx.Emitter.EmitAsync("repl_warning", turn: ctx.TurnIndex, payload: new + await ctx.Emitter.EmitAsync(EventTypes.ReplWarning, turn: ctx.TurnIndex, payload: new { message = "empty_response", }); @@ -509,7 +510,7 @@ async Task StopSpinnerAsync() { if (!isCorrectionTurn) { - await ctx.Emitter.EmitAsync("correction_injected", turn: ctx.TurnIndex, payload: new { reason = "mutation_claimed_without_write_tool" }); + await ctx.Emitter.EmitAsync(EventTypes.CorrectionInjected, turn: ctx.TurnIndex, payload: new { reason = "mutation_claimed_without_write_tool" }); if (!ctx.JsonMode) AnsiConsole.MarkupLine("[dim] ↺ mutation claimed without write tool — injecting correction[/]"); const string correctionMsg = @@ -560,7 +561,7 @@ await ExecuteAsync( if (pct >= 0.75) { ctx.ContextWarningShown = true; - await ctx.Emitter.EmitAsync("context_warning", turn: ctx.TurnIndex, payload: new + await ctx.Emitter.EmitAsync(EventTypes.ContextWarning, turn: ctx.TurnIndex, payload: new { estimated_tokens = postEst, budget = ContextTokenBudget, @@ -590,9 +591,9 @@ await ExecuteAsync( $"[dim] tokens (est.): {postEst:N0} / {ContextTokenBudget:N0} rounds: {toolRounds} tool calls: {toolCallsThisTurn.Count}[/]"); foreach (var (name, args) in toolCallDetails) - await ctx.Emitter.EmitAsync("tool_call", turn: ctx.TurnIndex, payload: new { tool_name = name, args }); - await ctx.Emitter.EmitAsync("assistant_response", turn: ctx.TurnIndex, payload: new { content = responseText }); - await ctx.Emitter.EmitAsync("turn_end", turn: ctx.TurnIndex, payload: new + await ctx.Emitter.EmitAsync(EventTypes.ToolCall, turn: ctx.TurnIndex, payload: new { tool_name = name, args }); + await ctx.Emitter.EmitAsync(EventTypes.AssistantResponse, turn: ctx.TurnIndex, payload: new { content = responseText }); + await ctx.Emitter.EmitAsync(EventTypes.TurnEnd, turn: ctx.TurnIndex, payload: new { elapsed_ms = (int)(DateTime.UtcNow - turnStart).TotalMilliseconds, estimated_tokens = postEst, @@ -633,7 +634,7 @@ internal static void HandlePlanCapture(ReplSessionContext ctx, string responseTe if (TryParsePlan(responseText, out var steps) && steps.Length > 0) { ctx.CurrentPlan = steps; - _ = ctx.Emitter.EmitAsync("plan_captured", turn: ctx.TurnIndex, payload: new { step_count = steps.Length }); + _ = ctx.Emitter.EmitAsync(EventTypes.PlanCaptured, turn: ctx.TurnIndex, payload: new { step_count = steps.Length }); if (ctx.JsonMode) { ReplJsonBridge.Emit(new { type = "plan", steps }); @@ -693,7 +694,7 @@ internal static async Task HandleStepResult( var inspectSkip = activeStep.Tool is not null && toolCallsThisTurn.Count > 0 && toolCallsThisTurn.All(t => InspectTools.Contains(t)); var skipped = zeroCallSkip || inspectSkip; - await ctx.Emitter.EmitAsync("step_complete", turn: ctx.TurnIndex, payload: new + await ctx.Emitter.EmitAsync(EventTypes.StepComplete, turn: ctx.TurnIndex, payload: new { step = activeStep.Step, total, @@ -722,7 +723,7 @@ internal static async Task HandleStepResult( } else { - await ctx.Emitter.EmitAsync("step_halted", turn: ctx.TurnIndex, payload: new + await ctx.Emitter.EmitAsync(EventTypes.StepHalted, turn: ctx.TurnIndex, payload: new { step = activeStep.Step, total, diff --git a/src/Cli/Commands/RunCommand.cs b/src/Cli/Commands/RunCommand.cs index c97f1fb..85c5911 100644 --- a/src/Cli/Commands/RunCommand.cs +++ b/src/Cli/Commands/RunCommand.cs @@ -377,7 +377,11 @@ protected override async Task ExecuteAsync(CommandContext context, RunSetti // Write a seed checkpoint immediately so this session appears in the sessions list // even if the process dies before the first agent turn completes. if (isNewSession) + { await activeStore.SaveAsync(checkpoint, cancellationToken); + _ = eventEmitter?.EmitAsync(EventTypes.CheckpointCreated, + payload: new { session = checkpoint.SessionId }); + } // Set up the context window recorder — appends per-turn snapshots for post-run visualization. var ctxSnapshotsPath = fuseraft.Core.FuseraftPaths.ExpandSessionPaths( @@ -407,6 +411,21 @@ protected override async Task ExecuteAsync(CommandContext context, RunSetti // Stamp the session ID on the event emitter, orchestrator, and compactor so every // component that uses session-scoped paths (e.g. brief.json) resolves them correctly. eventEmitter?.SetSessionId(checkpoint.SessionId); + if (activeStore is JsonSessionStore jsStore && eventEmitter is not null) + jsStore.OnCorruptionDetected = (sid, error) => + eventEmitter.EmitAsync(EventTypes.EventCorruptionDetected, + payload: new { session = sid, source = "session_checkpoint", error }); + if (!isNewSession && eventEmitter is not null) + { + _ = eventEmitter.EmitAsync(EventTypes.SessionRecovered, + payload: new + { + session = checkpoint.SessionId, + turns_prior = checkpoint.Messages.Count, + }); + _ = eventEmitter.EmitAsync(EventTypes.CheckpointLoaded, + payload: new { session = checkpoint.SessionId, turns = checkpoint.Messages.Count }); + } orchestrator.SetSessionId(checkpoint.SessionId); compactor?.SetSessionId(checkpoint.SessionId); @@ -470,8 +489,15 @@ protected override async Task ExecuteAsync(CommandContext context, RunSetti sessionMetrics: sessionMetrics, postmortemWriter: snapshotWriter); + if (!isNewSession && eventEmitter is not null) + _ = eventEmitter.EmitAsync(EventTypes.ResumeStarted, + payload: new { session = checkpoint.SessionId, turns_prior = checkpoint.Messages.Count }); + var result = await runner.RunAsync(task, checkpoint, settings.HumanInTheLoop, settings.ShowTools, cts.Token); + if (!isNewSession && result.Succeeded && eventEmitter is not null) + _ = eventEmitter.EmitAsync(EventTypes.ResumeCompleted, + payload: new { session = checkpoint.SessionId }); devUI?.BroadcastSessionEnd(result.Succeeded, result.ErrorMessage); // Mark complete on success (distinct from per-turn saves above). @@ -486,13 +512,13 @@ protected override async Task ExecuteAsync(CommandContext context, RunSetti { try { - await (eventEmitter?.EmitAsync("skill_curation_start", + await (eventEmitter?.EmitAsync(EventTypes.SkillCurationStart, payload: new { session = checkpoint.SessionId, source = "run" }) ?? Task.CompletedTask); var curationResult = await skillCurator.RunAsync( checkpoint, result.Messages, CancellationToken.None, source: "run"); - await (eventEmitter?.EmitAsync("skill_curation_complete", + await (eventEmitter?.EmitAsync(EventTypes.SkillCurationComplete, payload: new { session = checkpoint.SessionId, @@ -537,8 +563,9 @@ protected override async Task ExecuteAsync(CommandContext context, RunSetti } // Context window visualization — render after the run so all snapshot data is flushed. - var ctxVizPath = fuseraft.Core.FuseraftPaths.ExpandSessionId(fuseraft.Core.FuseraftPaths.LocalCtxViz, checkpoint.SessionId); - if (await fuseraft.Cli.Display.ContextWindowRenderer.RenderAsync(ctxSnapshotsPath, ctxVizPath, checkpoint.SessionId)) + var ctxVizPath = fuseraft.Core.FuseraftPaths.ExpandSessionId(fuseraft.Core.FuseraftPaths.LocalCtxViz, checkpoint.SessionId); + var ctxEventsPath = Path.Combine(Path.GetDirectoryName(ctxSnapshotsPath)!, "events.jsonl"); + if (await fuseraft.Cli.Display.ContextWindowRenderer.RenderAsync(ctxSnapshotsPath, ctxVizPath, checkpoint.SessionId, ctxEventsPath)) AnsiConsole.MarkupLine($"[dim]Context viz → {Markup.Escape(ctxVizPath)}[/]"); // Summary @@ -629,7 +656,7 @@ private static async Task InjectSkillContextAsync( checkpoint.Messages.Add(new AgentMessage { - AgentName = "System", + AgentName = AgentNames.System, Content = sb.ToString().TrimEnd(), Role = "user", TurnIndex = 0, @@ -733,7 +760,7 @@ private static async Task ApplyCompactionAsync( if (orchestrator is not MagenticOrchestrator) { checkpoint.ResumeExecutorId = checkpoint.Messages - .LastOrDefault(m => m.Role == "assistant" && !string.IsNullOrWhiteSpace(m.AgentName)) + .LastOrDefault(m => m.Role == MessageRole.Assistant && !string.IsNullOrWhiteSpace(m.AgentName)) ?.AgentName ?.ToLowerInvariant(); } @@ -779,7 +806,7 @@ private static async Task SaveTranscriptAsync( { await writer.WriteLineAsync("---"); - if (msg.Role == "user") + if (msg.Role == MessageRole.User) { await writer.WriteLineAsync($"## [Human] — Redirect"); } diff --git a/src/Cli/Commands/ValidateConfigCommand.cs b/src/Cli/Commands/ValidateConfigCommand.cs index 33d2641..63464a5 100644 --- a/src/Cli/Commands/ValidateConfigCommand.cs +++ b/src/Cli/Commands/ValidateConfigCommand.cs @@ -7,6 +7,7 @@ using fuseraft.Core.Models; using fuseraft.Infrastructure; using fuseraft.Infrastructure.Plugins; +using fuseraft.Orchestration; namespace fuseraft.Cli.Commands; @@ -123,31 +124,31 @@ protected override async Task ExecuteAsync(CommandContext context, Validate // Selection strategy var selType = config.Selection.Type.ToLowerInvariant(); - if (selType is not ("sequential" or "roundrobin" or "llm" or "keyword" or "structured" or "magentic" or "statemachine" or "graph" or "adversarial")) + if (selType is not (OrchestratorTypes.Sequential or OrchestratorTypes.RoundRobin or OrchestratorTypes.Llm or OrchestratorTypes.Keyword or OrchestratorTypes.Structured or OrchestratorTypes.Magentic or OrchestratorTypes.StateMachine or OrchestratorTypes.Graph or OrchestratorTypes.Adversarial)) issues.Add(("error", $"Unknown selection type: '{config.Selection.Type}'.")); - if (selType == "llm" && config.Selection.Model is null) + if (selType == OrchestratorTypes.Llm && config.Selection.Model is null) issues.Add(("error", "LLM selection requires Selection.Model to be set.")); - if (selType == "keyword" && (config.Selection.Routes is null || config.Selection.Routes.Count == 0)) + if (selType == OrchestratorTypes.Keyword && (config.Selection.Routes is null || config.Selection.Routes.Count == 0)) issues.Add(("error", "Keyword selection requires at least one entry in Routes.")); - if (selType == "structured") + if (selType == OrchestratorTypes.Structured) ValidateStructuredRoutes(config, issues); - if (selType == "magentic") + if (selType == OrchestratorTypes.Magentic) ValidateMagenticSelection(config, issues); - if (selType == "graph") + if (selType == OrchestratorTypes.Graph) ValidateGraph(config, issues); - if (selType == "statemachine") + if (selType == OrchestratorTypes.StateMachine) ValidateStateMachine(config, issues); - if (selType == "adversarial") + if (selType == OrchestratorTypes.Adversarial) ValidateAdversarialSelection(config, issues); - if (selType == "keyword" && config.Selection.Routes is { Count: > 1 }) + if (selType == OrchestratorTypes.Keyword && config.Selection.Routes is { Count: > 1 }) { // Detect routes that share the same keyword and SourceAgents but have different // validators. Because selection uses first-match-wins, the second route's validator diff --git a/src/Cli/CompactionCoordinator.cs b/src/Cli/CompactionCoordinator.cs index bc43673..37691e6 100644 --- a/src/Cli/CompactionCoordinator.cs +++ b/src/Cli/CompactionCoordinator.cs @@ -11,18 +11,6 @@ namespace fuseraft.Cli; -// Compaction trigger classification — informs the session_summary event and the -// compaction event reason field so post-session analysis can identify the primary -// cause of each compaction cycle. -internal static class CompactionReason -{ - public const string SingleTurnLimit = "single_turn_limit"; - public const string CumulativeBudget = "cumulative_budget"; - public const string ShouldCompact = "window_size"; - public const string AgentRequested = "agent_requested"; - public const string ContextExceeded = "context_exceeded"; -} - /// /// Owns the compaction state machine: the pending compaction reason, the post-compaction /// grace flag, and all compaction execution logic. Extracted from SessionRunner so @@ -64,7 +52,7 @@ public async Task EvaluateCompactionTriggerAsync( BudgetEvalResult budgetResult, bool statusActive) { - var agentName = msg.AgentName ?? "Unknown"; + var agentName = msg.AgentName ?? AgentNames.Unknown; // SingleTurnLimit: never suppressed by _justCompacted — a per-turn explosion must // always compact even on the turn immediately after a previous compaction. @@ -78,7 +66,7 @@ public async Task EvaluateCompactionTriggerAsync( $"MaxSingleTurnInputTokens ({budgetResult.SingleTurnThreshold:N0}). " + $"Compacting before next turn...[/]"); if (eventEmitter is not null) - await eventEmitter.EmitAsync("context_budget_cutover", + await eventEmitter.EmitAsync(EventTypes.ContextBudgetCutover, agent: agentName, payload: new { input_tokens = budgetResult.InputTokens, cutover_at = budgetResult.SingleTurnThreshold, reason = CompactionReason.SingleTurnLimit }); return true; @@ -109,9 +97,10 @@ await eventEmitter.EmitAsync("context_budget_cutover", _pendingCompactionReason = CompactionReason.CumulativeBudget; AnsiConsole.MarkupLine( $"[yellow] ⚡ {Markup.Escape(agentName)} reached context budget cutover " + - $"({budgetResult.CumulativeInputTokens:N0} ≥ {budgetResult.CutoverThreshold:N0} input tokens). Compacting history...[/]"); + $"({budgetResult.CumulativeInputTokens:N0} ≥ {budgetResult.CutoverThreshold:N0} tokens).[/]"); + AnsiConsole.MarkupLine($"[yellow] Compacting history...[/]"); if (eventEmitter is not null) - await eventEmitter.EmitAsync("context_budget_cutover", + await eventEmitter.EmitAsync(EventTypes.ContextBudgetCutover, agent: agentName, payload: new { cumulative_input_tokens = budgetResult.CumulativeInputTokens, cutover_at = budgetResult.CutoverThreshold }); return true; @@ -193,7 +182,7 @@ private async Task ApplyCompactionAsync( if (orchestrator is not MagenticOrchestrator) { lastAssistantAgent = checkpoint.Messages - .LastOrDefault(m => m.Role == "assistant" && !string.IsNullOrWhiteSpace(m.AgentName)) + .LastOrDefault(m => m.Role == MessageRole.Assistant && !string.IsNullOrWhiteSpace(m.AgentName)) ?.AgentName ?.ToLowerInvariant(); @@ -223,7 +212,7 @@ private async Task ApplyCompactionAsync( } if (orchestrator is not MagenticOrchestrator && eventEmitter is not null) - _ = eventEmitter.EmitAsync("compaction_resume_candidate", + _ = eventEmitter.EmitAsync(EventTypes.CompactionResumeCandidate, payload: new { last_assistant_agent = lastAssistantAgent, @@ -253,10 +242,10 @@ private async Task ApplyCompactionAsync( sessionMetrics?.RecordCompaction(_pendingCompactionReason); if (eventEmitter is not null) - await eventEmitter.EmitAsync("compaction", + await eventEmitter.EmitAsync(EventTypes.Compaction, payload: new { - mode = "window", + mode = CompactionModes.Window, reason = _pendingCompactionReason, turns_dropped = dropped, turns_retained = trimmed.Count, @@ -289,7 +278,7 @@ await eventEmitter.EmitAsync("compaction", sessionMetrics?.RecordCompaction(_pendingCompactionReason); if (eventEmitter is not null) - await eventEmitter.EmitAsync("compaction", + await eventEmitter.EmitAsync(EventTypes.Compaction, payload: new { turns_compacted = turnsBefore - retained.Count, @@ -313,7 +302,7 @@ private static void TryPinLastRoutingSignal( for (int i = original.Count - 1; i >= 0; i--) { var m = original[i]; - if (m.Role == "assistant" && + if (m.Role == MessageRole.Assistant && m.ToolCalls?.Any(tc => string.Equals(tc.Name, HandoffPlugin.FunctionName, StringComparison.OrdinalIgnoreCase)) == true) { lastHandoff = m; @@ -334,7 +323,7 @@ private static void TryPinLastRoutingSignal( if (string.IsNullOrEmpty(routeKeyword)) return; bool alreadyPresent = retained.Any(m => - m.Role == "assistant" && + m.Role == MessageRole.Assistant && m.ToolCalls?.Any(tc => string.Equals(tc.Name, HandoffPlugin.FunctionName, StringComparison.OrdinalIgnoreCase) && tc.ArgsSummary?.EndsWith(routeKeyword, StringComparison.OrdinalIgnoreCase) == true) == true); diff --git a/src/Cli/CompactionReason.cs b/src/Cli/CompactionReason.cs new file mode 100644 index 0000000..2d22420 --- /dev/null +++ b/src/Cli/CompactionReason.cs @@ -0,0 +1,13 @@ +namespace fuseraft.Cli; + +// Compaction trigger classification — informs the session_summary event and the +// compaction event reason field so post-session analysis can identify the primary +// cause of each compaction cycle. +internal static class CompactionReason +{ + public const string SingleTurnLimit = "single_turn_limit"; + public const string CumulativeBudget = "cumulative_budget"; + public const string ShouldCompact = "window_size"; + public const string AgentRequested = "agent_requested"; + public const string ContextExceeded = "context_exceeded"; +} diff --git a/src/Cli/ContextBudgetManager.cs b/src/Cli/ContextBudgetManager.cs index 13a4840..9d2be56 100644 --- a/src/Cli/ContextBudgetManager.cs +++ b/src/Cli/ContextBudgetManager.cs @@ -40,7 +40,7 @@ public void Reset() /// public async Task EvaluateAsync(AgentMessage msg, bool statusActive) { - var agentName = msg.AgentName ?? "Unknown"; + var agentName = msg.AgentName ?? AgentNames.Unknown; int inputToks = 0; int cumulative = 0; @@ -65,12 +65,16 @@ await contextWindowRecorder.RecordAsync( && _warnedAgents.Add(agentName)) { if (statusActive) AnsiConsole.WriteLine(); + // Split into two short lines so neither wraps in an 80-col terminal. + // A single long line wrapping inside a Spectre Status context causes the + // spinner's \r\x1b[2K to clobber the second visual line of the message. AnsiConsole.MarkupLine( - $"[yellow] ⚠ {Markup.Escape(agentName)} has accumulated {cumulative:N0} cumulative " + - $"input tokens (warn_at: {contextBudget.WarnAt:N0}). " + - $"Context rot risk — compaction will trigger at {contextBudget.CutoverAt:N0} tokens.[/]"); + $"[yellow] ⚠ {Markup.Escape(agentName)} accumulated {cumulative:N0} input tokens " + + $"(warn_at: {contextBudget.WarnAt:N0}).[/]"); + AnsiConsole.MarkupLine( + $"[yellow] Context rot risk — compaction will trigger at {contextBudget.CutoverAt:N0} tokens.[/]"); if (eventEmitter is not null) - await eventEmitter.EmitAsync("context_budget_warn", + await eventEmitter.EmitAsync(EventTypes.ContextBudgetWarn, agent: agentName, payload: new { cumulative_input_tokens = cumulative, warn_at = contextBudget.WarnAt, cutover_at = contextBudget.CutoverAt }); } diff --git a/src/Cli/DevUI/DevUIServer.cs b/src/Cli/DevUI/DevUIServer.cs index 6f1e035..a0b60e1 100644 --- a/src/Cli/DevUI/DevUIServer.cs +++ b/src/Cli/DevUI/DevUIServer.cs @@ -6,6 +6,7 @@ using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; using fuseraft.Core.Models; +using fuseraft.Orchestration; namespace fuseraft.Cli.DevUI; @@ -107,7 +108,7 @@ public async ValueTask DisposeAsync() // ------------------------------------------------------------------------- public void BroadcastSessionStart(string sessionId, string task, string configName) - => Emit("session_start", new { sessionId, task, configName, ts = Ts() }); + => Emit(EventTypes.SessionStart, new { sessionId, task, configName, ts = Ts() }); public void BroadcastAgentStarting(string agentName) => Emit("agent_starting", new { agentName, ts = Ts() }); diff --git a/src/Cli/Diagram/WorkflowDiagramGenerator.cs b/src/Cli/Diagram/WorkflowDiagramGenerator.cs index 479e8eb..61d2e56 100644 --- a/src/Cli/Diagram/WorkflowDiagramGenerator.cs +++ b/src/Cli/Diagram/WorkflowDiagramGenerator.cs @@ -1,5 +1,6 @@ using System.Text; using fuseraft.Core.Models; +using fuseraft.Orchestration; namespace fuseraft.Cli.Diagram; @@ -29,25 +30,25 @@ public static string ToMermaid(OrchestrationConfig config) switch (config.Selection.Type.ToLowerInvariant()) { - case "keyword" when config.Selection.Routes is { Count: > 0 }: + case OrchestratorTypes.Keyword when config.Selection.Routes is { Count: > 0 }: RenderKeyword(sb, config); break; - case "structured" when config.Selection.StructuredRoutes is { Count: > 0 }: + case OrchestratorTypes.Structured when config.Selection.StructuredRoutes is { Count: > 0 }: RenderStructured(sb, config); break; - case "sequential": + case OrchestratorTypes.Sequential: RenderSequential(sb, config); break; - case "magentic": + case OrchestratorTypes.Magentic: RenderMagentic(sb, config); break; - case "graph" when config.Selection.Graph is not null: + case OrchestratorTypes.Graph when config.Selection.Graph is not null: RenderGraph(sb, config.Selection.Graph); break; - case "statemachine" when config.Selection.StateMachine is not null: + case OrchestratorTypes.StateMachine when config.Selection.StateMachine is not null: RenderStateMachine(sb, config.Selection.StateMachine); break; - case "adversarial" when config.Selection.Adversarial is not null: + case OrchestratorTypes.Adversarial when config.Selection.Adversarial is not null: RenderAdversarial(sb, config.Selection.Adversarial); break; default: diff --git a/src/Cli/Display/ContextWindowRenderer.cs b/src/Cli/Display/ContextWindowRenderer.cs index 0b34379..c2145e5 100644 --- a/src/Cli/Display/ContextWindowRenderer.cs +++ b/src/Cli/Display/ContextWindowRenderer.cs @@ -1,12 +1,14 @@ using System.Text; using System.Text.Json; +using fuseraft.Orchestration; namespace fuseraft.Cli.Display; /// -/// Reads a context-window snapshot JSONL file produced by -/// and writes a self-contained -/// Chart.js HTML file showing cumulative input token growth per agent over time. +/// Reads context-window snapshot and event JSONL files and writes a self-contained +/// Chart.js HTML file with a per-turn token bar chart (top) and a cumulative input +/// token line chart (bottom). Event annotations (validation_fail, tool_blocked) are +/// overlaid on both charts when an events file is present. /// public static class ContextWindowRenderer { @@ -16,23 +18,29 @@ public static class ContextWindowRenderer PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower, }; + private static readonly HashSet UsefulEventTypes = new(StringComparer.OrdinalIgnoreCase) + { + EventTypes.TurnEnd, EventTypes.ValidationFail, EventTypes.ToolBlocked, EventTypes.ContextAssembly, + }; + /// - /// Reads , filters to , - /// and writes a Chart.js HTML visualization to . - /// Returns true if the file was written, false when there are no snapshots. - /// Never throws. + /// Reads (and optionally ), + /// filters to , and writes a Chart.js HTML visualization + /// to . Returns true if the file was written. /// public static async Task RenderAsync( - string snapshotsPath, - string outputPath, - string sessionId) + string snapshotsPath, + string outputPath, + string sessionId, + string? eventsPath = null) { try { var snapshots = await LoadSnapshotsAsync(snapshotsPath, sessionId); if (snapshots.Count == 0) return false; - var html = BuildHtml(snapshots, sessionId); + var events = await LoadEventsAsync(eventsPath, sessionId); + var html = BuildHtml(snapshots, events, sessionId); var dir = Path.GetDirectoryName(outputPath); if (!string.IsNullOrEmpty(dir)) Directory.CreateDirectory(dir); @@ -62,16 +70,44 @@ private static async Task> LoadSnapshotsAsync(string path, string return result; } - private static string BuildHtml(List snapshots, string sessionId) + private static async Task> LoadEventsAsync(string? path, string sessionId) + { + if (string.IsNullOrEmpty(path) || !File.Exists(path)) return []; + + var result = new List(); + foreach (var line in await File.ReadAllLinesAsync(path)) + { + if (string.IsNullOrWhiteSpace(line)) continue; + try + { + var e = JsonSerializer.Deserialize(line, JsonOpts); + if (e is not null + && string.Equals(e.Session, sessionId, StringComparison.OrdinalIgnoreCase) + && e.EventType is { } et + && UsefulEventTypes.Contains(et)) + result.Add(e); + } + catch { /* skip malformed lines */ } + } + return result; + } + + private static string BuildHtml(List snapshots, List events, string sessionId) { - // Snapshot data embedded as JSON (safe: values are numbers, bools, and ISO strings) var snapshotsJson = JsonSerializer.Serialize(snapshots, new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower, WriteIndented = false, }); - // Extract threshold values from the first snapshot that carries them + var eventsJson = events.Count > 0 + ? JsonSerializer.Serialize(events, new JsonSerializerOptions + { + PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower, + WriteIndented = false, + }) + : "[]"; + var warnAt = snapshots.FirstOrDefault(s => s.WarnAt is > 0)?.WarnAt ?? 0; var cutoverAt = snapshots.FirstOrDefault(s => s.CutoverAt is > 0)?.CutoverAt ?? 0; @@ -93,53 +129,252 @@ private static string BuildHtml(List snapshots, string sessionId) padding: 24px; min-height: 100vh; } - header { margin-bottom: 16px; } + header { margin-bottom: 20px; } header h1 { font-size: 15px; font-weight: 600; color: #e6edf3; } header p { font-size: 12px; color: #8b949e; margin-top: 4px; } + .section { margin-bottom: 20px; } + .section-header { + display: flex; + align-items: center; + gap: 12px; + margin-bottom: 10px; + } + .section-title { + font-size: 12px; + font-weight: 600; + color: #8b949e; + text-transform: uppercase; + letter-spacing: 0.05em; + } + .toggles { display: flex; gap: 6px; } + .toggle { + font-family: inherit; + font-size: 11px; + padding: 3px 10px; + border-radius: 4px; + border: 1px solid #30363d; + background: #161b22; + color: #8b949e; + cursor: pointer; + transition: background 0.15s, color 0.15s, border-color 0.15s; + } + .toggle.active { background: #21262d; color: #e6edf3; border-color: #484f58; } + .toggle:hover { background: #21262d; color: #e6edf3; } .chart-wrap { background: #161b22; border: 1px solid #21262d; border-radius: 6px; padding: 20px; - height: 520px; position: relative; } + .chart-wrap.bar-chart { height: 340px; } + .chart-wrap.line-chart { height: 520px; } footer { margin-top: 12px; font-size: 11px; color: #484f58; }

Context Window Visualization

-

Session {{sessionId}} — cumulative input tokens per agent over turns

+

Session {{sessionId}} — per-turn tokens and cumulative input token growth per agent

-
- + +
+
+ Per-Turn Tokens +
+ + +
+
+
+ +
+ +
+
+ Cumulative Input Tokens +
+
+ +
+
+
Generated by fuseraft-cli — compaction events shown as vertical markers. Requires internet for Chart.js CDN.
+