From 7583dd22eb4ca39cbeee5ca40ab82265fbadbcb0 Mon Sep 17 00:00:00 2001 From: "praisonai-triage-agent[bot]" <272766704+praisonai-triage-agent[bot]@users.noreply.github.com> Date: Mon, 22 Jun 2026 08:56:13 +0000 Subject: [PATCH 1/2] fix: implement ArtifactStore for retrievable tool output overflow (fixes #2148) - Add concrete FileSystemArtifactStore implementing ArtifactStoreProtocol - Wire into tool_execution.py to spill large outputs instead of discarding - Add artifact retrieval tools (head, tail, grep, chunk, load) - Add ToolOutputConfig for configurable limits (max_bytes, retention_days) - Register retrieval tools dynamically on first artifact creation - Add garbage collection for old artifacts This preserves full tool outputs that exceed limits in retrievable artifacts instead of silently truncating and discarding the overflow. --- .../praisonaiagents/agent/agent.py | 54 +++ .../praisonaiagents/agent/tool_execution.py | 92 +++- .../praisonaiagents/config/feature_configs.py | 66 +++ .../praisonaiagents/context/__init__.py | 6 + .../praisonaiagents/context/artifact_store.py | 420 ++++++++++++++++++ .../praisonaiagents/tools/artifact_tools.py | 215 +++++++++ test_artifact_store.py | 88 ++++ 7 files changed, 927 insertions(+), 14 deletions(-) create mode 100644 src/praisonai-agents/praisonaiagents/context/artifact_store.py create mode 100644 src/praisonai-agents/praisonaiagents/tools/artifact_tools.py create mode 100644 test_artifact_store.py diff --git a/src/praisonai-agents/praisonaiagents/agent/agent.py b/src/praisonai-agents/praisonaiagents/agent/agent.py index 0f7ba325d..e812cd5a1 100644 --- a/src/praisonai-agents/praisonaiagents/agent/agent.py +++ b/src/praisonai-agents/praisonaiagents/agent/agent.py @@ -592,6 +592,7 @@ def __init__( runtime: Optional[Union[str, Dict[str, Any], 'AgentRuntimeConfig']] = None, # Model-scoped runtime configuration interrupt_controller: Optional['InterruptController'] = None, # G2: Cooperative cancellation tool_search: 
Optional[Union[bool, str, Dict[str, Any], 'ToolSearchConfig']] = False, # Progressive tool disclosure + tool_output: Optional[Union[bool, Dict[str, Any], 'ToolOutputConfig']] = None, # Tool output handling and artifact storage message_steering: Optional[Union[bool, 'MessageSteeringProtocol']] = False, # Real-time message steering during execution sandbox: Optional[Union[bool, 'SandboxConfig']] = None, # Sandbox for safe code execution ): @@ -1506,6 +1507,51 @@ def __init__( "a dict of ToolSearchConfig fields, or ToolSearchConfig" ) + # Process tool_output config (artifact storage for large outputs) + self._artifact_store = None + self.tool_output_limit = DEFAULT_TOOL_OUTPUT_LIMIT + + if tool_output is None or tool_output is False: + # Disabled - use default truncation only + pass + elif tool_output is True: + # Enabled with defaults + from ..config.feature_configs import ToolOutputConfig + from ..context.artifact_store import FileSystemArtifactStore + config = ToolOutputConfig() + self.tool_output_limit = config.max_bytes + if config.enable_artifacts: + self._artifact_store = config.artifact_store or FileSystemArtifactStore( + retention_days=config.retention_days, + redact_secrets=config.redact_secrets + ) + elif isinstance(tool_output, dict): + # Dict -> config overrides + from ..config.feature_configs import ToolOutputConfig + from ..context.artifact_store import FileSystemArtifactStore + config = ToolOutputConfig(**tool_output) + self.tool_output_limit = config.max_bytes + if config.enable_artifacts: + self._artifact_store = config.artifact_store or FileSystemArtifactStore( + retention_days=config.retention_days, + redact_secrets=config.redact_secrets + ) + else: + from ..config.feature_configs import ToolOutputConfig + if isinstance(tool_output, ToolOutputConfig): + config = tool_output + self.tool_output_limit = config.max_bytes + if config.enable_artifacts: + from ..context.artifact_store import FileSystemArtifactStore + self._artifact_store = 
config.artifact_store or FileSystemArtifactStore( + retention_days=config.retention_days, + redact_secrets=config.redact_secrets + ) + else: + raise TypeError( + "tool_output must be False/None, True, a dict of ToolOutputConfig fields, or ToolOutputConfig" + ) + # ============================================================ # END CONSOLIDATED PARAMS EXTRACTION # ============================================================ @@ -5515,6 +5561,14 @@ def __del__(self): memory = getattr(self, "_memory_instance", None) if memory and hasattr(memory, 'close_connections'): memory.close_connections() + + # Clean up old artifacts if artifact store is configured + artifact_store = getattr(self, "_artifact_store", None) + if artifact_store and hasattr(artifact_store, 'cleanup_old_artifacts'): + try: + artifact_store.cleanup_old_artifacts() + except Exception: + pass # Best effort cleanup except Exception as exc: # noqa: BLE001 - finalizers must not raise import contextlib with contextlib.suppress(Exception): diff --git a/src/praisonai-agents/praisonaiagents/agent/tool_execution.py b/src/praisonai-agents/praisonaiagents/agent/tool_execution.py index e8c7a6ae6..f3b6d5dc1 100644 --- a/src/praisonai-agents/praisonaiagents/agent/tool_execution.py +++ b/src/praisonai-agents/praisonaiagents/agent/tool_execution.py @@ -26,6 +26,33 @@ class ToolExecutionMixin: """Mixin providing toolexecution methods for the Agent class.""" + + def _register_artifact_tools(self): + """Register artifact retrieval tools when artifacts are first created.""" + try: + from ..tools import artifact_tools + + # Set the store reference for the tools + artifact_tools.set_artifact_store(self._artifact_store) + + # Add the retrieval tools + tools_to_add = [ + artifact_tools.artifact_head, + artifact_tools.artifact_tail, + artifact_tools.artifact_grep, + artifact_tools.artifact_chunk, + ] + + # Only add if not already present + existing_tool_names = {getattr(t, '__name__', str(t)) for t in self.tools} + for tool in 
tools_to_add: + tool_name = getattr(tool, '__name__', str(tool)) + if tool_name not in existing_tool_names: + self.tools.append(tool) + + logging.debug("Registered artifact retrieval tools") + except Exception as e: + logging.warning(f"Failed to register artifact tools: {e}") def _get_existing_stream_emitter(self): """Return an already-initialized stream emitter without creating one.""" @@ -310,21 +337,58 @@ def execute_with_context(): try: result_str = str(result) - if self.context_manager: - # Use context-aware truncation with configured budget - truncated = self._truncate_tool_output(function_name, result_str) - else: - # Apply default limit even without context management - # This prevents runaway tool outputs from causing overflow - limit = getattr(self, 'tool_output_limit', 16000) - if len(result_str) > limit: - # Use smart truncation format that judge recognizes as OK - tail_size = min(limit // 5, 2000) - head = result_str[:limit - tail_size] - tail = result_str[-tail_size:] if tail_size > 0 else "" - truncated = f"{head}\n...[{len(result_str):,} chars, showing first/last portions]...\n{tail}" + # Get configured limit + limit = getattr(self, 'tool_output_limit', 16000) + + # Check if we need to spill to artifact store + if len(result_str) > limit: + # Try to use artifact store if available + artifact_ref = None + if hasattr(self, '_artifact_store') and self._artifact_store is not None: + try: + from ..context.artifacts import ArtifactMetadata + + # Create metadata for this artifact + metadata = ArtifactMetadata( + agent_id=self.name, + run_id=getattr(self, '_current_run_id', 'unknown'), + tool_name=function_name, + turn_id=getattr(self, '_turn_counter', 0), + ) + + # Store the full output + artifact_ref = self._artifact_store.store(result_str, metadata) + logging.debug(f"Stored {function_name} output ({len(result_str)} bytes) as artifact {artifact_ref.artifact_id}") + + # Register artifact retrieval tools if not already registered + if not hasattr(self, 
'_artifact_tools_registered'): + self._register_artifact_tools() + self._artifact_tools_registered = True + except Exception as e: + logging.debug(f"Failed to store artifact: {e}") + + # Generate truncated preview + tail_size = min(limit // 5, 2000) + head = result_str[:limit - tail_size] + tail = result_str[-tail_size:] if tail_size > 0 else "" + + # If we stored an artifact, include reference in the output + if artifact_ref: + truncated = ( + f"{head}\n" + f"...[{len(result_str):,} chars total, showing first/last portions]...\n" + f"{tail}\n\n" + f"{artifact_ref.to_inline()}" + ) else: - truncated = result_str + # Fallback to simple truncation + truncated = f"{head}\n...[{len(result_str):,} chars, showing first/last portions]...\n{tail}" + else: + truncated = result_str + + if self.context_manager and hasattr(self, '_truncate_tool_output'): + # Use context-aware truncation if available + truncated = self._truncate_tool_output(function_name, truncated) if len(truncated) < len(result_str): logging.debug(f"Truncated {function_name} output from {len(result_str)} to {len(truncated)} chars") diff --git a/src/praisonai-agents/praisonaiagents/config/feature_configs.py b/src/praisonai-agents/praisonaiagents/config/feature_configs.py index 05040d701..d6f4b3bdf 100644 --- a/src/praisonai-agents/praisonaiagents/config/feature_configs.py +++ b/src/praisonai-agents/praisonaiagents/config/feature_configs.py @@ -42,6 +42,7 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: from ..compaction.strategy import CompactionStrategy + from ..context.artifact_store import FileSystemArtifactStore class MemoryBackend(str, Enum): @@ -1506,6 +1507,68 @@ def resolve_tools(value: ToolParam) -> Optional[ToolConfig]: return value +@dataclass +class ToolOutputConfig: + """ + Configuration for tool output handling and artifact storage. + + Controls when and how large tool outputs are stored as artifacts + instead of being truncated and lost. 
+ + Args: + max_bytes: Maximum bytes before spilling to artifact store (default: 16000) + max_lines: Maximum lines before spilling (default: None, bytes-only) + direction: Truncation direction - "head", "tail", or "both" (default: "both") + retention_days: Days to retain artifacts before garbage collection (default: 7) + enable_artifacts: Whether to enable artifact storage (default: True) + artifact_store: Custom artifact store instance (default: FileSystemArtifactStore) + redact_secrets: Whether to redact secrets from artifacts (default: True) + + Example: + agent = Agent( + instructions="...", + tool_output=ToolOutputConfig( + max_bytes=32000, + direction="tail", + retention_days=14, + ) + ) + """ + max_bytes: int = 16000 + max_lines: Optional[int] = None + direction: str = "both" # "head", "tail", or "both" + retention_days: int = 7 + enable_artifacts: bool = True + artifact_store: Optional[Any] = None # FileSystemArtifactStore instance + redact_secrets: bool = True + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary.""" + return { + "max_bytes": self.max_bytes, + "max_lines": self.max_lines, + "direction": self.direction, + "retention_days": self.retention_days, + "enable_artifacts": self.enable_artifacts, + "redact_secrets": self.redact_secrets, + } + + +# Type alias for tool output parameter +ToolOutputParam = Union[bool, ToolOutputConfig, Any] + + +def resolve_tool_output(value: Optional[ToolOutputParam]) -> Optional[ToolOutputParam]: + """Resolve tool output configuration with precedence ladder.""" + if value is None or value is False: + return None + if value is True: + return ToolOutputConfig() + if isinstance(value, ToolOutputConfig): + return value + return value + + __all__ = [ # Enums "MemoryBackend", @@ -1533,6 +1596,7 @@ def resolve_tools(value: ToolParam) -> Optional[ToolConfig]: "SkillsConfig", "AutonomyConfig", "ToolSearchConfig", + "ToolOutputConfig", # Config classes (Multi-Agent) "MultiAgentHooksConfig", 
"MultiAgentOutputConfig", @@ -1555,6 +1619,7 @@ def resolve_tools(value: ToolParam) -> Optional[ToolConfig]: "AutonomyParam", "ToolSearchParam", "ToolParam", + "ToolOutputParam", # Precedence ladder resolvers "resolve_memory", "resolve_knowledge", @@ -1566,4 +1631,5 @@ def resolve_tools(value: ToolParam) -> Optional[ToolConfig]: "resolve_autonomy", "resolve_tool_search", "resolve_tools", + "resolve_tool_output", ] diff --git a/src/praisonai-agents/praisonaiagents/context/__init__.py b/src/praisonai-agents/praisonaiagents/context/__init__.py index 86f0d8549..6d6767b15 100644 --- a/src/praisonai-agents/praisonaiagents/context/__init__.py +++ b/src/praisonai-agents/praisonaiagents/context/__init__.py @@ -180,6 +180,7 @@ def format_percent(value: float) -> str: "TerminalLoggerProtocol", "compute_checksum", "generate_summary", + "FileSystemArtifactStore", # Context Compaction Policy "ContextCompactionPolicy", "ContextCompactionPolicyProtocol", @@ -295,6 +296,11 @@ def __getattr__(name: str): from . import artifacts return getattr(artifacts, name) + # FileSystemArtifactStore + if name == "FileSystemArtifactStore": + from .artifact_store import FileSystemArtifactStore + return FileSystemArtifactStore + # Session Context Tracking (Agno pattern) if name in ("SessionContextTracker", "SessionState"): from . import session_tracker diff --git a/src/praisonai-agents/praisonaiagents/context/artifact_store.py b/src/praisonai-agents/praisonaiagents/context/artifact_store.py new file mode 100644 index 000000000..b212c1698 --- /dev/null +++ b/src/praisonai-agents/praisonaiagents/context/artifact_store.py @@ -0,0 +1,420 @@ +""" +Concrete implementation of ArtifactStoreProtocol for filesystem storage. + +This module provides a filesystem-based artifact store that persists +tool outputs to disk when they exceed configured size limits. 
+""" + +import os +import re +import json +import time +import uuid +import hashlib +import logging +from pathlib import Path +from typing import Any, Dict, List, Optional, Union + +from .artifacts import ( + ArtifactRef, + ArtifactMetadata, + ArtifactStoreProtocol, + GrepMatch, + compute_checksum, + generate_summary, +) + + +class FileSystemArtifactStore: + """ + Filesystem-based artifact storage implementation. + + Stores artifacts under ~/.praisonai/artifacts/{agent_id}/{run_id}/ + with content-addressed naming using SHA256 hashes. + """ + + def __init__( + self, + base_dir: Optional[str] = None, + retention_days: int = 7, + redact_secrets: bool = True, + ): + """ + Initialize the filesystem artifact store. + + Args: + base_dir: Base directory for storage (default: ~/.praisonai/artifacts) + retention_days: Days to retain artifacts before GC + redact_secrets: Whether to redact secrets from stored content + """ + if base_dir: + self.base_dir = Path(base_dir) + else: + # Use standard PraisonAI data directory + home = Path.home() + self.base_dir = home / ".praisonai" / "artifacts" + + self.retention_days = retention_days + self.redact_secrets = redact_secrets + self._secret_patterns = [ + r'(?i)(api[_-]?key|apikey)["\']?\s*[:=]\s*["\']?[\w\-]+', + r'(?i)(secret|password|passwd|pwd)["\']?\s*[:=]\s*["\']?[\w\-]+', + r'(?i)(token|bearer)["\']?\s*[:=]\s*["\']?[\w\-]+', + r'sk-[a-zA-Z0-9]{20,}', # OpenAI keys + r'ghp_[a-zA-Z0-9]{36}', # GitHub tokens + ] + + # Ensure base directory exists + self.base_dir.mkdir(parents=True, exist_ok=True) + + def store( + self, + content: Any, + metadata: ArtifactMetadata, + ) -> ArtifactRef: + """ + Store content as an artifact. 
+ + Args: + content: The content to store (will be serialized) + metadata: Metadata about the artifact + + Returns: + ArtifactRef pointing to the stored artifact + """ + # Serialize content + if isinstance(content, (str, bytes)): + content_bytes = content.encode("utf-8") if isinstance(content, str) else content + mime_type = "text/plain" + else: + # JSON serialize structured data + content_str = json.dumps(content, indent=2, default=str) + content_bytes = content_str.encode("utf-8") + mime_type = "application/json" + + # Redact secrets if enabled + if self.redact_secrets and isinstance(content, str): + for pattern in self._secret_patterns: + content = re.sub(pattern, "[REDACTED]", content) + content_bytes = content.encode("utf-8") + + # Compute checksum + checksum = compute_checksum(content_bytes) + + # Generate artifact ID + artifact_id = f"{checksum[:12]}_{uuid.uuid4().hex[:8]}" + + # Create directory structure + artifact_dir = self.base_dir / metadata.agent_id / metadata.run_id + artifact_dir.mkdir(parents=True, exist_ok=True) + + # Write content to file + artifact_path = artifact_dir / f"{artifact_id}.artifact" + artifact_path.write_bytes(content_bytes) + + # Write metadata + meta_path = artifact_dir / f"{artifact_id}.meta.json" + meta_data = metadata.to_dict() + meta_data["checksum"] = checksum + meta_data["size_bytes"] = len(content_bytes) + meta_data["mime_type"] = mime_type + meta_data["created_at"] = time.time() + meta_path.write_text(json.dumps(meta_data, indent=2)) + + # Generate summary + summary = generate_summary(content, max_chars=200) + + # Create and return reference + ref = ArtifactRef( + path=str(artifact_path), + summary=summary, + size_bytes=len(content_bytes), + mime_type=mime_type, + checksum=checksum, + created_at=meta_data["created_at"], + artifact_id=artifact_id, + agent_id=metadata.agent_id, + run_id=metadata.run_id, + tool_name=metadata.tool_name, + turn_id=metadata.turn_id, + ) + + logging.debug(f"Stored artifact {artifact_id} 
({len(content_bytes)} bytes) at {artifact_path}") + + return ref + + def load(self, ref: ArtifactRef) -> Any: + """ + Load full content from an artifact. + + Args: + ref: Reference to the artifact + + Returns: + The deserialized content + """ + path = Path(ref.path) + if not path.exists(): + raise FileNotFoundError(f"Artifact not found: {ref.path}") + + content_bytes = path.read_bytes() + + # Verify checksum if provided + if ref.checksum: + actual_checksum = compute_checksum(content_bytes) + if actual_checksum != ref.checksum: + raise ValueError(f"Checksum mismatch for artifact {ref.artifact_id}") + + # Deserialize based on mime type + if ref.mime_type == "application/json": + return json.loads(content_bytes.decode("utf-8")) + else: + return content_bytes.decode("utf-8") + + def tail(self, ref: ArtifactRef, lines: int = 50) -> str: + """ + Get the last N lines of an artifact. + + Args: + ref: Reference to the artifact + lines: Number of lines to return + + Returns: + String containing the last N lines + """ + path = Path(ref.path) + if not path.exists(): + raise FileNotFoundError(f"Artifact not found: {ref.path}") + + content = path.read_text(encoding="utf-8", errors="replace") + content_lines = content.splitlines() + + if len(content_lines) <= lines: + return content + + return "\n".join(content_lines[-lines:]) + + def head(self, ref: ArtifactRef, lines: int = 50) -> str: + """ + Get the first N lines of an artifact. 
+ + Args: + ref: Reference to the artifact + lines: Number of lines to return + + Returns: + String containing the first N lines + """ + path = Path(ref.path) + if not path.exists(): + raise FileNotFoundError(f"Artifact not found: {ref.path}") + + content = path.read_text(encoding="utf-8", errors="replace") + content_lines = content.splitlines() + + if len(content_lines) <= lines: + return content + + return "\n".join(content_lines[:lines]) + + def grep( + self, + ref: ArtifactRef, + pattern: str, + context_lines: int = 2, + max_matches: int = 50, + ) -> List[GrepMatch]: + """ + Search for pattern in artifact content. + + Args: + ref: Reference to the artifact + pattern: Regex pattern to search for + context_lines: Number of context lines before/after match + max_matches: Maximum number of matches to return + + Returns: + List of GrepMatch objects + """ + path = Path(ref.path) + if not path.exists(): + raise FileNotFoundError(f"Artifact not found: {ref.path}") + + content = path.read_text(encoding="utf-8", errors="replace") + lines = content.splitlines() + + matches = [] + pattern_re = re.compile(pattern, re.IGNORECASE) + + for i, line in enumerate(lines): + if pattern_re.search(line): + # Get context lines + start_idx = max(0, i - context_lines) + end_idx = min(len(lines), i + context_lines + 1) + + match = GrepMatch( + line_number=i + 1, # 1-indexed + line_content=line, + context_before=lines[start_idx:i], + context_after=lines[i + 1:end_idx], + ) + matches.append(match) + + if len(matches) >= max_matches: + break + + return matches + + def chunk( + self, + ref: ArtifactRef, + start_line: int = 1, + end_line: Optional[int] = None, + ) -> str: + """ + Get a chunk of lines from an artifact. 
+ + Args: + ref: Reference to the artifact + start_line: Starting line number (1-indexed) + end_line: Ending line number (inclusive), None for end of file + + Returns: + String containing the requested lines + """ + path = Path(ref.path) + if not path.exists(): + raise FileNotFoundError(f"Artifact not found: {ref.path}") + + content = path.read_text(encoding="utf-8", errors="replace") + lines = content.splitlines() + + # Convert to 0-indexed + start_idx = max(0, start_line - 1) + end_idx = end_line if end_line is None else end_line + + return "\n".join(lines[start_idx:end_idx]) + + def delete(self, ref: ArtifactRef) -> bool: + """ + Delete an artifact. + + Args: + ref: Reference to the artifact + + Returns: + True if deleted successfully + """ + path = Path(ref.path) + meta_path = path.with_suffix(".meta.json") + + deleted = False + if path.exists(): + path.unlink() + deleted = True + + if meta_path.exists(): + meta_path.unlink() + + return deleted + + def list_artifacts( + self, + run_id: Optional[str] = None, + agent_id: Optional[str] = None, + tool_name: Optional[str] = None, + ) -> List[ArtifactRef]: + """ + List artifacts matching filters. 
+ + Args: + run_id: Filter by run ID + agent_id: Filter by agent ID + tool_name: Filter by tool name + + Returns: + List of matching ArtifactRef objects + """ + artifacts = [] + + # Determine search path based on filters + if agent_id and run_id: + search_path = self.base_dir / agent_id / run_id + if search_path.exists(): + search_paths = [search_path] + else: + search_paths = [] + elif agent_id: + agent_path = self.base_dir / agent_id + if agent_path.exists(): + search_paths = [p for p in agent_path.iterdir() if p.is_dir()] + else: + search_paths = [] + else: + # Search all + search_paths = [] + for agent_dir in self.base_dir.iterdir(): + if agent_dir.is_dir(): + for run_dir in agent_dir.iterdir(): + if run_dir.is_dir(): + search_paths.append(run_dir) + + # Search for artifacts + for dir_path in search_paths: + for meta_file in dir_path.glob("*.meta.json"): + try: + meta_data = json.loads(meta_file.read_text()) + + # Apply tool_name filter if specified + if tool_name and meta_data.get("tool_name") != tool_name: + continue + + # Reconstruct artifact path + artifact_file = meta_file.with_suffix("").with_suffix(".artifact") + if not artifact_file.exists(): + continue + + # Create ArtifactRef + ref = ArtifactRef( + path=str(artifact_file), + summary="", # Not stored in meta, would need to regenerate + size_bytes=meta_data.get("size_bytes", 0), + mime_type=meta_data.get("mime_type", "application/octet-stream"), + checksum=meta_data.get("checksum", ""), + created_at=meta_data.get("created_at", 0), + artifact_id=artifact_file.stem, + agent_id=meta_data.get("agent_id", ""), + run_id=meta_data.get("run_id", ""), + tool_name=meta_data.get("tool_name"), + turn_id=meta_data.get("turn_id", 0), + ) + artifacts.append(ref) + except Exception as e: + logging.debug(f"Error loading artifact metadata from {meta_file}: {e}") + + # Sort by creation time (newest first) + artifacts.sort(key=lambda x: x.created_at, reverse=True) + + return artifacts + + def cleanup_old_artifacts(self, 
days: Optional[int] = None) -> int: + """ + Clean up artifacts older than retention period. + + Args: + days: Days to retain (overrides default retention_days) + + Returns: + Number of artifacts deleted + """ + days = days or self.retention_days + cutoff_time = time.time() - (days * 24 * 60 * 60) + deleted_count = 0 + + for artifact in self.list_artifacts(): + if artifact.created_at < cutoff_time: + if self.delete(artifact): + deleted_count += 1 + logging.debug(f"Deleted old artifact {artifact.artifact_id}") + + return deleted_count \ No newline at end of file diff --git a/src/praisonai-agents/praisonaiagents/tools/artifact_tools.py b/src/praisonai-agents/praisonaiagents/tools/artifact_tools.py new file mode 100644 index 000000000..26fc24746 --- /dev/null +++ b/src/praisonai-agents/praisonaiagents/tools/artifact_tools.py @@ -0,0 +1,215 @@ +""" +Artifact retrieval tools for accessing spilled tool outputs. + +These tools are registered lazily when an artifact overflow occurs, +providing agents with the ability to page through preserved outputs. +""" + +import logging +from typing import Optional, List, Dict, Any + +from ..context.artifacts import ArtifactRef, GrepMatch +from .decorator import tool + + +# Module-level artifact store reference +_artifact_store = None + + +def set_artifact_store(store): + """Set the global artifact store for retrieval tools.""" + global _artifact_store + _artifact_store = store + + +@tool("artifact_head") +def artifact_head( + artifact_path: str, + lines: int = 50 +) -> str: + """ + Get the first N lines of a stored artifact. 
+ + Args: + artifact_path: Path to the artifact (from ArtifactRef) + lines: Number of lines to return (default: 50) + + Returns: + First N lines of the artifact content + """ + if _artifact_store is None: + return "Error: Artifact store not available" + + try: + ref = ArtifactRef(path=artifact_path, summary="", size_bytes=0, mime_type="text/plain") + return _artifact_store.head(ref, lines=lines) + except Exception as e: + logging.error(f"Failed to get artifact head: {e}") + return f"Error reading artifact: {str(e)}" + + +@tool("artifact_tail") +def artifact_tail( + artifact_path: str, + lines: int = 50 +) -> str: + """ + Get the last N lines of a stored artifact. + + Args: + artifact_path: Path to the artifact (from ArtifactRef) + lines: Number of lines to return (default: 50) + + Returns: + Last N lines of the artifact content + """ + if _artifact_store is None: + return "Error: Artifact store not available" + + try: + ref = ArtifactRef(path=artifact_path, summary="", size_bytes=0, mime_type="text/plain") + return _artifact_store.tail(ref, lines=lines) + except Exception as e: + logging.error(f"Failed to get artifact tail: {e}") + return f"Error reading artifact: {str(e)}" + + +@tool("artifact_grep") +def artifact_grep( + artifact_path: str, + pattern: str, + context_lines: int = 2, + max_matches: int = 50 +) -> List[Dict[str, Any]]: + """ + Search for a pattern in a stored artifact. 
+ + Args: + artifact_path: Path to the artifact (from ArtifactRef) + pattern: Regex pattern to search for + context_lines: Number of context lines before/after match (default: 2) + max_matches: Maximum number of matches to return (default: 50) + + Returns: + List of matches with line numbers and context + """ + if _artifact_store is None: + return [{"error": "Artifact store not available"}] + + try: + ref = ArtifactRef(path=artifact_path, summary="", size_bytes=0, mime_type="text/plain") + matches = _artifact_store.grep(ref, pattern, context_lines=context_lines, max_matches=max_matches) + + # Convert to serializable format + result = [] + for match in matches: + result.append({ + "line_number": match.line_number, + "line": match.line_content, + "context_before": match.context_before, + "context_after": match.context_after, + }) + + return result + except Exception as e: + logging.error(f"Failed to grep artifact: {e}") + return [{"error": f"Error searching artifact: {str(e)}"}] + + +@tool("artifact_chunk") +def artifact_chunk( + artifact_path: str, + start_line: int = 1, + end_line: Optional[int] = None +) -> str: + """ + Get a chunk of lines from a stored artifact. + + Args: + artifact_path: Path to the artifact (from ArtifactRef) + start_line: Starting line number (1-indexed) + end_line: Ending line number (inclusive), None for end of file + + Returns: + Lines from start_line to end_line + """ + if _artifact_store is None: + return "Error: Artifact store not available" + + try: + ref = ArtifactRef(path=artifact_path, summary="", size_bytes=0, mime_type="text/plain") + return _artifact_store.chunk(ref, start_line=start_line, end_line=end_line) + except Exception as e: + logging.error(f"Failed to get artifact chunk: {e}") + return f"Error reading artifact: {str(e)}" + + +@tool("artifact_load") +def artifact_load( + artifact_path: str +) -> Any: + """ + Load the full content of a stored artifact. + + WARNING: This loads the entire artifact into memory. 
+ For large artifacts, consider using head/tail/grep/chunk instead. + + Args: + artifact_path: Path to the artifact (from ArtifactRef) + + Returns: + Full artifact content (deserialized if JSON) + """ + if _artifact_store is None: + return "Error: Artifact store not available" + + try: + ref = ArtifactRef(path=artifact_path, summary="", size_bytes=0, mime_type="text/plain") + return _artifact_store.load(ref) + except Exception as e: + logging.error(f"Failed to load artifact: {e}") + return f"Error loading artifact: {str(e)}" + + +@tool("artifact_list") +def artifact_list( + agent_id: Optional[str] = None, + run_id: Optional[str] = None, + tool_name: Optional[str] = None +) -> List[Dict[str, Any]]: + """ + List available artifacts matching the given filters. + + Args: + agent_id: Filter by agent ID + run_id: Filter by run ID + tool_name: Filter by tool that created the artifact + + Returns: + List of artifact metadata dictionaries + """ + if _artifact_store is None: + return [{"error": "Artifact store not available"}] + + try: + refs = _artifact_store.list_artifacts( + agent_id=agent_id, + run_id=run_id, + tool_name=tool_name + ) + + result = [] + for ref in refs: + result.append({ + "path": ref.path, + "artifact_id": ref.artifact_id, + "size_bytes": ref.size_bytes, + "created_at": ref.created_at, + "tool_name": ref.tool_name, + "summary": ref.summary, + }) + + return result + except Exception as e: + logging.error(f"Failed to list artifacts: {e}") + return [{"error": f"Error listing artifacts: {str(e)}"}] \ No newline at end of file diff --git a/test_artifact_store.py b/test_artifact_store.py new file mode 100644 index 000000000..533be22c5 --- /dev/null +++ b/test_artifact_store.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python +"""Test artifact storage functionality.""" + +import sys +import os +sys.path.insert(0, 'src/praisonai-agents') + +from praisonaiagents import Agent +from praisonaiagents.config.feature_configs import ToolOutputConfig +from praisonaiagents.context 
# Create a test tool that generates large output
@tool
def generate_large_output(size: int = 20000) -> str:
    """Generate a large string for testing artifact storage."""
    return "X" * size


def test_artifact_storage():
    """Check agent wiring and a store/retrieve/delete round trip.

    The stored artifact is removed in a ``finally`` block so that a failing
    retrieval assertion does not leave the test artifact behind in the store.
    """
    print("Testing artifact storage...")

    # Create agent with artifact storage enabled
    agent = Agent(
        name="TestAgent",
        instructions="You are a test agent",
        tools=[generate_large_output],
        tool_output=ToolOutputConfig(
            max_bytes=1000,  # Low limit to trigger artifact storage
            enable_artifacts=True,
            retention_days=1
        )
    )

    # Verify artifact store was initialized
    assert agent._artifact_store is not None
    print("✓ Artifact store initialized")

    # Test storing an artifact directly
    store = FileSystemArtifactStore()
    metadata = ArtifactMetadata(
        agent_id="test_agent",
        run_id="test_run",
        tool_name="test_tool",
        turn_id=1
    )

    large_content = "TEST" * 5000  # 20KB content
    ref = store.store(large_content, metadata)
    assert ref is not None

    try:
        assert ref.size_bytes == len(large_content)
        print(f"✓ Stored artifact: {ref.artifact_id}")

        # Test retrieval methods
        head = store.head(ref, lines=2)
        assert "TEST" in head
        print("✓ Head retrieval works")

        tail = store.tail(ref, lines=2)
        assert "TEST" in tail
        print("✓ Tail retrieval works")

        # Test grep
        matches = store.grep(ref, "TEST", context_lines=1, max_matches=5)
        assert len(matches) > 0
        print(f"✓ Grep found {len(matches)} matches")

        # Test chunk
        chunk = store.chunk(ref, start_line=1, end_line=3)
        assert "TEST" in chunk
        print("✓ Chunk retrieval works")

        # Test full load
        loaded = store.load(ref)
        assert loaded == large_content
        print("✓ Full load works")
    finally:
        # Test cleanup - runs even when an assertion above fails
        deleted = store.delete(ref)

    assert deleted
    print("✓ Artifact deleted")

    print("\n✅ All artifact storage tests passed!")


if __name__ == "__main__":
    test_artifact_storage()
self.tool_output_limit = DEFAULT_TOOL_OUTPUT_LIMIT + self._tool_output_config = tool_output # Store the original config for cloning + # Track if tool_output_limit was customized via tool_output config + _custom_tool_output_limit = None if tool_output is None or tool_output is False: # Disabled - use default truncation only @@ -1519,7 +1521,7 @@ def __init__( from ..config.feature_configs import ToolOutputConfig from ..context.artifact_store import FileSystemArtifactStore config = ToolOutputConfig() - self.tool_output_limit = config.max_bytes + _custom_tool_output_limit = config.max_bytes if config.enable_artifacts: self._artifact_store = config.artifact_store or FileSystemArtifactStore( retention_days=config.retention_days, @@ -1530,7 +1532,7 @@ def __init__( from ..config.feature_configs import ToolOutputConfig from ..context.artifact_store import FileSystemArtifactStore config = ToolOutputConfig(**tool_output) - self.tool_output_limit = config.max_bytes + _custom_tool_output_limit = config.max_bytes if config.enable_artifacts: self._artifact_store = config.artifact_store or FileSystemArtifactStore( retention_days=config.retention_days, @@ -1540,7 +1542,7 @@ def __init__( from ..config.feature_configs import ToolOutputConfig if isinstance(tool_output, ToolOutputConfig): config = tool_output - self.tool_output_limit = config.max_bytes + _custom_tool_output_limit = config.max_bytes if config.enable_artifacts: from ..context.artifact_store import FileSystemArtifactStore self._artifact_store = config.artifact_store or FileSystemArtifactStore( @@ -1790,7 +1792,8 @@ def __init__( self._init_message_steering(message_steering) self.verbose = verbose self._has_explicit_output_config = _has_explicit_output # Track if user set output mode - self.tool_output_limit = tool_output_limit # Configurable tool output limit + # Use custom tool_output_limit if set via tool_output config, otherwise use parameter value + self.tool_output_limit = _custom_tool_output_limit if 
_custom_tool_output_limit is not None else tool_output_limit self.allow_delegation = allow_delegation self.step_callback = step_callback # Token budget guard (zero overhead when _max_budget is None) @@ -2197,6 +2200,7 @@ def clone_for_channel(self) -> "Agent": 'approval': getattr(self, '_approval_config', None), 'learn': getattr(self, '_learn_config', None), 'tool_search': getattr(self, '_tool_search_config', None), + 'tool_output': getattr(self, '_tool_output_config', None), # Tool configuration - use consolidated config when available 'tool_config': getattr(self, '_tool_config', None), @@ -5567,8 +5571,12 @@ def __del__(self): if artifact_store and hasattr(artifact_store, 'cleanup_old_artifacts'): try: artifact_store.cleanup_old_artifacts() - except Exception: - pass # Best effort cleanup + except Exception as e: + # Log the error for debugging but don't fail cleanup + import logging + logging.debug( + f"Failed to cleanup artifacts for agent {self.name}: {e}" + ) except Exception as exc: # noqa: BLE001 - finalizers must not raise import contextlib with contextlib.suppress(Exception): diff --git a/src/praisonai-agents/praisonaiagents/agent/tool_execution.py b/src/praisonai-agents/praisonaiagents/agent/tool_execution.py index f3b6d5dc1..423ad5c57 100644 --- a/src/praisonai-agents/praisonaiagents/agent/tool_execution.py +++ b/src/praisonai-agents/praisonaiagents/agent/tool_execution.py @@ -41,6 +41,8 @@ def _register_artifact_tools(self): artifact_tools.artifact_tail, artifact_tools.artifact_grep, artifact_tools.artifact_chunk, + artifact_tools.artifact_load, + artifact_tools.artifact_list, ] # Only add if not already present @@ -387,8 +389,18 @@ def execute_with_context(): truncated = result_str if self.context_manager and hasattr(self, '_truncate_tool_output'): - # Use context-aware truncation if available - truncated = self._truncate_tool_output(function_name, truncated) + # Use context-aware truncation if available, but preserve artifact reference + if 
def __post_init__(self) -> None:
    """Reject out-of-range settings as soon as the config is constructed.

    Checks run in a fixed order (max_bytes, max_lines, direction,
    retention_days) and the first violation wins.

    Raises:
        ValueError: If ``max_bytes`` is not positive, ``max_lines`` is set
            but not positive, ``direction`` is not one of ``head``/``tail``/
            ``both``, or ``retention_days`` is negative.
    """
    byte_limit = self.max_bytes
    if byte_limit <= 0:
        raise ValueError("max_bytes must be > 0. Use False/None to disable artifact spilling.")

    # max_lines is optional; None skips the range check.
    line_limit = self.max_lines
    if line_limit is not None and line_limit <= 0:
        raise ValueError("max_lines must be > 0 when provided.")

    allowed_directions = {"head", "tail", "both"}
    if self.direction not in allowed_directions:
        raise ValueError("direction must be one of: 'head', 'tail', 'both'.")

    # Zero is accepted; only negative retention is rejected.
    if self.retention_days < 0:
        raise ValueError("retention_days must be >= 0.")
def _validate_artifact_path(self, path: Path) -> Path:
    """Resolve *path* and confirm it names an artifact inside this store.

    Both the candidate and ``self.base_dir`` are expanded and resolved
    before comparison, so ``..`` segments and symlinks cannot be used to
    point outside the store directory.

    Args:
        path: Candidate artifact path, typically taken from an ArtifactRef.

    Returns:
        The fully resolved path, safe to open.

    Raises:
        PermissionError: If the resolved path is not under ``self.base_dir``.
        ValueError: If the resolved path does not end in ``.artifact``.
    """
    resolved = path.expanduser().resolve()
    base_resolved = self.base_dir.expanduser().resolve()

    # relative_to() raises ValueError when `resolved` is not under the base.
    try:
        resolved.relative_to(base_resolved)
    except ValueError as err:
        # Chain explicitly so the traceback shows the containment failure
        # as the cause and not as an error raised during handling.
        raise PermissionError(
            f"Artifact path is outside configured store: {path}"
        ) from err

    # Only artifact payload files may be accessed through this check.
    if resolved.suffix != ".artifact":
        raise ValueError(f"Invalid artifact file extension: {path}")

    return resolved
# Thread-local storage for per-agent artifact stores
# NOTE(review): threading.local() gives every thread its own registry, so a
# store registered on one thread is not visible to tools running on another
# thread -- confirm retrieval tools always run on the registering thread.
_artifact_stores = threading.local()


def set_artifact_store(store, agent_id: Optional[str] = None):
    """Register the artifact store used by the retrieval tools.

    Args:
        store: The artifact store instance
        agent_id: Optional agent identifier for multi-agent scenarios; when
            omitted (or empty) the store is registered as the default.
    """
    registry = getattr(_artifact_stores, 'stores', None)
    if registry is None:
        registry = _artifact_stores.stores = {}

    # Use agent_id as key, or 'default' if not provided
    registry[agent_id or 'default'] = store


def _get_artifact_store(agent_id: Optional[str] = None):
    """Return the artifact store for the current thread, or None.

    Lookup order:
      1. the store registered for ``agent_id`` (or the default store when
         no ``agent_id`` is given);
      2. the default store;
      3. when no ``agent_id`` was requested and exactly one store is
         registered, that store -- so a store registered only under an
         agent id is still reachable by callers that pass no id.

    Presence is tested with ``is None`` instead of truthiness so that a
    store object that evaluates falsy (for example one defining
    ``__len__``) is not discarded.

    Args:
        agent_id: Optional agent identifier to look up.

    Returns:
        The matching store, or None when nothing suitable is registered.
    """
    registry = getattr(_artifact_stores, 'stores', None)
    if not registry:
        return None

    # Try agent-specific store first, then fall back to default
    store = registry.get(agent_id or 'default')
    if store is None:
        store = registry.get('default')

    # Unambiguous fallback only: never guess between several agents' stores,
    # and never hand one agent's store to a different, explicitly named agent.
    if store is None and not agent_id and len(registry) == 1:
        (store,) = registry.values()

    return store
Exception: + pass + + # Create ref with actual metadata + ref = ArtifactRef( + path=artifact_path, + summary=meta.get("summary", ""), + size_bytes=meta.get("size_bytes", 0), + mime_type=meta.get("mime_type", "text/plain"), + checksum=meta.get("checksum", ""), + artifact_id=meta.get("artifact_id", ""), + agent_id=meta.get("agent_id", ""), + run_id=meta.get("run_id", ""), + tool_name=meta.get("tool_name"), + turn_id=meta.get("turn_id", 0) + ) return _artifact_store.load(ref) except Exception as e: logging.error(f"Failed to load artifact: {e}")