From ab6beefd81675cdb7492701621b2a19b68b8dab2 Mon Sep 17 00:00:00 2001 From: Nicola Franco Date: Sat, 23 May 2026 21:27:39 +0200 Subject: [PATCH] feat(tui): structured event bus for live attack monitoring MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The TUI previously regex-parsed stringified log lines to populate its Actions tab, which meant the panel was effectively dead (the matcher was never instantiated outside tests) and the per-attack progress bar was a 2-second time-based fake. The Tracker/StepTracker already produced structured events for the same data — they just never reached the TUI live. This change wires a thread-safe `TUIEventBus` (subscribe/unsubscribe/ emit) end-to-end from `HackAgent.hack(...)` through `orchestrator .execute(...)` and `TrackingCoordinator` into `Tracker` and `StepTracker`, replaces the orphan `_tui_app`/`_tui_log_callback` parameters with `_tui_event_bus`, and has the Actions viewer subscribe to `goal_started` / `goal_finalized` / `step_started` / `trace_added` events. The progress bar now advances against real `expected_total_ goals`. The Logs viewer gains a structured `_records` buffer with per-level filters, case-insensitive search, and save-to-file. Dead code (`actions_logger.py`, `views/dashboard.py`, `attach_tui_handler`/ `detach_tui_handler`) is removed; `HackAgentTUI.show_*` notification helpers and `action_refresh` now actually work. Co-Authored-By: Claude Opus 4.7 --- hackagent/agent.py | 6 +- hackagent/attacks/orchestrator.py | 55 ++- hackagent/attacks/techniques/base.py | 1 + .../attacks/techniques/baseline/generation.py | 1 + hackagent/cli/tui/actions_logger.py | 189 ---------- hackagent/cli/tui/app.py | 33 +- hackagent/cli/tui/events.py | 150 ++++++++ hackagent/cli/tui/logger.py | 59 +-- hackagent/cli/tui/views/attacks.py | 101 ++++- hackagent/cli/tui/views/dashboard.py | 352 ------------------ hackagent/cli/tui/widgets/actions.py | 129 +++++++ hackagent/cli/tui/widgets/logs.py | 284 +++++++++----- hackagent/router/tracking/context.py | 3 + hackagent/router/tracking/coordinator.py | 3 + hackagent/router/tracking/step.py | 68 +++- hackagent/router/tracking/tracker.py | 64 ++++ tests/unit/cli/tui/test_events.py | 193 ++++++++++ .../unit/cli/tui/test_tui_results_and_logs.py | 43 +-- .../tui/test_widgets_and_bus_integration.py | 347 +++++++++++++++++ 19 files changed, 1325 insertions(+), 756 deletions(-) delete mode 100644 hackagent/cli/tui/actions_logger.py create mode 100644 hackagent/cli/tui/events.py delete mode 100644 hackagent/cli/tui/views/dashboard.py create mode 100644 tests/unit/cli/tui/test_events.py create mode 100644 tests/unit/cli/tui/test_widgets_and_bus_integration.py diff --git a/hackagent/agent.py b/hackagent/agent.py index 82644c71..95ba60de 100644 --- a/hackagent/agent.py +++ b/hackagent/agent.py @@ -215,8 +215,7 @@ def hack( attack_config: Dict[str, Any], run_config_override: Optional[Dict[str, Any]] = None, fail_on_run_error: bool = True, - _tui_app: Optional[Any] = None, - _tui_log_callback: Optional[Any] = None, + _tui_event_bus: Optional[Any] = None, ) -> Any: """ Executes a specified attack strategy against the configured victim agent. @@ -273,8 +272,7 @@ def hack( attack_config=attack_config, run_config_override=run_config_override, fail_on_run_error=fail_on_run_error, - _tui_app=_tui_app, - _tui_log_callback=_tui_log_callback, + _tui_event_bus=_tui_event_bus, ) except HackAgentError: diff --git a/hackagent/attacks/orchestrator.py b/hackagent/attacks/orchestrator.py index 2173f149..c275cd6a 100644 --- a/hackagent/attacks/orchestrator.py +++ b/hackagent/attacks/orchestrator.py @@ -633,8 +633,7 @@ def execute( fail_on_run_error: bool, max_wait_time_seconds: Optional[int] = None, poll_interval_seconds: Optional[int] = None, - _tui_app: Optional[Any] = None, - _tui_log_callback: Optional[Any] = None, + _tui_event_bus: Optional[Any] = None, ) -> Any: """ Execute attack with server tracking. @@ -652,8 +651,9 @@ def execute( fail_on_run_error: Whether to raise on errors max_wait_time_seconds: Unused for local execution poll_interval_seconds: Unused for local execution - _tui_app: Optional TUI app for logging - _tui_log_callback: Optional TUI log callback + _tui_event_bus: Optional :class:`hackagent.cli.tui.events.TUIEventBus` + that receives structured events (step start/end, tool calls, + progress, etc.) during execution. Returns: Attack results from local execution @@ -710,6 +710,22 @@ def execute( except Exception as e: logger.warning(f"Failed to update run status to RUNNING: {e}") + # Make the event bus available to the technique impl and to the + # tracker via the shared config bag (alongside _run_id / _backend). + if _tui_event_bus is not None: + attack_config = {**attack_config, "_tui_event_bus": _tui_event_bus} + effective_run_config = { + **effective_run_config, + "_tui_event_bus": _tui_event_bus, + } + _tui_event_bus.emit( + "step_started", + step_name="Attack Execution", + attack_type=self.attack_type, + run_id=run_id, + expected_total_goals=effective_run_config.get("expected_total_goals"), + ) + # 5. Execute locally try: _total_t0 = time.perf_counter() @@ -734,6 +750,9 @@ def execute( "_backend": self.hackagent_agent.backend, } + if _tui_event_bus is not None: + _tui_event_bus.emit("step_started", step_name="Evaluation Pipeline") + if (self.attack_type or "").lower() == "pair": from hackagent.attacks.techniques.pair.evaluation import ( PAIREvaluation, @@ -778,10 +797,31 @@ def execute( except Exception as e: logger.warning(f"Evaluation failed: {e}", exc_info=True) final_results = results # fallback + if _tui_event_bus is not None: + _tui_event_bus.emit( + "step_ended", + step_name="Evaluation Pipeline", + success=False, + error=str(e), + ) + else: + if _tui_event_bus is not None: + _tui_event_bus.emit( + "step_ended", + step_name="Evaluation Pipeline", + success=True, + ) # ⏱ timing AFTER evaluation _total_elapsed = round(time.perf_counter() - _total_t0, 3) logger.info(f"Total run time: {_total_elapsed:.1f}s") + if _tui_event_bus is not None: + _tui_event_bus.emit( + "step_ended", + step_name="Attack Execution", + success=True, + elapsed_s=_total_elapsed, + ) # ✅ Update run status to COMPLETED try: @@ -806,6 +846,13 @@ def execute( ) except Exception as update_error: logger.warning(f"Failed to update run status to FAILED: {update_error}") + if _tui_event_bus is not None: + _tui_event_bus.emit( + "step_ended", + step_name="Attack Execution", + success=False, + error=str(e), + ) raise # ======================================================================== diff --git a/hackagent/attacks/techniques/base.py b/hackagent/attacks/techniques/base.py index c76164e5..7cf76010 100644 --- a/hackagent/attacks/techniques/base.py +++ b/hackagent/attacks/techniques/base.py @@ -247,6 +247,7 @@ def _initialize_coordinator( initial_metadata=initial_metadata, goal_index_start=goal_index_start, run_start_time=run_start_time, + event_bus=self.config.get("_tui_event_bus"), ) # Backward-compat: expose step_tracker as self.tracker diff --git a/hackagent/attacks/techniques/baseline/generation.py b/hackagent/attacks/techniques/baseline/generation.py index 610b1997..a88457b7 100644 --- a/hackagent/attacks/techniques/baseline/generation.py +++ b/hackagent/attacks/techniques/baseline/generation.py @@ -174,6 +174,7 @@ def execute_prompts( logger=logger, attack_type="baseline", category_classifier_config=config.get("category_classifier"), + event_bus=config.get("_tui_event_bus"), ) else: logger.warning("⚠️ Missing tracking context - results will NOT be created!") diff --git a/hackagent/cli/tui/actions_logger.py b/hackagent/cli/tui/actions_logger.py deleted file mode 100644 index 2c8e12ab..00000000 --- a/hackagent/cli/tui/actions_logger.py +++ /dev/null @@ -1,189 +0,0 @@ -# Copyright 2026 - AI4I. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 - -""" -TUI Actions Logger - -Custom logging handler that extracts and displays agent actions (tool calls, HTTP requests) -in the TUI actions viewer. -""" - -import logging -import re -from typing import Any, Callable - - -class TUIActionsHandler(logging.Handler): - """ - Custom logging handler that extracts agent actions from log messages - and displays them in the TUI actions viewer. - - This handler parses log messages to identify: - - HTTP requests to agent endpoints - - Tool/function calls with arguments - - ADK agent events (tool_call, tool_result, llm_response) - - API responses - """ - - def __init__( - self, - actions_callback: Callable, - app_callback: Callable, - level: int = logging.INFO, - ): - """ - Initialize the actions handler. - - Args: - actions_callback: Callback function to add actions to the viewer - Signature: (action_type: str, **kwargs) - app_callback: Callback to call actions from thread-safe context - level: Logging level - """ - super().__init__(level) - self.actions_callback = actions_callback - self.app_callback = app_callback - - def emit(self, record: logging.LogRecord) -> None: - """ - Process a log record and extract action information. - - Args: - record: The log record to process - """ - try: - message = record.getMessage() - - # Pattern 1: HTTP requests to agent endpoints - # Example: "🌐 Sending request to agent endpoint: http://localhost:8000/run" - if "Sending request to agent endpoint:" in message or "🌐" in message: - url_match = re.search(r"(https?://[^\s]+)", message) - if url_match: - url = url_match.group(1) - method = "POST" # Most agent requests are POST - self.app_callback( - self.actions_callback, - "http_request", - method=method, - url=url, - ) - - # Pattern 2: Tool calls (from _log_agent_actions in completions.py) - # Example: "🔧 Agent actions for prefix #1:" - elif "🔧 Agent actions" in message or "Tool:" in message: - # Extract tool name if present - tool_match = re.search(r"Tool:\s*(\w+)", message) - if tool_match: - tool_name = tool_match.group(1) - self.app_callback( - self.actions_callback, - "tool_call", - tool_name=tool_name, - ) - - # Pattern 3: ADK events - # Example: "🤖 ADK Agent actions for prefix #1:" - elif "🤖 ADK Agent actions" in message or "ADK" in message: - # Check subsequent messages for event details - if "Tool Call:" in message: - tool_match = re.search(r"Tool Call:\s*(\w+)", message) - if tool_match: - tool_name = tool_match.group(1) - self.app_callback( - self.actions_callback, - "adk_event", - event_type="tool_call", - tool_name=tool_name, - ) - elif "Tool Result:" in message: - tool_match = re.search(r"Tool Result:\s*(\w+)", message) - if tool_match: - tool_name = tool_match.group(1) - self.app_callback( - self.actions_callback, - "adk_event", - event_type="tool_result", - tool_name=tool_name, - ) - elif "LLM Response:" in message: - content_match = re.search(r"LLM Response:\s*(.+)", message) - content = content_match.group(1) if content_match else "" - self.app_callback( - self.actions_callback, - "adk_event", - event_type="llm_response", - content=content, - ) - - # Pattern 4: Agent responses - # Example: "✅ Agent responded with status 200" - elif "Agent responded" in message or "✅" in message: - status_match = re.search(r"status\s+(\d+)", message) - if status_match: - # This indicates a successful HTTP response - pass # Could add response visualization here - - # Pattern 5: Model queries (LiteLLM) - # Example: "🌐 Querying model gpt-4" - elif "Querying model" in message: - model_match = re.search(r"model\s+(\S+)", message) - if model_match: - model_name = model_match.group(1) - self.app_callback( - self.actions_callback, - "llm_query", - model_name=model_name, - ) - - except Exception: - # Silently fail to avoid breaking the logging system - pass - - -def extract_action_data_from_log( - actions_viewer: Any, - action_type: str, - **kwargs: Any, -) -> None: - """ - Extract and display action data in the actions viewer. - - Args: - actions_viewer: The AgentActionsViewer widget - action_type: Type of action (http_request, tool_call, adk_event, etc.) - **kwargs: Action-specific parameters - """ - try: - if action_type == "http_request": - actions_viewer.add_http_request( - method=kwargs.get("method", "POST"), - url=kwargs.get("url", ""), - headers=kwargs.get("headers"), - payload=kwargs.get("payload"), - ) - - elif action_type == "tool_call": - actions_viewer.add_tool_call( - tool_name=kwargs.get("tool_name", "unknown"), - arguments=kwargs.get("arguments"), - result=kwargs.get("result"), - ) - - elif action_type == "adk_event": - event_type = kwargs.get("event_type", "unknown") - event_data = { - "tool_name": kwargs.get("tool_name", ""), - "tool_input": kwargs.get("tool_input", {}), - "result": kwargs.get("result", ""), - "content": kwargs.get("content", ""), - } - actions_viewer.add_adk_event(event_type, event_data) - - elif action_type == "llm_query": - # Display LLM query as a special kind of action - model_name = kwargs.get("model_name", "unknown") - actions_viewer.add_step_separator(f"LLM Query: {model_name}") - - except Exception: - # Silently fail to avoid breaking the UI - pass diff --git a/hackagent/cli/tui/app.py b/hackagent/cli/tui/app.py index 7b11fcb0..a2143bea 100644 --- a/hackagent/cli/tui/app.py +++ b/hackagent/cli/tui/app.py @@ -214,15 +214,26 @@ def action_switch_tab(self, tab_id: str) -> None: tabs.active = tab_id def action_refresh(self) -> None: - """Refresh the current tab's data.""" + """Refresh the current tab's data. + + Walks every descendant of the active TabPane (not just immediate + children) so nested tab widgets — including those that wrap their + body in a scroller — still receive the refresh. + """ tabs = self.query_one(TabbedContent) active_pane = tabs.get_pane(tabs.active) - if active_pane and hasattr(active_pane, "refresh_data"): - # Get the first child of the TabPane (our custom tab widget) - for child in active_pane.children: - if hasattr(child, "refresh_data"): - child.refresh_data() - break + if active_pane is None: + return + try: + for descendant in active_pane.query("*"): + if hasattr(descendant, "refresh_data"): + descendant.refresh_data() + return + except Exception: + pass + # Last-resort: try the pane itself. + if hasattr(active_pane, "refresh_data"): + active_pane.refresh_data() def on_mount(self) -> None: """Called when the app is mounted.""" @@ -231,16 +242,16 @@ def on_mount(self) -> None: def show_success(self, message: str) -> None: """Show success notification with checkmark.""" - pass + self.notify(f"✓ {message}", title="Success", severity="information") def show_error(self, message: str) -> None: """Show error notification with X mark.""" - pass + self.notify(f"✗ {message}", title="Error", severity="error") def show_warning(self, message: str) -> None: """Show warning notification with warning sign.""" - pass + self.notify(f"⚠ {message}", title="Warning", severity="warning") def show_info(self, message: str) -> None: """Show info notification with info icon.""" - pass + self.notify(f"ℹ {message}", title="Info", severity="information") diff --git a/hackagent/cli/tui/events.py b/hackagent/cli/tui/events.py new file mode 100644 index 00000000..c7d033bc --- /dev/null +++ b/hackagent/cli/tui/events.py @@ -0,0 +1,150 @@ +# Copyright 2026 - AI4I. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Structured event bus for the TUI. + +Replaces ad-hoc log-string parsing. Attack techniques, trackers, and +adapters emit typed events; TUI widgets subscribe and render them directly. + +Events are delivered synchronously on the emitting thread. Subscribers that +need to update Textual widgets should wrap their callback in +``app.call_from_thread`` themselves; the bus does not assume a Textual app. +""" + +import logging +import threading +from dataclasses import dataclass, field +from typing import Any, Callable, Dict, List, Optional + + +# Event type constants. Using plain strings (not an Enum) keeps the bus +# friendly to ad-hoc emitters and to JSON serialization. +EVENT_STEP_STARTED = "step_started" +EVENT_STEP_ENDED = "step_ended" +EVENT_GOAL_STARTED = "goal_started" +EVENT_GOAL_FINALIZED = "goal_finalized" +EVENT_TOOL_CALL = "tool_call" +EVENT_TOOL_RESULT = "tool_result" +EVENT_LLM_REQUEST = "llm_request" +EVENT_LLM_RESPONSE = "llm_response" +EVENT_HTTP_REQUEST = "http_request" +EVENT_EVALUATION = "evaluation" +EVENT_PROGRESS = "progress" + +ALL_EVENT_TYPES = ( + EVENT_STEP_STARTED, + EVENT_STEP_ENDED, + EVENT_GOAL_STARTED, + EVENT_GOAL_FINALIZED, + EVENT_TOOL_CALL, + EVENT_TOOL_RESULT, + EVENT_LLM_REQUEST, + EVENT_LLM_RESPONSE, + EVENT_HTTP_REQUEST, + EVENT_EVALUATION, + EVENT_PROGRESS, +) + + +@dataclass(frozen=True) +class TUIEvent: + """A single bus event. + + ``event_type`` is one of the ``EVENT_*`` constants. ``payload`` is the + structured data — its shape depends on the event type. ``timestamp`` is + wall-clock seconds since the epoch, set automatically. + """ + + event_type: str + payload: Dict[str, Any] = field(default_factory=dict) + timestamp: float = 0.0 + + +Subscriber = Callable[[TUIEvent], None] + + +class TUIEventBus: + """Thread-safe pub/sub event bus. + + Subscribers register per event type (or for all events). Emitters call + :meth:`emit` from any thread; subscriber callbacks run synchronously on + the emitting thread. Exceptions raised by subscribers are logged and + swallowed so a misbehaving subscriber cannot break the attack. + """ + + _ALL = "__all__" + + def __init__(self) -> None: + self._lock = threading.Lock() + self._subscribers: Dict[str, List[Subscriber]] = {} + self._logger = logging.getLogger("hackagent.cli.tui.events") + + def subscribe( + self, + callback: Subscriber, + event_type: Optional[str] = None, + ) -> None: + """Register a subscriber. + + Args: + callback: Function called with each matching :class:`TUIEvent`. + event_type: Specific event type to subscribe to. If ``None``, + the subscriber receives every event. + """ + key = event_type or self._ALL + with self._lock: + self._subscribers.setdefault(key, []).append(callback) + + def unsubscribe( + self, + callback: Subscriber, + event_type: Optional[str] = None, + ) -> None: + """Remove a previously registered subscriber. No-op if not found.""" + key = event_type or self._ALL + with self._lock: + subs = self._subscribers.get(key) + if not subs: + return + try: + subs.remove(callback) + except ValueError: + pass + + def emit(self, event_type: str, **payload: Any) -> None: + """Emit an event to all matching subscribers. + + Args: + event_type: One of the ``EVENT_*`` constants. Custom types are + allowed but will only be seen by ``subscribe(..., None)`` + catch-all subscribers and by subscribers using the exact + same string. + **payload: Structured event data; merged into ``TUIEvent.payload``. + """ + import time + + event = TUIEvent( + event_type=event_type, + payload=dict(payload), + timestamp=time.time(), + ) + + # Snapshot the subscriber list under lock, then dispatch outside + # the lock so a slow subscriber cannot block other emitters. + with self._lock: + specific = list(self._subscribers.get(event_type, ())) + catch_all = list(self._subscribers.get(self._ALL, ())) + + for cb in specific + catch_all: + try: + cb(event) + except Exception: + self._logger.debug( + "TUI event subscriber raised; ignoring", exc_info=True + ) + + def clear(self) -> None: + """Remove every subscriber. Useful between attack runs.""" + with self._lock: + self._subscribers.clear() diff --git a/hackagent/cli/tui/logger.py b/hackagent/cli/tui/logger.py index e23e17d5..00c16dc5 100644 --- a/hackagent/cli/tui/logger.py +++ b/hackagent/cli/tui/logger.py @@ -254,54 +254,11 @@ def wrapper(self, *args: Any, **kwargs: Any) -> Any: return decorator -def attach_tui_handler( - attack_instance: Any, - app: App, - callback: Callable[[str, str], None], - max_buffer_size: int = 1000, - level: int = logging.INFO, -) -> TUILogHandler: - """ - Attach a TUI log handler to an attack instance. - - This function should be called before executing an attack to set up - the logging infrastructure for TUI display. - - Args: - attack_instance: The attack object to attach the handler to - app: Textual App instance for thread-safe calls - callback: Function to call with (message, level) for each log - max_buffer_size: Maximum number of logs to buffer - level: Minimum log level to capture - - Returns: - The created TUILogHandler instance - """ - handler = TUILogHandler( - app=app, - callback=callback, - max_buffer_size=max_buffer_size, - level=level, - ) - - # Store the handler on the attack instance - attack_instance._tui_log_handler = handler - - return handler - - -def detach_tui_handler(attack_instance: Any) -> Optional[TUILogHandler]: - """ - Detach and return the TUI log handler from an attack instance. - - Args: - attack_instance: The attack object to detach the handler from - - Returns: - The detached TUILogHandler instance, or None if not present - """ - if hasattr(attack_instance, "_tui_log_handler"): - handler = attack_instance._tui_log_handler - delattr(attack_instance, "_tui_log_handler") - return handler - return None +# NOTE: ``attach_tui_handler`` / ``detach_tui_handler`` were removed in the +# TUI cleanup pass — they were never called by production code. The current +# TUI worker (:mod:`hackagent.cli.tui.views.attacks`) installs a +# :class:`TUILogHandler` directly on the ``hackagent`` logger; for attack +# techniques whose logger sets ``propagate=False``, the +# :func:`with_tui_logging` decorator above is the supported integration +# point — set ``attack_instance._tui_log_handler`` before calling +# ``attack_instance.run(...)`` to enable it. diff --git a/hackagent/cli/tui/views/attacks.py b/hackagent/cli/tui/views/attacks.py index 13ad866a..aa30381e 100644 --- a/hackagent/cli/tui/views/attacks.py +++ b/hackagent/cli/tui/views/attacks.py @@ -1064,6 +1064,14 @@ def _filtered_log_callback(message: str, level: str) -> None: hackagent_logger.addHandler(tui_log_handler) hackagent_logger.setLevel(tui_log_level) + # Build the structured event bus and hook the actions viewer. + # The bus is also passed to ``agent.hack(...)`` below so trackers + # emit goal/step/trace events as the attack runs. + from hackagent.cli.tui.events import TUIEventBus + + tui_event_bus = TUIEventBus() + actions_viewer.subscribe_to_bus(tui_event_bus, self.app) + logging.getLogger("httpx").setLevel(logging.CRITICAL) logging.getLogger("litellm").setLevel(logging.CRITICAL) @@ -1118,36 +1126,91 @@ def _filtered_log_callback(message: str, level: str) -> None: start_time = time.time() - def log_callback(message: str, level: str) -> None: - _filtered_log_callback(message, level) - - import threading + # Event-driven progress: each `goal_finalized` advances the bar + # toward 95% based on the expected goal count carried by the + # orchestrator's `step_started` event. Anything beyond execution + # (sync to backend) takes the final 5%. + progress_state = {"goals_done": 0, "expected": 0} + + def _on_bus_event(event: Any) -> None: + et = event.event_type + payload = event.payload or {} + + if ( + et == "step_started" + and payload.get("step_name") == "Attack Execution" + ): + expected = payload.get("expected_total_goals") or 0 + progress_state["expected"] = int(expected) if expected else 0 + self.app.call_from_thread(progress_bar.update, progress=45) + self.app.call_from_thread( + status_widget.update, + f"""[bold cyan]⚔️ Executing {_escape(strategy_name)} Attack...[/bold cyan] + +[bold]Goals to process:[/bold] {progress_state["expected"] or "unknown"} + +[yellow]⏳ Attack running...[/yellow] +[dim]Progress: 45%[/dim]""", + ) + return + + if et == "goal_finalized": + progress_state["goals_done"] += 1 + expected = progress_state["expected"] + if expected > 0: + pct = 45 + int(50 * progress_state["goals_done"] / expected) + pct = min(pct, 95) + else: + # Unknown total — creep up but never reach 95% + pct = min(45 + progress_state["goals_done"] * 5, 90) + self.app.call_from_thread(progress_bar.update, progress=pct) + success = bool(payload.get("success")) + icon = "✓" if success else "✗" + elapsed = payload.get("elapsed_s") + elapsed_s = ( + f" ({elapsed:.1f}s)" + if isinstance(elapsed, (int, float)) + else "" + ) + summary = ( + f"Goal {progress_state['goals_done']}" + + (f"/{expected}" if expected else "") + + f" {icon}{elapsed_s}" + ) + self.app.call_from_thread( + status_widget.update, + f"""[bold cyan]⚔️ Executing {_escape(strategy_name)} Attack...[/bold cyan] - stop_progress = threading.Event() +[bold]Last:[/bold] {summary} - def update_progress_gradually(): - for progress in range(50, 91, 5): - if stop_progress.is_set(): - break - self.app.call_from_thread(progress_bar.update, progress=progress) - time.sleep(2) +[yellow]⏳ Attack running...[/yellow] +[dim]Progress: {pct}%[/dim]""", + ) + return + + if ( + et == "step_started" + and payload.get("step_name") == "Evaluation Pipeline" + ): + self.app.call_from_thread(progress_bar.update, progress=96) + self.app.call_from_thread( + status_widget.update, + """[bold cyan]⚖ Running evaluation pipeline...[/bold cyan] + +[dim]Progress: 96%[/dim]""", + ) - progress_thread = threading.Thread( - target=update_progress_gradually, daemon=True - ) - progress_thread.start() + tui_event_bus.subscribe(_on_bus_event) try: results = agent.hack( attack_config=attack_config, run_config_override={"timeout": timeout}, fail_on_run_error=True, - _tui_app=self.app, - _tui_log_callback=log_callback, + _tui_event_bus=tui_event_bus, ) finally: - stop_progress.set() - progress_thread.join(timeout=1) + tui_event_bus.unsubscribe(_on_bus_event) sys.stdout = original_stdout sys.stderr = original_stderr diff --git a/hackagent/cli/tui/views/dashboard.py b/hackagent/cli/tui/views/dashboard.py deleted file mode 100644 index cc7ee21c..00000000 --- a/hackagent/cli/tui/views/dashboard.py +++ /dev/null @@ -1,352 +0,0 @@ -# Copyright 2026 - AI4I. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 - -""" -Dashboard Tab - -Overview and statistics for HackAgent. -""" - -from typing import Any -import json - -from textual.app import ComposeResult -from textual.containers import Horizontal, Vertical, VerticalScroll -from textual.widgets import Static, Tree - -from hackagent.cli.config import CLIConfig -from hackagent.cli.tui.base import BaseTab - - -def _escape(value: Any) -> str: - """Escape a value for safe Rich markup rendering. - - Args: - value: Any value to escape - - Returns: - String with Rich markup characters escaped - - Note: - We escape ALL square brackets, not just tag-like patterns, - because Rich's markup parser can get confused by unescaped - brackets in certain contexts (e.g., JSON arrays inside colored text). - """ - if value is None: - return "" - # Escape ALL square brackets to prevent any markup interpretation issues - text = str(value) - return text.replace("[", "\\[").replace("]", "\\]") - - -class DashboardTab(BaseTab): - """Dashboard tab showing overview and statistics.""" - - DEFAULT_CSS = "" - - def __init__(self, cli_config: CLIConfig): - """Initialize dashboard tab. - - Args: - cli_config: CLI configuration object - """ - super().__init__(cli_config) - self.stats = { - "agents": 0, - "attacks": 0, - "results": 0, - "success_rate": 0.0, - } - - def compose(self) -> ComposeResult: - """Compose the dashboard layout.""" - # Title section - yield Static( - "[bold cyan]━━━ Dashboard Overview ━━━[/bold cyan]", id="dashboard-title" - ) - - # Statistics section with better formatting - yield Static("[bold yellow]📊 Statistics[/bold yellow]", id="stats-header") - - with Horizontal(): - with Vertical(): - yield Static( - "🎯 [bold]Target Agents[/bold]\n[cyan]0[/cyan]", - id="stat-agents", - ) - yield Static( - "⚔️ [bold]Attacks[/bold]\n[green]0[/green]", id="stat-attacks" - ) - with Vertical(): - yield Static( - "📋 [bold]Results[/bold]\n[yellow]0[/yellow]", id="stat-results" - ) - yield Static( - "✓ [bold]Success Rate[/bold]\n[magenta]0%[/magenta]", - id="stat-success", - ) - - # Activity section - yield Static( - "\n[bold yellow]📝 Recent Activity & Traces[/bold yellow]", - id="activity-header", - ) - - yield Tree("Activity Log", id="activity-tree") - with VerticalScroll(id="activity-scroll"): - yield Static("[dim]Waiting for data...[/dim]", id="activity-log") - - def on_mount(self) -> None: - """Called when the tab is mounted.""" - # Call base class mount to handle initial refresh - super().on_mount() - - # Enable auto-refresh every 5 seconds - self.enable_auto_refresh(interval=5.0) - - def refresh_data(self) -> None: - """Refresh dashboard data from local backend.""" - try: - from hackagent.server.storage.local import LocalBackend - - backend = LocalBackend() - - agents_data = [] - results_data = [] - - # Fetch agents count - try: - agents_page = backend.list_agents(page=1, page_size=100) - agents_data = agents_page.items - self.stats["agents"] = agents_page.total - except Exception: - pass - - # Fetch results count - try: - results_page = backend.list_results(page=1, page_size=100) - results_data = results_page.items - self.stats["results"] = results_page.total - - if results_data: - completed = sum( - 1 - for r in results_data - if hasattr(r, "evaluation_status") - and str( - r.evaluation_status.value - if hasattr(r.evaluation_status, "value") - else r.evaluation_status - ).upper() - == "COMPLETED" - ) - self.stats["success_rate"] = ( - (completed / len(results_data)) * 100 - if len(results_data) > 0 - else 0 - ) - except Exception: - pass - - # Update stat cards - self._update_stat_cards() - - # Update activity log - if not agents_data and not results_data: - self.query_one("#activity-tree", Tree).display = False - self.query_one("#activity-scroll").display = True - activity_log = self.query_one("#activity-log", Static) - activity_log.update( - "[yellow]No data found[/yellow]\n\n" - "[dim]Create target agents and run attacks to see activity here.[/dim]\n\n" - "[cyan]Quick Start:[/cyan]\n" - "1. Go to Target Agents tab to create a target agent\n" - "2. Go to Attacks tab to run security tests\n" - "3. Check Results tab to see outcomes" - ) - else: - self._update_activity_log(agents_data, results_data) - - except Exception as e: - self.query_one("#activity-tree", Tree).display = False - self.query_one("#activity-scroll").display = True - activity_log = self.query_one("#activity-log", Static) - error_msg = str(e) - activity_log.update( - f"[red]Error loading data:[/red]\n\n" - f"[yellow]Details:[/yellow]\n{error_msg}\n\n" - f"[dim]Press F5 to retry[/dim]" - ) - - def _update_stat_cards(self) -> None: - """Update the statistics cards with current data.""" - try: - # Get the values - agents_val = self.stats.get("agents", 0) - attacks_val = self.stats.get("attacks", 0) - results_val = self.stats.get("results", 0) - success_val = self.stats.get("success_rate", 0) - - # Update each stat widget by ID with icons and formatting - stat_agents = self.query_one("#stat-agents", Static) - stat_agents.update( - f"🎯 [bold]Target Agents[/bold]\n[cyan]{agents_val}[/cyan]" - ) - - stat_attacks = self.query_one("#stat-attacks", Static) - stat_attacks.update( - f"⚔️ [bold]Attacks[/bold]\n[green]{attacks_val}[/green]" - ) - - stat_results = self.query_one("#stat-results", Static) - stat_results.update( - f"📋 [bold]Results[/bold]\n[yellow]{results_val}[/yellow]" - ) - - stat_success = self.query_one("#stat-success", Static) - stat_success.update( - f"✓ [bold]Success Rate[/bold]\n[magenta]{success_val:.1f}%[/magenta]" - ) - - except Exception as e: - # Show error in activity log if update fails - try: - activity_log = self.query_one("#activity-log", Static) - activity_log.update( - f"[red]Error updating stats: {_escape(str(e))}[/red]" - ) - except Exception: - pass - - def _update_activity_log(self, agents: list, results: list) -> None: - """Update activity log with recent items. - - Args: - agents: List of agents - results: List of results - """ - try: - tree = self.query_one("#activity-tree", Tree) - self.query_one("#activity-scroll").display = False - tree.display = True - - # Clear existing data - tree.clear() - tree.root.expand() - - # Add recent agents - if agents: - agents_node = tree.root.add( - "[bold cyan]🎯 Recent Target Agents[/bold cyan]", expand=True - ) - for i, agent in enumerate(agents[:3], 1): - agent_type = ( - agent.agent_type.value - if hasattr(agent.agent_type, "value") - else agent.agent_type - ) - agent_name = _escape(agent.name) if agent.name else "Unnamed" - agents_node.add_leaf( - f"{i}. [cyan]{agent_name}[/cyan] [dim]({_escape(agent_type)})[/dim]" - ) - - # Add recent results and their traces - if results: - results_node = tree.root.add( - "[bold green]📋 Recent Results & Traces[/bold green]", expand=True - ) - for i, result in enumerate(results[:5], 1): - status = "Unknown" - status_color = "dim" - - if hasattr(result, "evaluation_status"): - status = ( - result.evaluation_status.value - if hasattr(result.evaluation_status, "value") - else str(result.evaluation_status) - ) - # Color code based on status - if status.upper() == "COMPLETED": - status_color = "green" - elif status.upper() == "RUNNING": - status_color = "yellow" - elif status.upper() == "FAILED": - status_color = "red" - - attack_type = getattr(result, "attack_type", "Unknown") - result_label = f"{i}. [yellow]{_escape(attack_type)}[/yellow] → [{status_color}]{_escape(status)}[/{status_color}]" - - res_node = results_node.add(result_label, expand=(i == 1)) - - # Add nested traces if available - traces = getattr(result, "traces", []) - if not traces: - res_node.add_leaf("[dim]No traces available[/dim]") - else: - for trace_idx, trace in enumerate(traces, 1): - step_type = getattr(trace, "step_type", "Unknown") - step_type_str = ( - step_type.value - if hasattr(step_type, "value") - else str(step_type) - ) - - content = getattr(trace, "content", None) - preview = "" - if content: - try: - if isinstance(content, str): - c_dict = json.loads(content) - else: - c_dict = content - - if isinstance(c_dict, dict): - if "thought" in c_dict: - preview = f": {_escape(str(c_dict['thought'])[:60])}..." - elif "tool_name" in c_dict: - preview = f" ([bright_magenta]🔧 {_escape(str(c_dict['tool_name']))}[/bright_magenta])" - elif "response" in c_dict: - preview = f": {_escape(str(c_dict['response'])[:60])}..." - else: - preview = f": {_escape(str(content)[:60])}..." - except Exception: - preview = f": {_escape(str(content)[:60])}..." - - trace_label = ( - f"└─ [magenta]{step_type_str}[/magenta]{preview}" - ) - - # Expand first few traces - trace_node = res_node.add( - trace_label, expand=(trace_idx <= 3) - ) - - # Add full content as leaf - if content: - try: - content_str = ( - json.dumps(content, indent=2) - if isinstance(content, dict) - else str(content) - ) - # limit leaf content to prevent slowing down TUI too much - content_str = content_str[:1000] + ( - "..." if len(content_str) > 1000 else "" - ) - trace_node.add_leaf( - f"[dim]{_escape(content_str)}[/dim]" - ) - except Exception: - pass - - except Exception as e: - # Fallback to static log on error - try: - self.query_one("#activity-tree", Tree).display = False - self.query_one("#activity-scroll").display = True - activity_log = self.query_one("#activity-log", Static) - activity_log.update( - f"[red]Error rendering traces: {_escape(str(e))}[/red]" - ) - except Exception: - pass diff --git a/hackagent/cli/tui/widgets/actions.py b/hackagent/cli/tui/widgets/actions.py index a7a15651..6184ccf6 100644 --- a/hackagent/cli/tui/widgets/actions.py +++ b/hackagent/cli/tui/widgets/actions.py @@ -356,3 +356,132 @@ def update_action_count(self, count: int) -> None: count_widget.update(f"[bold]Actions:[/bold] {count}") except Exception: pass + + # ------------------------------------------------------------------ + # Event bus integration + # ------------------------------------------------------------------ + + def subscribe_to_bus(self, bus: Any, app: Any) -> None: + """Subscribe this viewer to a :class:`TUIEventBus`. + + Events arrive on the emitting (worker) thread, so each handler + marshals back onto the Textual UI thread via ``app.call_from_thread``. + """ + + def _on_event(event: Any) -> None: + try: + app.call_from_thread(self._handle_event, event) + except Exception: + pass + + bus.subscribe(_on_event) + + def _handle_event(self, event: Any) -> None: + """Translate a TUI bus event into a row in the actions viewer.""" + et = event.event_type + payload = event.payload or {} + + if et == "step_started": + self.add_step_separator( + payload.get("step_name", "Step"), + payload.get("step_number", 0) or 0, + ) + + elif et == "goal_started": + goal_index = payload.get("goal_index", 0) + goal = (payload.get("goal") or "")[:120] + attack_type = (payload.get("attack_type") or "").upper() + label = f"Goal #{goal_index + 1}" + if attack_type: + label = f"{attack_type} — {label}" + if goal: + label += f": {goal}" + self.add_step_separator(label, goal_index + 1) + + elif et == "goal_finalized": + success = bool(payload.get("success")) + actions_widget = self.query_one("#actions-display", RichLog) + icon = ( + "[bright_green]✓ JAILBREAK[/bright_green]" + if success + else "[red]✗ REFUSED[/red]" + ) + elapsed = payload.get("elapsed_s") + elapsed_s = ( + f" [dim]({elapsed:.1f}s)[/dim]" + if isinstance(elapsed, (int, float)) + else "" + ) + actions_widget.write( + f"[dim]── Goal #{payload.get('goal_index', '?') + 1 if isinstance(payload.get('goal_index'), int) else '?'} {icon}{elapsed_s} ──[/dim]" + ) + + elif et == "trace_added": + self._render_trace(payload) + + def _render_trace(self, payload: Dict[str, Any]) -> None: + """Render a `trace_added` payload using the existing add_* helpers.""" + step_type = (payload.get("step_type") or "").upper() + step_name = payload.get("step_name") or step_type or "Trace" + content = payload.get("content") or {} + sequence = payload.get("sequence") + + # Tool call / response → use add_tool_call so we get the structured + # arguments + result layout. + if step_type == "TOOL_CALL": + tool_name = ( + content.get("name") + or content.get("tool") + or content.get("function", {}).get("name") + or "unknown" + ) + args = ( + content.get("arguments") + or content.get("input") + or content.get("parameters") + ) + self.add_tool_call( + tool_name=tool_name, + arguments=args if isinstance(args, dict) else None, + step_number=sequence, + ) + return + + if step_type == "TOOL_RESPONSE": + tool_name = content.get("tool") or content.get("name") or "tool" + result = ( + content.get("result") + or content.get("output") + or content.get("response") + ) + self.add_tool_call( + tool_name=tool_name, + result=str(result) if result is not None else None, + step_number=sequence, + ) + return + + # Evaluation / scorer steps — render as a small card. + if step_name == "Evaluation" or content.get("evaluator"): + actions_widget = self.query_one("#actions-display", RichLog) + score = content.get("score") + evaluator = content.get("evaluator") or "evaluator" + actions_widget.write( + f"[bold yellow]⚖ {_escape(evaluator)}[/bold yellow]" + + ( + f" score=[bright_white]{score}[/bright_white]" + if score is not None + else "" + ) + ) + return + + # Generic interaction — show key/value-style summary. + actions_widget = self.query_one("#actions-display", RichLog) + actions_widget.write( + f"[dim]· {sequence or self._action_count + 1}[/dim] " + f"[bold cyan]{_escape(step_name)}[/bold cyan] " + f"[dim]{_escape(step_type)}[/dim]" + ) + self._action_count += 1 + self.update_action_count(self._action_count) diff --git a/hackagent/cli/tui/widgets/logs.py b/hackagent/cli/tui/widgets/logs.py index 633dcbb7..b0ecf2c1 100644 --- a/hackagent/cli/tui/widgets/logs.py +++ b/hackagent/cli/tui/widgets/logs.py @@ -8,11 +8,11 @@ with syntax highlighting, auto-scrolling, and filtering capabilities. """ -from typing import Any +from typing import Any, List, Optional, Tuple from textual.app import ComposeResult -from textual.containers import Container -from textual.widgets import Button, RichLog, Static +from textual.containers import Container, Horizontal +from textual.widgets import Button, Checkbox, Input, RichLog, Static def _escape(value: Any) -> str: @@ -71,6 +71,24 @@ class AttackLogViewer(Container): layout: horizontal; } + AttackLogViewer .log-filters { + dock: top; + height: 3; + background: $surface; + padding: 0 1; + layout: horizontal; + } + + AttackLogViewer .log-filters Checkbox { + width: 14; + margin: 0 1; + } + + AttackLogViewer #log-search { + width: 1fr; + margin: 0 1; + } + AttackLogViewer RichLog { background: $surface; border: none; @@ -105,8 +123,17 @@ def __init__( self.show_controls = show_controls self.max_lines = max_lines self._auto_scroll = True - self._line_count = 0 # Track line count internally - self._log_buffer: list[str] = [] # Store log messages for copying + # Structured ring buffer: (level, raw_message). Headers (step markers, + # banners, etc.) are stored with level ``"HEADER"`` so filtering can + # treat them as level-independent. + self._records: List[Tuple[str, str]] = [] + self._level_enabled = { + "DEBUG": True, + "INFO": True, + "WARNING": True, + "ERROR": True, + } + self._search_query: str = "" def compose(self) -> ComposeResult: """Compose the log viewer layout.""" @@ -118,13 +145,21 @@ def compose(self) -> ComposeResult: # Control buttons (optional) if self.show_controls: - with Container(classes="log-controls"): - yield Button("Clear Logs", id="clear-logs", variant="default") - yield Button("Copy Logs", id="copy-logs", variant="default") - yield Button("View in Pager", id="view-pager", variant="default") + with Horizontal(classes="log-controls"): + yield Button("Clear", id="clear-logs", variant="default") + yield Button("Copy", id="copy-logs", variant="default") + yield Button("Save", id="save-logs", variant="default") + yield Button("Pager", id="view-pager", variant="default") yield Button("Auto-scroll: ON", id="toggle-scroll", variant="primary") yield Static("", id="log-count") + with Horizontal(classes="log-filters"): + yield Checkbox("DEBUG", value=True, id="filter-debug") + yield Checkbox("INFO", value=True, id="filter-info") + yield Checkbox("WARN", value=True, id="filter-warning") + yield Checkbox("ERROR", value=True, id="filter-error") + yield Input(placeholder="search…", id="log-search") + # Log display area rich_log = RichLog( highlight=True, @@ -144,105 +179,192 @@ def on_button_pressed(self, event: Button.Pressed) -> None: if event.button.id == "clear-logs": self.clear_logs() elif event.button.id == "copy-logs": - self.copy_logs() + ok = self.copy_logs() self.notify( - "Logs copied to clipboard!", title="Copy", severity="information" + "Logs copied to clipboard!" if ok else "Nothing to copy", + title="Copy", + severity="information" if ok else "warning", ) + elif event.button.id == "save-logs": + path = self.save_logs_to_file() + if path: + self.notify(f"Saved to {path}", title="Save", severity="information") + else: + self.notify("Nothing to save", title="Save", severity="warning") elif event.button.id == "view-pager": self.view_in_pager() elif event.button.id == "toggle-scroll": self.toggle_auto_scroll() - def add_log(self, message: str, level: str = "INFO") -> None: - """ - Add a log message to the viewer with appropriate styling. - - Args: - message: The log message to display - level: Log level (INFO, WARNING, ERROR, DEBUG) - """ - log_widget = self.query_one("#attack-log-display", RichLog) - - # Color code based on log level - level_colors = { - "DEBUG": "dim", - "INFO": "cyan", - "WARNING": "yellow", - "ERROR": "bold red", - "CRITICAL": "bold red on white", + def on_checkbox_changed(self, event: Checkbox.Changed) -> None: + """React to level-filter toggles by re-rendering the visible log.""" + mapping = { + "filter-debug": "DEBUG", + "filter-info": "INFO", + "filter-warning": "WARNING", + "filter-error": "ERROR", } + level = mapping.get(event.checkbox.id or "") + if level is None: + return + self._level_enabled[level] = bool(event.value) + self._rerender() + + def on_input_changed(self, event: Input.Changed) -> None: + """React to the search input (case-insensitive substring filter).""" + if event.input.id == "log-search": + self._search_query = (event.value or "").strip().lower() + self._rerender() + + _LEVEL_COLORS = { + "DEBUG": "dim", + "INFO": "cyan", + "WARNING": "yellow", + "ERROR": "bold red", + "CRITICAL": "bold red on white", + } - color = level_colors.get(level, "white") + def _format_record(self, level: str, message: str) -> str: + """Render one (level, message) record as Rich markup.""" + if level == "HEADER": + # Pre-formatted banner — write through unchanged. + return message + color = self._LEVEL_COLORS.get(level, "white") + escaped = _escape(message) + if level in ("ERROR", "CRITICAL"): + return f"[{color}]🔴 {escaped}[/{color}]" + if level == "WARNING": + return f"[{color}]⚠️ {escaped}[/{color}]" + if level == "DEBUG": + return f"[{color}]🔍 {escaped}[/{color}]" + return f"[{color}]{escaped}[/{color}]" + + def _record_visible(self, level: str, message: str) -> bool: + """Apply level + search filters. Headers are always visible.""" + if level != "HEADER": + normalized = "ERROR" if level == "CRITICAL" else level + if not self._level_enabled.get(normalized, True): + return False + if self._search_query and self._search_query not in message.lower(): + return False + return True - # Format the message with color - escape user content - escaped_message = _escape(message) - if level in ["ERROR", "CRITICAL"]: - formatted_message = f"[{color}]🔴 {escaped_message}[/{color}]" - elif level == "WARNING": - formatted_message = f"[{color}]⚠️ {escaped_message}[/{color}]" - elif level == "DEBUG": - formatted_message = f"[{color}]🔍 {escaped_message}[/{color}]" - else: # INFO and default - formatted_message = f"[{color}]{escaped_message}[/{color}]" + def _rerender(self) -> None: + """Rewrite the RichLog from the structured buffer.""" + try: + log_widget = self.query_one("#attack-log-display", RichLog) + except Exception: + return + log_widget.clear() + visible = 0 + for level, message in self._records: + if not self._record_visible(level, message): + continue + log_widget.write(self._format_record(level, message)) + visible += 1 + if self._auto_scroll: + log_widget.scroll_end(animate=False) + self.update_log_count(visible) - # Add to log display - log_widget.write(formatted_message) + def add_log(self, message: str, level: str = "INFO") -> None: + """Append a log message; respects current level/search filters.""" + self._records.append((level, message)) + # Keep the structured buffer bounded. + if len(self._records) > self.max_lines: + del self._records[: len(self._records) - self.max_lines] - # Store in buffer for copying (strip Rich markup) - plain_message = message # Store the original message without formatting - log_entry = f"[{level}] {plain_message}" - self._log_buffer.append(log_entry) + if not self._record_visible(level, message): + return - # Auto-scroll to bottom if enabled + try: + log_widget = self.query_one("#attack-log-display", RichLog) + except Exception: + return + log_widget.write(self._format_record(level, message)) if self._auto_scroll: log_widget.scroll_end(animate=False) - - # Update log count - self._line_count += 1 - self.update_log_count(self._line_count) + self.update_log_count( + sum(1 for lv, msg in self._records if self._record_visible(lv, msg)) + ) def add_step_header(self, step_name: str, step_number: int = 0) -> None: - """ - Add a prominent step header to visually separate pipeline steps. - - Args: - step_name: Name of the step - step_number: Step number (0 for no number) - """ - log_widget = self.query_one("#attack-log-display", RichLog) - - # Create a visual separator - escape step_name + """Append a step banner. Always visible regardless of level filters.""" separator = "─" * 60 escaped_step_name = _escape(step_name) if step_number > 0: - header = f"\n[bold magenta]{separator}\n🎯 STEP {step_number}: {escaped_step_name}\n{separator}[/bold magenta]\n" + banner = ( + f"\n[bold magenta]{separator}\n" + f"🎯 STEP {step_number}: {escaped_step_name}\n" + f"{separator}[/bold magenta]\n" + ) else: - header = f"\n[bold magenta]{separator}\n🎯 {escaped_step_name}\n{separator}[/bold magenta]\n" - - log_widget.write(header) - + banner = ( + f"\n[bold magenta]{separator}\n" + f"🎯 {escaped_step_name}\n" + f"{separator}[/bold magenta]\n" + ) + self._records.append(("HEADER", banner)) + try: + log_widget = self.query_one("#attack-log-display", RichLog) + except Exception: + return + log_widget.write(banner) if self._auto_scroll: log_widget.scroll_end(animate=False) def clear_logs(self) -> None: """Clear all log messages from the viewer.""" - log_widget = self.query_one("#attack-log-display", RichLog) - log_widget.clear() - self._line_count = 0 - self._log_buffer.clear() + try: + log_widget = self.query_one("#attack-log-display", RichLog) + log_widget.clear() + except Exception: + pass + self._records.clear() self.update_log_count(0) + def _filtered_plaintext(self) -> str: + """Return the plain text of currently visible records.""" + import re + + lines: list[str] = [] + for level, message in self._records: + if not self._record_visible(level, message): + continue + if level == "HEADER": + # Strip Rich markup tags for the plaintext copy. + lines.append(re.sub(r"\[/?[^]]+\]", "", message).strip("\n")) + else: + lines.append(f"[{level}] {message}") + return "\n".join(lines) + + def save_logs_to_file(self) -> "Optional[str]": + """Save current (filtered) log text to a timestamped temp file.""" + from datetime import datetime + import os + import tempfile + + text = self._filtered_plaintext() + if not text: + return None + ts = datetime.now().strftime("%Y%m%d-%H%M%S") + path = os.path.join(tempfile.gettempdir(), f"hackagent_logs_{ts}.log") + try: + with open(path, "w", encoding="utf-8") as f: + f.write(text) + return path + except Exception: + return None + def copy_logs(self) -> bool: - """Copy all log messages to clipboard or save to file. + """Copy currently visible logs to the clipboard (falls back to file). Returns: True if logs were copied successfully, False otherwise. """ - if not self._log_buffer: + log_text = self._filtered_plaintext() + if not log_text: return False - log_text = "\n".join(self._log_buffer) - # Try multiple clipboard methods copied = False @@ -324,8 +446,9 @@ def copy_logs(self) -> bool: return False def view_in_pager(self) -> None: - """View logs in a pager (less) for easy selection and navigation.""" - if not self._log_buffer: + """View currently visible logs in $PAGER / less for navigation.""" + log_text = self._filtered_plaintext() + if not log_text: return try: @@ -333,8 +456,6 @@ def view_in_pager(self) -> None: import subprocess import os - # Save to temporary file - log_text = "\n".join(self._log_buffer) temp_file = tempfile.NamedTemporaryFile( mode="w", suffix=".log", delete=False ) @@ -381,13 +502,8 @@ def update_log_count(self, count: int) -> None: count_widget.update(f"[dim]Lines: {count}/{self.max_lines}[/dim]") def get_log_text(self) -> str: - """ - Get all log text as a plain string (for export). - - Returns: - All log messages as plain text - """ - return "\n".join(self._log_buffer) + """All log text as a plain string — currently visible records only.""" + return self._filtered_plaintext() def load_logs_from_buffer(self, buffer: list[tuple[str, str]]) -> None: """ diff --git a/hackagent/router/tracking/context.py b/hackagent/router/tracking/context.py index feb02d24..1398acb5 100644 --- a/hackagent/router/tracking/context.py +++ b/hackagent/router/tracking/context.py @@ -52,6 +52,9 @@ class TrackingContext: logger: Optional[logging.Logger] = None sequence_counter: int = 0 metadata: Dict[str, Any] = field(default_factory=dict) + # Optional :class:`hackagent.cli.tui.events.TUIEventBus`. When set, the + # :class:`StepTracker` emits ``step_started`` / ``step_ended`` events. + event_bus: Optional[Any] = None def __post_init__(self): """Initialize default logger if not provided.""" diff --git a/hackagent/router/tracking/coordinator.py b/hackagent/router/tracking/coordinator.py index 60c94a81..90424808 100644 --- a/hackagent/router/tracking/coordinator.py +++ b/hackagent/router/tracking/coordinator.py @@ -114,6 +114,7 @@ def create( initial_metadata: Optional[Dict[str, Any]] = None, goal_index_start: int = 0, run_start_time: Optional[float] = None, + event_bus: Optional[Any] = None, ) -> "TrackingCoordinator": """ Factory method to create a fully-initialized coordinator. @@ -144,6 +145,7 @@ def create( logger=_logger, attack_type=attack_type, category_classifier_config=category_classifier_config, + event_bus=event_bus, ) tracking_context = TrackingContext( @@ -151,6 +153,7 @@ def create( run_id=run_id, parent_result_id=None, logger=_logger, + event_bus=event_bus, ) tracking_context.add_metadata("attack_type", attack_type) step_tracker = StepTracker(tracking_context) diff --git a/hackagent/router/tracking/step.py b/hackagent/router/tracking/step.py index 3f46fd07..d6daf8bd 100644 --- a/hackagent/router/tracking/step.py +++ b/hackagent/router/tracking/step.py @@ -103,12 +103,55 @@ def track_step( Re-raises any exception from the tracked code block after recording the error state. """ + bus = getattr(self.context, "event_bus", None) + if not self.context.is_enabled: - # Tracking disabled, just yield and return - yield None + # Tracking disabled, just yield and return — but still emit + # lifecycle events if a bus is attached (it is decoupled from + # backend availability). + if bus is not None: + try: + bus.emit("step_started", step_name=step_name, step_type=step_type) + except Exception: + self.logger.debug("StepTracker event emit failed", exc_info=True) + try: + yield None + except Exception as e: + if bus is not None: + try: + bus.emit( + "step_ended", + step_name=step_name, + step_type=step_type, + success=False, + error=str(e), + ) + except Exception: + self.logger.debug( + "StepTracker event emit failed", exc_info=True + ) + raise + else: + if bus is not None: + try: + bus.emit( + "step_ended", + step_name=step_name, + step_type=step_type, + success=True, + ) + except Exception: + self.logger.debug( + "StepTracker event emit failed", exc_info=True + ) return trace_id = None + if bus is not None: + try: + bus.emit("step_started", step_name=step_name, step_type=step_type) + except Exception: + self.logger.debug("StepTracker event emit failed", exc_info=True) try: # Create trace record at step start trace_id = self._create_trace( @@ -128,6 +171,16 @@ def track_step( step_type=step_type, status="completed", ) + if bus is not None: + try: + bus.emit( + "step_ended", + step_name=step_name, + step_type=step_type, + success=True, + ) + except Exception: + self.logger.debug("StepTracker event emit failed", exc_info=True) except Exception as e: # Handle step failure @@ -139,6 +192,17 @@ def track_step( status="failed", error_message=str(e), ) + if bus is not None: + try: + bus.emit( + "step_ended", + step_name=step_name, + step_type=step_type, + success=False, + error=str(e), + ) + except Exception: + self.logger.debug("StepTracker event emit failed", exc_info=True) # Re-raise to allow caller to handle raise diff --git a/hackagent/router/tracking/tracker.py b/hackagent/router/tracking/tracker.py index 052fd003..d6ad4cc9 100644 --- a/hackagent/router/tracking/tracker.py +++ b/hackagent/router/tracking/tracker.py @@ -114,6 +114,7 @@ def __init__( logger: Optional[logging.Logger] = None, attack_type: Optional[str] = None, category_classifier_config: Optional[Dict[str, Any]] = None, + event_bus: Optional[Any] = None, ): """ Initialize tracker. @@ -123,11 +124,16 @@ def __init__( run_id: Server-side run record ID logger: Optional logger instance attack_type: Optional attack type identifier for metadata + event_bus: Optional :class:`hackagent.cli.tui.events.TUIEventBus`. + When provided, the tracker emits structured events + (``goal_started``, ``goal_finalized``, ``evaluation``, ...) + so the TUI can render execution live without parsing logs. """ self.backend = backend self.run_id = run_id self.logger = logger or get_logger(__name__) self.attack_type = attack_type + self.event_bus = event_bus self._goal_category_classifier = GoalCategoryClassifier( backend=backend, config=category_classifier_config, @@ -135,6 +141,16 @@ def __init__( ) self._goal_contexts: Dict[int, Context] = {} + def _emit(self, event_type: str, **payload: Any) -> None: + """Emit on ``event_bus`` if present; swallow any error.""" + bus = self.event_bus + if bus is None: + return + try: + bus.emit(event_type, **payload) + except Exception: + self.logger.debug("Tracker event emit failed", exc_info=True) + @property def is_enabled(self) -> bool: """Check if tracking is enabled (has backend and run_id).""" @@ -175,12 +191,28 @@ def create_goal_result( if not self.is_enabled: self.logger.debug(f"Tracking disabled - goal {goal_index} won't be tracked") self._goal_contexts[goal_index] = ctx + # Bus delivery is independent of backend tracking — surface the + # event so the TUI still sees the goal even in headless mode. + self._emit( + "goal_started", + goal=goal, + goal_index=goal_index, + attack_type=self.attack_type, + result_id=None, + ) return ctx try: run_uuid = self._get_run_uuid() if not run_uuid: self._goal_contexts[goal_index] = ctx + self._emit( + "goal_started", + goal=goal, + goal_index=goal_index, + attack_type=self.attack_type, + result_id=None, + ) return ctx # Classify each goal as soon as its result record is created. @@ -225,6 +257,13 @@ def create_goal_result( ) self._goal_contexts[goal_index] = ctx + self._emit( + "goal_started", + goal=goal, + goal_index=goal_index, + attack_type=self.attack_type, + result_id=ctx.result_id, + ) return ctx def _classify_goal_labels(self, goal: str) -> Dict[str, str]: @@ -389,6 +428,18 @@ def _add_trace( } ctx.traces.append(trace_record) + # Surface the trace as a structured TUI event. Subscribers translate + # the step_type / step_name into "tool_call", "evaluation", etc. + self._emit( + "trace_added", + goal_index=ctx.goal_index, + sequence=seq, + step_name=step_name, + step_type=trace_record["step_type"], + content=sanitized_content, + elapsed_s=trace_record["elapsed_s"], + ) + # Send to backend if enabled and we have a result_id if not self.is_enabled or not ctx.result_id: return None @@ -481,6 +532,19 @@ def finalize_goal( ctx.metadata["total_traces"] = len(ctx.traces) ctx.metadata["elapsed_s"] = round(ctx.elapsed_s, 3) + # Emit goal_finalized regardless of backend tracking so the TUI can + # tick progress even when running without a server-side run record. + self._emit( + "goal_finalized", + goal_index=ctx.goal_index, + goal=ctx.goal, + success=bool(final_success), + total_traces=len(ctx.traces), + elapsed_s=ctx.metadata["elapsed_s"], + evaluation_notes=evaluation_notes, + result_id=ctx.result_id, + ) + if not self.is_enabled or not ctx.result_id: return False diff --git a/tests/unit/cli/tui/test_events.py b/tests/unit/cli/tui/test_events.py new file mode 100644 index 00000000..718da322 --- /dev/null +++ b/tests/unit/cli/tui/test_events.py @@ -0,0 +1,193 @@ +# Copyright 2026 - AI4I. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Unit tests for :mod:`hackagent.cli.tui.events`.""" + +import threading +import time + +import pytest + +from hackagent.cli.tui.events import ( + EVENT_GOAL_FINALIZED, + EVENT_GOAL_STARTED, + EVENT_STEP_ENDED, + EVENT_STEP_STARTED, + EVENT_TOOL_CALL, + TUIEvent, + TUIEventBus, +) + + +class TestTUIEventBusBasics: + """Smoke tests for emit/subscribe/unsubscribe.""" + + def test_subscribe_specific_event_delivers_event(self) -> None: + bus = TUIEventBus() + received: list[TUIEvent] = [] + bus.subscribe(received.append, event_type=EVENT_STEP_STARTED) + + bus.emit(EVENT_STEP_STARTED, step_name="Attack") + + assert len(received) == 1 + assert received[0].event_type == EVENT_STEP_STARTED + assert received[0].payload == {"step_name": "Attack"} + assert received[0].timestamp > 0 + + def test_subscribe_specific_event_filters_other_types(self) -> None: + bus = TUIEventBus() + received: list[TUIEvent] = [] + bus.subscribe(received.append, event_type=EVENT_GOAL_FINALIZED) + + bus.emit(EVENT_STEP_STARTED, step_name="Attack") + bus.emit(EVENT_GOAL_FINALIZED, goal_index=0, success=True) + + assert len(received) == 1 + assert received[0].event_type == EVENT_GOAL_FINALIZED + + def test_catch_all_subscriber_receives_every_event(self) -> None: + bus = TUIEventBus() + received: list[TUIEvent] = [] + bus.subscribe(received.append) # event_type=None + + bus.emit(EVENT_STEP_STARTED) + bus.emit(EVENT_GOAL_STARTED, goal_index=0) + bus.emit(EVENT_TOOL_CALL, tool_name="grep") + + assert [e.event_type for e in received] == [ + EVENT_STEP_STARTED, + EVENT_GOAL_STARTED, + EVENT_TOOL_CALL, + ] + + def test_unsubscribe_stops_delivery(self) -> None: + bus = TUIEventBus() + received: list[TUIEvent] = [] + bus.subscribe(received.append, event_type=EVENT_STEP_STARTED) + bus.unsubscribe(received.append, event_type=EVENT_STEP_STARTED) + + bus.emit(EVENT_STEP_STARTED, step_name="Attack") + assert received == [] + + def test_unsubscribe_unknown_is_noop(self) -> None: + bus = TUIEventBus() + # Not raising means the test passes. + bus.unsubscribe(lambda e: None, event_type=EVENT_STEP_STARTED) + bus.unsubscribe(lambda e: None) + + def test_clear_removes_all_subscribers(self) -> None: + bus = TUIEventBus() + received: list[TUIEvent] = [] + bus.subscribe(received.append, event_type=EVENT_STEP_STARTED) + bus.subscribe(received.append) + bus.clear() + + bus.emit(EVENT_STEP_STARTED) + assert received == [] + + +class TestTUIEventBusIsolation: + """A misbehaving subscriber must not break the bus.""" + + def test_subscriber_exception_does_not_propagate(self) -> None: + bus = TUIEventBus() + + def bad(_event: TUIEvent) -> None: + raise RuntimeError("boom") + + received: list[TUIEvent] = [] + bus.subscribe(bad, event_type=EVENT_STEP_STARTED) + bus.subscribe(received.append, event_type=EVENT_STEP_STARTED) + + # Should NOT raise, and the second subscriber must still receive. + bus.emit(EVENT_STEP_STARTED, step_name="ok") + assert len(received) == 1 + + def test_subscriber_exception_does_not_block_catchall(self) -> None: + bus = TUIEventBus() + received_catchall: list[TUIEvent] = [] + bus.subscribe(lambda _e: (_ for _ in ()).throw(ValueError("nope"))) + bus.subscribe(received_catchall.append) + + bus.emit(EVENT_TOOL_CALL, tool_name="grep") + assert len(received_catchall) == 1 + + +class TestTUIEventBusThreadSafety: + """Concurrent emitters / subscribers shouldn't drop or duplicate events.""" + + def test_concurrent_emits_deliver_all(self) -> None: + bus = TUIEventBus() + received: list[TUIEvent] = [] + lock = threading.Lock() + + def collect(event: TUIEvent) -> None: + with lock: + received.append(event) + + bus.subscribe(collect, event_type=EVENT_GOAL_FINALIZED) + + per_thread = 50 + threads = [ + threading.Thread( + target=lambda i=i: [ + bus.emit(EVENT_GOAL_FINALIZED, goal_index=i, n=k) + for k in range(per_thread) + ] + ) + for i in range(4) + ] + for t in threads: + t.start() + for t in threads: + t.join() + + assert len(received) == 4 * per_thread + + def test_subscribe_during_emit_is_safe(self) -> None: + """Adding a subscriber while emit() runs must not corrupt the list.""" + bus = TUIEventBus() + received: list[TUIEvent] = [] + + def subscriber_that_adds(_event: TUIEvent) -> None: + received.append(_event) + # Subscribe a no-op while we're in a dispatch — exercising the + # snapshot-under-lock pattern the bus relies on. + bus.subscribe(lambda _e: None, event_type=EVENT_STEP_ENDED) + + bus.subscribe(subscriber_that_adds, event_type=EVENT_STEP_ENDED) + bus.emit(EVENT_STEP_ENDED, step_name="x") + bus.emit(EVENT_STEP_ENDED, step_name="y") + assert len(received) == 2 + + +class TestTUIEventBusPayload: + """Sanity checks on payload shape.""" + + def test_payload_is_copied_per_event(self) -> None: + bus = TUIEventBus() + received: list[TUIEvent] = [] + bus.subscribe(received.append, event_type=EVENT_TOOL_CALL) + + # Caller can mutate kwargs without affecting delivered payload. + kwargs = {"tool_name": "grep", "args": [1, 2, 3]} + bus.emit(EVENT_TOOL_CALL, **kwargs) + kwargs["tool_name"] = "mutated" + + assert received[0].payload["tool_name"] == "grep" + + def test_timestamp_is_monotonic_within_burst(self) -> None: + bus = TUIEventBus() + received: list[TUIEvent] = [] + bus.subscribe(received.append) + + for i in range(5): + bus.emit(EVENT_STEP_STARTED, i=i) + time.sleep(0.001) + + timestamps = [e.timestamp for e in received] + assert timestamps == sorted(timestamps) + + +if __name__ == "__main__": # pragma: no cover + pytest.main([__file__, "-v"]) diff --git a/tests/unit/cli/tui/test_tui_results_and_logs.py b/tests/unit/cli/tui/test_tui_results_and_logs.py index b0f8f71d..8ca881da 100644 --- a/tests/unit/cli/tui/test_tui_results_and_logs.py +++ b/tests/unit/cli/tui/test_tui_results_and_logs.py @@ -501,46 +501,9 @@ def run(self, goals): # Verify result assert result == ["result"] - def test_attach_detach_tui_handler(self) -> None: - """Test attaching and detaching TUI handler from attack instance.""" - from hackagent.cli.tui.logger import ( - TUILogHandler, - attach_tui_handler, - detach_tui_handler, - ) - - class MockAttack: - pass - - mock_app = MagicMock() - mock_callback = MagicMock() - - attack = MockAttack() - - # Attach - handler = attach_tui_handler(attack, mock_app, mock_callback) - - assert hasattr(attack, "_tui_log_handler") - assert attack._tui_log_handler == handler - assert isinstance(handler, TUILogHandler) - - # Detach - detached = detach_tui_handler(attack) - - assert detached == handler - assert not hasattr(attack, "_tui_log_handler") - - def test_detach_nonexistent_handler(self) -> None: - """Test detaching handler when none exists.""" - from hackagent.cli.tui.logger import detach_tui_handler - - class MockAttack: - pass - - attack = MockAttack() - result = detach_tui_handler(attack) - - assert result is None + # NOTE: tests for the removed ``attach_tui_handler`` / ``detach_tui_handler`` + # helpers were dropped in the TUI cleanup pass — those helpers were never + # called by production code and have been removed. # ============================================================================ diff --git a/tests/unit/cli/tui/test_widgets_and_bus_integration.py b/tests/unit/cli/tui/test_widgets_and_bus_integration.py new file mode 100644 index 00000000..afd1925f --- /dev/null +++ b/tests/unit/cli/tui/test_widgets_and_bus_integration.py @@ -0,0 +1,347 @@ +# Copyright 2026 - AI4I. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Unit tests for the post-event-bus TUI changes: + +- Filtering / search helpers on :class:`AttackLogViewer`. +- :class:`AgentActionsViewer` event-bus translation + (``trace_added``, ``goal_started``, ``goal_finalized``). +- :class:`hackagent.router.tracking.Tracker` emits structured events when + given an ``event_bus``. +- :class:`hackagent.router.tracking.StepTracker` emits ``step_started`` / + ``step_ended`` even when backend tracking is disabled. +""" + +from typing import Any, List +from unittest.mock import MagicMock + +import pytest + +from hackagent.cli.tui.events import ( + EVENT_GOAL_FINALIZED, + EVENT_GOAL_STARTED, + EVENT_STEP_ENDED, + EVENT_STEP_STARTED, + TUIEvent, + TUIEventBus, +) +from hackagent.cli.tui.widgets.logs import AttackLogViewer + + +# ============================================================================ +# AttackLogViewer filtering / search helpers +# ============================================================================ + + +class TestAttackLogViewerFilters: + """Test the pure logic in the new filter / search pipeline.""" + + def _make_viewer(self) -> AttackLogViewer: + # No mount — we only exercise helper methods that don't touch widgets. + return AttackLogViewer() + + def test_format_record_color_per_level(self) -> None: + v = self._make_viewer() + assert "[cyan]" in v._format_record("INFO", "x") + assert "[bold red]" in v._format_record("ERROR", "x") + assert "[yellow]" in v._format_record("WARNING", "x") + assert "[dim]" in v._format_record("DEBUG", "x") + + def test_format_record_header_passthrough(self) -> None: + v = self._make_viewer() + banner = "\n[bold magenta]══[/bold magenta]\n" + # HEADER must be returned verbatim (pre-formatted Rich markup). + assert v._format_record("HEADER", banner) is banner + + def test_record_visible_respects_level_toggle(self) -> None: + v = self._make_viewer() + assert v._record_visible("INFO", "msg") is True + v._level_enabled["INFO"] = False + assert v._record_visible("INFO", "msg") is False + + def test_critical_filtered_with_error_toggle(self) -> None: + """CRITICAL collapses to ERROR for filter purposes.""" + v = self._make_viewer() + v._level_enabled["ERROR"] = False + assert v._record_visible("CRITICAL", "boom") is False + + def test_header_ignores_level_filter(self) -> None: + v = self._make_viewer() + # Mute every level. + for k in v._level_enabled: + v._level_enabled[k] = False + # HEADER still passes. + assert v._record_visible("HEADER", "banner") is True + + def test_search_query_case_insensitive_substring(self) -> None: + v = self._make_viewer() + v._search_query = "harm" + assert v._record_visible("INFO", "Running HARMBench eval") is True + assert v._record_visible("INFO", "unrelated message") is False + + def test_search_applies_to_headers_too(self) -> None: + v = self._make_viewer() + v._search_query = "step 2" + assert v._record_visible("HEADER", "🎯 STEP 1: Generation") is False + assert v._record_visible("HEADER", "🎯 STEP 2: Evaluation") is True + + def test_filtered_plaintext_strips_markup_from_headers(self) -> None: + v = self._make_viewer() + v._records.append( + ("HEADER", "\n[bold magenta]─\n🎯 STEP 1: Generation\n─[/bold magenta]\n") + ) + v._records.append(("INFO", "info line")) + text = v._filtered_plaintext() + assert "[bold magenta]" not in text + assert "🎯 STEP 1: Generation" in text + assert "[INFO] info line" in text + + def test_filtered_plaintext_respects_search(self) -> None: + v = self._make_viewer() + v._records.extend([("INFO", "alpha"), ("INFO", "beta"), ("INFO", "gamma")]) + v._search_query = "bet" + text = v._filtered_plaintext() + assert "beta" in text + assert "alpha" not in text + assert "gamma" not in text + + def test_save_logs_to_file_returns_none_when_empty(self) -> None: + v = self._make_viewer() + assert v.save_logs_to_file() is None + + def test_save_logs_to_file_writes_filtered_content( + self, tmp_path: Any, monkeypatch: pytest.MonkeyPatch + ) -> None: + import tempfile + + v = self._make_viewer() + v._records.append(("INFO", "kept")) + v._records.append(("DEBUG", "muted")) + v._level_enabled["DEBUG"] = False + + monkeypatch.setattr(tempfile, "gettempdir", lambda: str(tmp_path)) + path = v.save_logs_to_file() + assert path is not None + assert path.startswith(str(tmp_path)) + content = open(path).read() + assert "kept" in content + assert "muted" not in content + + +# ============================================================================ +# AgentActionsViewer bus subscription — exercise the dispatcher directly +# ============================================================================ + + +class _StubViewer: + """Stand-in for :class:`AgentActionsViewer`. + + The real viewer requires a mounted Textual app; we only need to verify + that :meth:`_handle_event` invokes the right ``add_*`` helpers. + """ + + def __init__(self) -> None: + self.calls: List[tuple] = [] + self._action_count = 0 + + def add_step_separator(self, name: str, number: int = 0) -> None: + self.calls.append(("step", name, number)) + + def add_tool_call( + self, tool_name: str, arguments=None, result=None, step_number=None + ) -> None: + self.calls.append(("tool", tool_name, arguments, result, step_number)) + + def add_info_message(self, msg: str) -> None: + self.calls.append(("info", msg)) + + def update_action_count(self, n: int) -> None: + self.calls.append(("count", n)) + + def query_one(self, *_a, **_kw): # pragma: no cover - only used by other paths + return MagicMock() + + +def _dispatch(viewer: _StubViewer, event_type: str, **payload: Any) -> None: + """Invoke the viewer's `_handle_event` method (bound to a real viewer). + + We also bind ``_render_trace`` so the stub can resolve the call without + instantiating a real Textual widget. + """ + from hackagent.cli.tui.widgets.actions import AgentActionsViewer + + # Bind unbound methods to our stub so `self._render_trace(...)` resolves. + viewer._render_trace = AgentActionsViewer._render_trace.__get__(viewer) + AgentActionsViewer._handle_event(viewer, TUIEvent(event_type, payload, 0.0)) + + +class TestAgentActionsViewerDispatch: + """The dispatcher must translate bus events into the right add_* calls.""" + + def test_step_started_adds_separator(self) -> None: + v = _StubViewer() + _dispatch(v, EVENT_STEP_STARTED, step_name="Generation") + assert any(c[0] == "step" and c[1] == "Generation" for c in v.calls) + + def test_goal_started_adds_separator_with_number(self) -> None: + v = _StubViewer() + _dispatch( + v, + EVENT_GOAL_STARTED, + goal_index=2, + goal="Test prompt", + attack_type="advprefix", + ) + step_calls = [c for c in v.calls if c[0] == "step"] + assert len(step_calls) == 1 + name, number = step_calls[0][1], step_calls[0][2] + assert "Goal #3" in name # 0-based index → 1-based label + assert "ADVPREFIX" in name + assert number == 3 + + def test_trace_added_tool_call_maps_to_add_tool_call(self) -> None: + v = _StubViewer() + _dispatch( + v, + "trace_added", + step_type="TOOL_CALL", + step_name="Call", + sequence=5, + content={"name": "grep", "arguments": {"pattern": "foo"}}, + ) + tool_calls = [c for c in v.calls if c[0] == "tool"] + assert tool_calls == [("tool", "grep", {"pattern": "foo"}, None, 5)] + + def test_trace_added_tool_response_maps_to_add_tool_call_with_result(self) -> None: + v = _StubViewer() + _dispatch( + v, + "trace_added", + step_type="TOOL_RESPONSE", + step_name="Result", + sequence=6, + content={"tool": "grep", "result": "matched 3 lines"}, + ) + tool_calls = [c for c in v.calls if c[0] == "tool"] + assert len(tool_calls) == 1 + _, name, args, result, step_number = tool_calls[0] + assert name == "grep" + assert args is None + assert "matched 3 lines" in result + assert step_number == 6 + + +# ============================================================================ +# Tracker / StepTracker event emission +# ============================================================================ + + +class TestTrackerEmitsEvents: + """`Tracker` should publish goal lifecycle events on the bus.""" + + def test_create_goal_result_emits_goal_started_without_backend(self) -> None: + from hackagent.router.tracking.tracker import Tracker + + bus = TUIEventBus() + received: List[TUIEvent] = [] + bus.subscribe(received.append, event_type=EVENT_GOAL_STARTED) + + tracker = Tracker( + backend=None, + run_id=None, + logger=MagicMock(), + attack_type="advprefix", + event_bus=bus, + ) + tracker.create_goal_result(goal="Test goal", goal_index=0) + + assert len(received) == 1 + payload = received[0].payload + assert payload["goal"] == "Test goal" + assert payload["goal_index"] == 0 + assert payload["attack_type"] == "advprefix" + + def test_finalize_goal_emits_goal_finalized(self) -> None: + from hackagent.router.tracking.tracker import Tracker + + bus = TUIEventBus() + received: List[TUIEvent] = [] + bus.subscribe(received.append, event_type=EVENT_GOAL_FINALIZED) + + tracker = Tracker( + backend=None, + run_id=None, + logger=MagicMock(), + attack_type="advprefix", + event_bus=bus, + ) + ctx = tracker.create_goal_result(goal="Test", goal_index=0) + tracker.finalize_goal(ctx, success=True, evaluation_notes="ok") + + assert len(received) == 1 + payload = received[0].payload + assert payload["goal_index"] == 0 + assert payload["success"] is True + assert payload["evaluation_notes"] == "ok" + + def test_tracker_without_bus_does_not_raise(self) -> None: + """Backwards compat: omitting event_bus must keep working.""" + from hackagent.router.tracking.tracker import Tracker + + tracker = Tracker( + backend=None, + run_id=None, + logger=MagicMock(), + attack_type="x", + ) + ctx = tracker.create_goal_result(goal="g", goal_index=0) + tracker.finalize_goal(ctx, success=False) + + +class TestStepTrackerEmitsEvents: + """`StepTracker.track_step` should emit step_started/step_ended.""" + + def test_track_step_emits_lifecycle_when_disabled(self) -> None: + from hackagent.router.tracking.context import TrackingContext + from hackagent.router.tracking.step import StepTracker + + bus = TUIEventBus() + received: List[TUIEvent] = [] + bus.subscribe(received.append) + + ctx = TrackingContext.create_disabled() + ctx.event_bus = bus + tracker = StepTracker(ctx) + + with tracker.track_step("Generate", "STEP_GEN"): + pass + + types = [e.event_type for e in received] + assert types == [EVENT_STEP_STARTED, EVENT_STEP_ENDED] + assert received[0].payload["step_name"] == "Generate" + assert received[1].payload["success"] is True + + def test_track_step_emits_step_ended_on_failure(self) -> None: + from hackagent.router.tracking.context import TrackingContext + from hackagent.router.tracking.step import StepTracker + + bus = TUIEventBus() + received: List[TUIEvent] = [] + bus.subscribe(received.append, event_type=EVENT_STEP_ENDED) + + ctx = TrackingContext.create_disabled() + ctx.event_bus = bus + tracker = StepTracker(ctx) + + with pytest.raises(RuntimeError): + with tracker.track_step("Failing", "STEP_X"): + raise RuntimeError("boom") + + assert len(received) == 1 + assert received[0].payload["success"] is False + assert "boom" in received[0].payload["error"] + + +if __name__ == "__main__": # pragma: no cover + pytest.main([__file__, "-v"])