diff --git a/src/praisonai-agents/praisonaiagents/agent/agent.py b/src/praisonai-agents/praisonaiagents/agent/agent.py index f7a8a6ce0..563c5e78f 100644 --- a/src/praisonai-agents/praisonaiagents/agent/agent.py +++ b/src/praisonai-agents/praisonaiagents/agent/agent.py @@ -1508,6 +1508,9 @@ def __init__( "a dict of ToolSearchConfig fields, or ToolSearchConfig" ) + # Process tool_config and artifact storage (moved from tool_output) + self._artifact_store = None + # ============================================================ # END CONSOLIDATED PARAMS EXTRACTION # ============================================================ @@ -1556,6 +1559,14 @@ def __init__( tool_config=tool_config ) + # Set up artifact store if enabled in tool_config + if _tool_config and _tool_config.enable_artifacts: + from ..context.artifact_store import FileSystemArtifactStore + self._artifact_store = _tool_config.artifact_store or FileSystemArtifactStore( + retention_days=_tool_config.artifact_retention_days, + redact_secrets=_tool_config.redact_secrets + ) + # Gap 2: Store parallel tool calls setting for ToolCallExecutor selection self.parallel_tool_calls = _tool_config.parallel if _tool_config else parallel_tool_calls # G2: Store interrupt controller for cooperative cancellation @@ -1779,7 +1790,8 @@ def __init__( self._init_message_steering(message_steering) self.verbose = verbose self._has_explicit_output_config = _has_explicit_output # Track if user set output mode - self.tool_output_limit = tool_output_limit # Configurable tool output limit + # Use tool_config output_limit if configured, otherwise use parameter value + self.tool_output_limit = _tool_config.output_limit if _tool_config else tool_output_limit self.allow_delegation = allow_delegation self.step_callback = step_callback # Token budget guard (zero overhead when _max_budget is None) @@ -2197,7 +2209,6 @@ def clone_for_channel(self) -> "Agent": 'approval': getattr(self, '_approval_config', None), 'learn': getattr(self, '_learn_config', None), 'tool_search': getattr(self, '_tool_search_config', None), - # Tool configuration - use consolidated config when available 'tool_config': getattr(self, '_tool_config', None), @@ -5593,6 +5604,18 @@ def __del__(self): memory = getattr(self, "_memory_instance", None) if memory and hasattr(memory, 'close_connections'): memory.close_connections() + + # Clean up old artifacts if artifact store is configured + artifact_store = getattr(self, "_artifact_store", None) + if artifact_store and hasattr(artifact_store, 'cleanup_old_artifacts'): + try: + artifact_store.cleanup_old_artifacts() + except Exception as e: + # Log the error for debugging but don't fail cleanup + import logging + logging.debug( + f"Failed to cleanup artifacts for agent {self.name}: {e}" + ) except Exception as exc: # noqa: BLE001 - finalizers must not raise import contextlib with contextlib.suppress(Exception): diff --git a/src/praisonai-agents/praisonaiagents/agent/tool_execution.py b/src/praisonai-agents/praisonaiagents/agent/tool_execution.py index 0f2d2d149..ea88df335 100644 --- a/src/praisonai-agents/praisonaiagents/agent/tool_execution.py +++ b/src/praisonai-agents/praisonaiagents/agent/tool_execution.py @@ -51,6 +51,35 @@ def delay(attempt: int, initial_delay: float, backoff_factor: float, jitter: flo class ToolExecutionMixin: """Mixin providing toolexecution methods for the Agent class.""" + + def _register_artifact_tools(self): + """Register artifact retrieval tools when artifacts are first created.""" + try: + from ..tools import artifact_tools + + # Set the store reference for the tools + artifact_tools.set_artifact_store(self._artifact_store) + + # Add the retrieval tools + tools_to_add = [ + artifact_tools.artifact_head, + artifact_tools.artifact_tail, + artifact_tools.artifact_grep, + artifact_tools.artifact_chunk, + artifact_tools.artifact_load, + artifact_tools.artifact_list, + ] + + # Only add if not already present + existing_tool_names = {getattr(t, '__name__', str(t)) for t in self.tools} + for tool in tools_to_add: + tool_name = getattr(tool, '__name__', str(tool)) + if tool_name not in existing_tool_names: + self.tools.append(tool) + + logging.debug("Registered artifact retrieval tools") + except Exception as e: + logging.warning(f"Failed to register artifact tools: {e}") def _get_existing_stream_emitter(self): """Return an already-initialized stream emitter without creating one.""" @@ -462,36 +491,68 @@ def execute_with_context(): try: result_str = str(result) - if self.context_manager: - # Use context-aware truncation with configured budget - truncated = self._truncate_tool_output(function_name, result_str, tool_call_id) - else: - # Apply default limit even without context management - # This prevents runaway tool outputs from causing overflow - limit = getattr(self, 'tool_output_limit', 16000) - if len(result_str) > limit: - # Use smart truncation format that judge recognizes as OK - tail_size = min(limit // 5, 2000) - head = result_str[:limit - tail_size] - tail = result_str[-tail_size:] if tail_size > 0 else "" - truncated = f"{head}\n...[{len(result_str):,} chars, showing first/last portions]...\n{tail}" - - # Store full output for later retrieval + # Get configured limit + limit = getattr(self, 'tool_output_limit', 16000) + + # Check if we need to spill to artifact store + if len(result_str) > limit: + # Try to use artifact store if available + artifact_ref = None + if hasattr(self, '_artifact_store') and self._artifact_store is not None: try: - from ..runtime.tool_output_store import get_tool_output_store - store = get_tool_output_store(getattr(self, '_run_id', None)) - metadata = store.store(function_name, result_str, call_id=tool_call_id) - if metadata: - # Add reference to stored output in the truncated preview - truncated = store.format_reference(metadata, truncated) - logging.debug(f"Stored full {function_name} output ({len(result_str)} bytes) at {metadata.get('path')}") - except ImportError: - # Fallback if store not available - pass + from ..context.artifacts import ArtifactMetadata + + # Create metadata for this artifact + metadata = ArtifactMetadata( + agent_id=self.name, + run_id=getattr(self, '_current_run_id', 'unknown'), + tool_name=function_name, + turn_id=getattr(self, '_turn_counter', 0), + ) + + # Store the full output + artifact_ref = self._artifact_store.store(result_str, metadata) + logging.debug(f"Stored {function_name} output ({len(result_str)} bytes) as artifact {artifact_ref.artifact_id}") + + # Register artifact retrieval tools if not already registered + if not hasattr(self, '_artifact_tools_registered'): + self._register_artifact_tools() + self._artifact_tools_registered = True except Exception as e: - logging.debug(f"Failed to store tool output: {e}") + logging.debug(f"Failed to store artifact: {e}") + + # Generate truncated preview + tail_size = min(limit // 5, 2000) + head = result_str[:limit - tail_size] + tail = result_str[-tail_size:] if tail_size > 0 else "" + + # If we stored an artifact, include reference in the output + if artifact_ref: + truncated = ( + f"{head}\n" + f"...[{len(result_str):,} chars total, showing first/last portions]...\n" + f"{tail}\n\n" + f"{artifact_ref.to_inline()}" + ) + else: + # Fallback to simple truncation + truncated = f"{head}\n...[{len(result_str):,} chars, showing first/last portions]...\n{tail}" + else: + truncated = result_str + + if self.context_manager and hasattr(self, '_truncate_tool_output'): + # Use context-aware truncation if available, but preserve artifact reference + if 'artifact_ref' in locals() and artifact_ref: + # Extract the artifact reference from the truncated string + artifact_inline = artifact_ref.to_inline() + # Remove the artifact reference before context truncation + truncated_without_ref = truncated.replace(artifact_inline, "").rstrip() + # Apply context truncation + truncated_without_ref = self._truncate_tool_output(function_name, truncated_without_ref, tool_call_id) + # Re-append the artifact reference + truncated = f"{truncated_without_ref}\n\n{artifact_inline}" else: - truncated = result_str + truncated = self._truncate_tool_output(function_name, truncated, tool_call_id) if len(truncated) < len(result_str): logging.debug(f"Truncated {function_name} output from {len(result_str)} to {len(truncated)} chars") @@ -499,6 +560,9 @@ def execute_with_context(): if isinstance(result, dict): max_field_chars = getattr(self, 'tool_output_limit', 16000) if not self.context_manager else None result = self._truncate_dict_fields(result, function_name, max_field_chars, tool_call_id) + # Add artifact reference to dict result if available + if 'artifact_ref' in locals() and artifact_ref: + result["_artifact_ref"] = artifact_ref.to_dict() else: result = truncated except Exception as e: diff --git a/src/praisonai-agents/praisonaiagents/config/feature_configs.py b/src/praisonai-agents/praisonaiagents/config/feature_configs.py index c15abd659..e28ca1648 100644 --- a/src/praisonai-agents/praisonaiagents/config/feature_configs.py +++ b/src/praisonai-agents/praisonaiagents/config/feature_configs.py @@ -42,6 +42,7 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: from ..compaction.strategy import CompactionStrategy + from ..context.artifact_store import FileSystemArtifactStore class MemoryBackend(str, Enum): @@ -935,7 +936,8 @@ class ToolConfig: """ Configuration for tool execution behavior. - Configuration for tool execution behavior including timeout, retry policy, and parallel execution. + Configuration for tool execution behavior including timeout, retry policy, parallel execution, + and artifact storage for large outputs. Usage: # Simple enable with defaults @@ -948,8 +950,12 @@ class ToolConfig: parallel=True, )) - # With timeout only - Agent(tool_config=ToolConfig(timeout=30)) + # With artifact storage for large outputs + Agent(tool_config=ToolConfig( + output_limit=32000, + enable_artifacts=True, + artifact_retention_days=14, + )) """ # Tool execution timeout in seconds timeout: Optional[int] = None @@ -960,6 +966,26 @@ class ToolConfig: # Enable parallel execution of batched LLM tool calls parallel: bool = False + # Tool output handling and artifact storage + output_limit: int = 16000 # Maximum bytes before spilling to artifact store + output_max_lines: Optional[int] = None # Maximum lines before spilling + output_direction: str = "both" # Truncation direction: "head", "tail", or "both" + enable_artifacts: bool = False # Whether to enable artifact storage (default False for backward compat) + artifact_retention_days: int = 7 # Days to retain artifacts before garbage collection + artifact_store: Optional[Any] = None # Custom artifact store instance + redact_secrets: bool = True # Whether to redact secrets from artifacts + + def __post_init__(self) -> None: + """Validate configuration after initialization.""" + if self.output_limit <= 0: + raise ValueError("tool_config.output_limit must be > 0") + if self.output_max_lines is not None and self.output_max_lines <= 0: + raise ValueError("tool_config.output_max_lines must be > 0 when provided") + if self.output_direction not in {"head", "tail", "both"}: + raise ValueError("tool_config.output_direction must be one of: 'head', 'tail', 'both'") + if self.artifact_retention_days < 0: + raise ValueError("tool_config.artifact_retention_days must be >= 0") + def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" return { @@ -970,6 +996,13 @@ def to_dict(self) -> Dict[str, Any]: else self.retry_policy ), "parallel": self.parallel, + "output_limit": self.output_limit, + "output_max_lines": self.output_max_lines, + "output_direction": self.output_direction, + "enable_artifacts": self.enable_artifacts, + "artifact_retention_days": self.artifact_retention_days, + "artifact_store": self.artifact_store, + "redact_secrets": self.redact_secrets, } @classmethod @@ -988,6 +1021,13 @@ def from_dict(cls, data: Dict[str, Any]) -> "ToolConfig": timeout=data.get("timeout"), retry_policy=retry_policy, parallel=data.get("parallel", False), + output_limit=data.get("output_limit", 16000), + output_max_lines=data.get("output_max_lines"), + output_direction=data.get("output_direction", "both"), + enable_artifacts=data.get("enable_artifacts", False), + artifact_retention_days=data.get("artifact_retention_days", 7), + artifact_store=data.get("artifact_store"), + redact_secrets=data.get("redact_secrets", True), ) @@ -1513,6 +1553,8 @@ def resolve_tools(value: ToolParam) -> Optional[ToolConfig]: return value + + __all__ = [ # Enums "MemoryBackend", @@ -1562,6 +1604,7 @@ def resolve_tools(value: ToolParam) -> Optional[ToolConfig]: "AutonomyParam", "ToolSearchParam", "ToolParam", + "ToolOutputParam", # Precedence ladder resolvers "resolve_memory", "resolve_knowledge", @@ -1573,4 +1616,5 @@ def resolve_tools(value: ToolParam) -> Optional[ToolConfig]: "resolve_autonomy", "resolve_tool_search", "resolve_tools", + "resolve_tool_output", ] diff --git a/src/praisonai-agents/praisonaiagents/context/__init__.py b/src/praisonai-agents/praisonaiagents/context/__init__.py index 86f0d8549..6d6767b15 100644 --- a/src/praisonai-agents/praisonaiagents/context/__init__.py +++ b/src/praisonai-agents/praisonaiagents/context/__init__.py @@ -180,6 +180,7 @@ def format_percent(value: float) -> str: "TerminalLoggerProtocol", "compute_checksum", "generate_summary", + "FileSystemArtifactStore", # Context Compaction Policy "ContextCompactionPolicy", "ContextCompactionPolicyProtocol", @@ -295,6 +296,11 @@ def __getattr__(name: str): from . import artifacts return getattr(artifacts, name) + # FileSystemArtifactStore + if name == "FileSystemArtifactStore": + from .artifact_store import FileSystemArtifactStore + return FileSystemArtifactStore + # Session Context Tracking (Agno pattern) if name in ("SessionContextTracker", "SessionState"): from . import session_tracker diff --git a/src/praisonai-agents/praisonaiagents/context/artifact_store.py b/src/praisonai-agents/praisonaiagents/context/artifact_store.py new file mode 100644 index 000000000..aa8c84d03 --- /dev/null +++ b/src/praisonai-agents/praisonaiagents/context/artifact_store.py @@ -0,0 +1,446 @@ +""" +Concrete implementation of ArtifactStoreProtocol for filesystem storage. + +This module provides a filesystem-based artifact store that persists +tool outputs to disk when they exceed configured size limits. +""" + +import os +import re +import json +import time +import uuid +import hashlib +import logging +from pathlib import Path +from typing import Any, Dict, List, Optional, Union + +from .artifacts import ( + ArtifactRef, + ArtifactMetadata, + ArtifactStoreProtocol, + GrepMatch, + compute_checksum, + generate_summary, +) + + +class FileSystemArtifactStore: + """ + Filesystem-based artifact storage implementation. + + Stores artifacts under ~/.praisonai/artifacts/{agent_id}/{run_id}/ + with content-addressed naming using SHA256 hashes. + """ + + def __init__( + self, + base_dir: Optional[str] = None, + retention_days: int = 7, + redact_secrets: bool = True, + ): + """ + Initialize the filesystem artifact store. + + Args: + base_dir: Base directory for storage (default: ~/.praisonai/artifacts) + retention_days: Days to retain artifacts before GC + redact_secrets: Whether to redact secrets from stored content + """ + if base_dir: + self.base_dir = Path(base_dir) + else: + # Use standard PraisonAI data directory + home = Path.home() + self.base_dir = home / ".praisonai" / "artifacts" + + self.retention_days = retention_days + self.redact_secrets = redact_secrets + self._secret_patterns = [ + r'(?i)(api[_-]?key|apikey)["\']?\s*[:=]\s*["\']?[\w\-]+', + r'(?i)(secret|password|passwd|pwd)["\']?\s*[:=]\s*["\']?[\w\-]+', + r'(?i)(token|bearer)["\']?\s*[:=]\s*["\']?[\w\-]+', + r'sk-[a-zA-Z0-9]{20,}', # OpenAI keys + r'ghp_[a-zA-Z0-9]{36}', # GitHub tokens + ] + + # Ensure base directory exists + self.base_dir.mkdir(parents=True, exist_ok=True) + + def store( + self, + content: Any, + metadata: ArtifactMetadata, + ) -> ArtifactRef: + """ + Store content as an artifact. + + Args: + content: The content to store (will be serialized) + metadata: Metadata about the artifact + + Returns: + ArtifactRef pointing to the stored artifact + """ + # Serialize content and prepare for redaction + if isinstance(content, (str, bytes)): + content_str = content if isinstance(content, str) else None + content_bytes = content.encode("utf-8") if isinstance(content, str) else content + mime_type = "text/plain" + else: + # JSON serialize structured data + content_str = json.dumps(content, indent=2, default=str) + content_bytes = content_str.encode("utf-8") + mime_type = "application/json" + + # Redact secrets if enabled (for text and JSON content) + if self.redact_secrets and content_str: + for pattern in self._secret_patterns: + content_str = re.sub(pattern, "[REDACTED]", content_str) + content_bytes = content_str.encode("utf-8") + + # Compute checksum + checksum = compute_checksum(content_bytes) + + # Generate artifact ID + artifact_id = f"{checksum[:12]}_{uuid.uuid4().hex[:8]}" + + # Create directory structure + artifact_dir = self.base_dir / metadata.agent_id / metadata.run_id + artifact_dir.mkdir(parents=True, exist_ok=True) + + # Write content to file + artifact_path = artifact_dir / f"{artifact_id}.artifact" + artifact_path.write_bytes(content_bytes) + + # Write metadata + meta_path = artifact_dir / f"{artifact_id}.meta.json" + meta_data = metadata.to_dict() + meta_data["checksum"] = checksum + meta_data["size_bytes"] = len(content_bytes) + meta_data["mime_type"] = mime_type + meta_data["created_at"] = time.time() + meta_path.write_text(json.dumps(meta_data, indent=2)) + + # Generate summary (use redacted content if available) + summary_source = content_str if content_str else str(content) + summary = generate_summary(summary_source, max_chars=200) + + # Create and return reference + ref = ArtifactRef( + path=str(artifact_path), + summary=summary, + size_bytes=len(content_bytes), + mime_type=mime_type, + checksum=checksum, + created_at=meta_data["created_at"], + artifact_id=artifact_id, + agent_id=metadata.agent_id, + run_id=metadata.run_id, + tool_name=metadata.tool_name, + turn_id=metadata.turn_id, + ) + + logging.debug(f"Stored artifact {artifact_id} ({len(content_bytes)} bytes) at {artifact_path}") + + return ref + + def _validate_artifact_path(self, path_or_ref: Union[Path, ArtifactRef]) -> Path: + """Validate that path is within the artifact store's base directory.""" + if isinstance(path_or_ref, ArtifactRef): + path = Path(path_or_ref.path) + else: + path = path_or_ref + + resolved = path.expanduser().resolve() + base_resolved = self.base_dir.expanduser().resolve() + + # Check if path is within base directory + try: + resolved.relative_to(base_resolved) + except ValueError: + raise PermissionError(f"Artifact path is outside configured store: {path}") + + # Validate extension + if resolved.suffix != ".artifact": + raise ValueError(f"Invalid artifact file extension: {path}") + + return resolved + + def load(self, ref: ArtifactRef) -> Any: + """ + Load full content from an artifact. + + Args: + ref: Reference to the artifact + + Returns: + The deserialized content + """ + path = self._validate_artifact_path(Path(ref.path)) + if not path.exists(): + raise FileNotFoundError(f"Artifact not found: {ref.path}") + + content_bytes = path.read_bytes() + + # Verify checksum if provided + if ref.checksum: + actual_checksum = compute_checksum(content_bytes) + if actual_checksum != ref.checksum: + raise ValueError(f"Checksum mismatch for artifact {ref.artifact_id}") + + # Deserialize based on mime type + if ref.mime_type == "application/json": + return json.loads(content_bytes.decode("utf-8")) + else: + return content_bytes.decode("utf-8") + + def tail(self, ref: ArtifactRef, lines: int = 50) -> str: + """ + Get the last N lines of an artifact. + + Args: + ref: Reference to the artifact + lines: Number of lines to return + + Returns: + String containing the last N lines + """ + path = self._validate_artifact_path(Path(ref.path)) + if not path.exists(): + raise FileNotFoundError(f"Artifact not found: {ref.path}") + + content = path.read_text(encoding="utf-8", errors="replace") + content_lines = content.splitlines() + + if len(content_lines) <= lines: + return content + + return "\n".join(content_lines[-lines:]) + + def head(self, ref: ArtifactRef, lines: int = 50) -> str: + """ + Get the first N lines of an artifact. + + Args: + ref: Reference to the artifact + lines: Number of lines to return + + Returns: + String containing the first N lines + """ + path = self._validate_artifact_path(Path(ref.path)) + if not path.exists(): + raise FileNotFoundError(f"Artifact not found: {ref.path}") + + content = path.read_text(encoding="utf-8", errors="replace") + content_lines = content.splitlines() + + if len(content_lines) <= lines: + return content + + return "\n".join(content_lines[:lines]) + + def grep( + self, + ref: ArtifactRef, + pattern: str, + context_lines: int = 2, + max_matches: int = 50, + ) -> List[GrepMatch]: + """ + Search for pattern in artifact content. + + Args: + ref: Reference to the artifact + pattern: Regex pattern to search for + context_lines: Number of context lines before/after match + max_matches: Maximum number of matches to return + + Returns: + List of GrepMatch objects + """ + path = self._validate_artifact_path(Path(ref.path)) + if not path.exists(): + raise FileNotFoundError(f"Artifact not found: {ref.path}") + + content = path.read_text(encoding="utf-8", errors="replace") + lines = content.splitlines() + + matches = [] + pattern_re = re.compile(pattern, re.IGNORECASE) + + for i, line in enumerate(lines): + if pattern_re.search(line): + # Get context lines + start_idx = max(0, i - context_lines) + end_idx = min(len(lines), i + context_lines + 1) + + match = GrepMatch( + line_number=i + 1, # 1-indexed + line_content=line, + context_before=lines[start_idx:i], + context_after=lines[i + 1:end_idx], + ) + matches.append(match) + + if len(matches) >= max_matches: + break + + return matches + + def chunk( + self, + ref: ArtifactRef, + start_line: int = 1, + end_line: Optional[int] = None, + ) -> str: + """ + Get a chunk of lines from an artifact. + + Args: + ref: Reference to the artifact + start_line: Starting line number (1-indexed) + end_line: Ending line number (inclusive), None for end of file + + Returns: + String containing the requested lines + """ + path = self._validate_artifact_path(Path(ref.path)) + if not path.exists(): + raise FileNotFoundError(f"Artifact not found: {ref.path}") + + content = path.read_text(encoding="utf-8", errors="replace") + lines = content.splitlines() + + # Convert to 0-indexed + start_idx = max(0, start_line - 1) + end_idx = end_line if end_line is None else end_line + + return "\n".join(lines[start_idx:end_idx]) + + def delete(self, ref: ArtifactRef) -> bool: + """ + Delete an artifact. + + Args: + ref: Reference to the artifact + + Returns: + True if deleted successfully + """ + path = self._validate_artifact_path(Path(ref.path)) + meta_path = path.with_suffix(".meta.json") + + deleted = False + if path.exists(): + path.unlink() + deleted = True + + if meta_path.exists(): + meta_path.unlink() + + return deleted + + def list_artifacts( + self, + run_id: Optional[str] = None, + agent_id: Optional[str] = None, + tool_name: Optional[str] = None, + ) -> List[ArtifactRef]: + """ + List artifacts matching filters. + + Args: + run_id: Filter by run ID + agent_id: Filter by agent ID + tool_name: Filter by tool name + + Returns: + List of matching ArtifactRef objects + """ + artifacts = [] + + # Determine search path based on filters + if agent_id and run_id: + search_path = self.base_dir / agent_id / run_id + if search_path.exists(): + search_paths = [search_path] + else: + search_paths = [] + elif agent_id: + agent_path = self.base_dir / agent_id + if agent_path.exists(): + search_paths = [p for p in agent_path.iterdir() if p.is_dir()] + else: + search_paths = [] + else: + # Search all - but respect run_id filter if provided + search_paths = [] + for agent_dir in self.base_dir.iterdir(): + if agent_dir.is_dir(): + for run_dir in agent_dir.iterdir(): + if run_dir.is_dir(): + # If run_id filter provided, only include matching runs + if run_id is None or run_dir.name == run_id: + search_paths.append(run_dir) + + # Search for artifacts + for dir_path in search_paths: + for meta_file in dir_path.glob("*.meta.json"): + try: + meta_data = json.loads(meta_file.read_text()) + + # Apply tool_name filter if specified + if tool_name and meta_data.get("tool_name") != tool_name: + continue + + # Reconstruct artifact path + artifact_file = meta_file.with_suffix("").with_suffix(".artifact") + if not artifact_file.exists(): + continue + + # Create ArtifactRef + ref = ArtifactRef( + path=str(artifact_file), + summary="", # Not stored in meta, would need to regenerate + size_bytes=meta_data.get("size_bytes", 0), + mime_type=meta_data.get("mime_type", "application/octet-stream"), + checksum=meta_data.get("checksum", ""), + created_at=meta_data.get("created_at", 0), + artifact_id=artifact_file.stem, + agent_id=meta_data.get("agent_id", ""), + run_id=meta_data.get("run_id", ""), + tool_name=meta_data.get("tool_name"), + turn_id=meta_data.get("turn_id", 0), + ) + artifacts.append(ref) + except Exception as e: + logging.debug(f"Error loading artifact metadata from {meta_file}: {e}") + + # Sort by creation time (newest first) + artifacts.sort(key=lambda x: x.created_at, reverse=True) + + return artifacts + + def cleanup_old_artifacts(self, days: Optional[int] = None) -> int: + """ + Clean up artifacts older than retention period. + + Args: + days: Days to retain (overrides default retention_days) + + Returns: + Number of artifacts deleted + """ + days = self.retention_days if days is None else days + cutoff_time = time.time() - (days * 24 * 60 * 60) + deleted_count = 0 + + for artifact in self.list_artifacts(): + if artifact.created_at < cutoff_time: + if self.delete(artifact): + deleted_count += 1 + logging.debug(f"Deleted old artifact {artifact.artifact_id}") + + return deleted_count \ No newline at end of file diff --git a/src/praisonai-agents/praisonaiagents/tools/artifact_tools.py b/src/praisonai-agents/praisonaiagents/tools/artifact_tools.py new file mode 100644 index 000000000..27423a9bc --- /dev/null +++ b/src/praisonai-agents/praisonaiagents/tools/artifact_tools.py @@ -0,0 +1,267 @@ +""" +Artifact retrieval tools for accessing spilled tool outputs. + +These tools are registered lazily when an artifact overflow occurs, +providing agents with the ability to page through preserved outputs. +""" + +import logging +import threading +from typing import Optional, List, Dict, Any + +from ..context.artifacts import ArtifactRef, GrepMatch +from .decorator import tool + + +# Thread-local storage for per-agent artifact stores +_artifact_stores = threading.local() + + +def set_artifact_store(store, agent_id: Optional[str] = None): + """Set the artifact store for retrieval tools. + + Args: + store: The artifact store instance + agent_id: Optional agent identifier for multi-agent scenarios + """ + if not hasattr(_artifact_stores, 'stores'): + _artifact_stores.stores = {} + + # Use agent_id as key, or 'default' if not provided + key = agent_id or 'default' + _artifact_stores.stores[key] = store + + +def _get_artifact_store(agent_id: Optional[str] = None): + """Get the artifact store for the current context.""" + if not hasattr(_artifact_stores, 'stores'): + return None + + # Try agent-specific store first, then fall back to default + key = agent_id or 'default' + stores = _artifact_stores.stores + return stores.get(key) or stores.get('default') + + +@tool("artifact_head") +def artifact_head( + artifact_path: str, + lines: int = 50 +) -> str: + """ + Get the first N lines of a stored artifact. + + Args: + artifact_path: Path to the artifact (from ArtifactRef) + lines: Number of lines to return (default: 50) + + Returns: + First N lines of the artifact content + """ + _artifact_store = _get_artifact_store() + if _artifact_store is None: + return "Error: Artifact store not available" + + try: + ref = ArtifactRef(path=artifact_path, summary="", size_bytes=0, mime_type="text/plain") + return _artifact_store.head(ref, lines=lines) + except Exception as e: + logging.error(f"Failed to get artifact head: {e}") + return f"Error reading artifact: {str(e)}" + + +@tool("artifact_tail") +def artifact_tail( + artifact_path: str, + lines: int = 50 +) -> str: + """ + Get the last N lines of a stored artifact. + + Args: + artifact_path: Path to the artifact (from ArtifactRef) + lines: Number of lines to return (default: 50) + + Returns: + Last N lines of the artifact content + """ + _artifact_store = _get_artifact_store() + if _artifact_store is None: + return "Error: Artifact store not available" + + try: + ref = ArtifactRef(path=artifact_path, summary="", size_bytes=0, mime_type="text/plain") + return _artifact_store.tail(ref, lines=lines) + except Exception as e: + logging.error(f"Failed to get artifact tail: {e}") + return f"Error reading artifact: {str(e)}" + + +@tool("artifact_grep") +def artifact_grep( + artifact_path: str, + pattern: str, + context_lines: int = 2, + max_matches: int = 50 +) -> List[Dict[str, Any]]: + """ + Search for a pattern in a stored artifact. + + Args: + artifact_path: Path to the artifact (from ArtifactRef) + pattern: Regex pattern to search for + context_lines: Number of context lines before/after match (default: 2) + max_matches: Maximum number of matches to return (default: 50) + + Returns: + List of matches with line numbers and context + """ + _artifact_store = _get_artifact_store() + if _artifact_store is None: + return [{"error": "Artifact store not available"}] + + try: + ref = ArtifactRef(path=artifact_path, summary="", size_bytes=0, mime_type="text/plain") + matches = _artifact_store.grep(ref, pattern, context_lines=context_lines, max_matches=max_matches) + + # Convert to serializable format + result = [] + for match in matches: + result.append({ + "line_number": match.line_number, + "line": match.line_content, + "context_before": match.context_before, + "context_after": match.context_after, + }) + + return result + except Exception as e: + logging.error(f"Failed to grep artifact: {e}") + return [{"error": f"Error searching artifact: {str(e)}"}] + + +@tool("artifact_chunk") +def artifact_chunk( + artifact_path: str, + start_line: int = 1, + end_line: Optional[int] = None +) -> str: + """ + Get a chunk of lines from a stored artifact. + + Args: + artifact_path: Path to the artifact (from ArtifactRef) + start_line: Starting line number (1-indexed) + end_line: Ending line number (inclusive), None for end of file + + Returns: + Lines from start_line to end_line + """ + _artifact_store = _get_artifact_store() + if _artifact_store is None: + return "Error: Artifact store not available" + + try: + ref = ArtifactRef(path=artifact_path, summary="", size_bytes=0, mime_type="text/plain") + return _artifact_store.chunk(ref, start_line=start_line, end_line=end_line) + except Exception as e: + logging.error(f"Failed to get artifact chunk: {e}") + return f"Error reading artifact: {str(e)}" + + +@tool("artifact_load") +def artifact_load( + artifact_path: str +) -> Any: + """ + Load the full content of a stored artifact. + + WARNING: This loads the entire artifact into memory. + For large artifacts, consider using head/tail/grep/chunk instead. + + Args: + artifact_path: Path to the artifact (from ArtifactRef) + + Returns: + Full artifact content (deserialized if JSON) + """ + _artifact_store = _get_artifact_store() + if _artifact_store is None: + return "Error: Artifact store not available" + + try: + # Try to load metadata to get correct mime_type and checksum + import json + from pathlib import Path + + meta_path = artifact_path.replace(".artifact", ".meta.json") + meta = {} + try: + p = Path(meta_path) + if p.exists(): + meta = json.loads(p.read_text()) + except Exception: + pass + + # Create ref with actual metadata + ref = ArtifactRef( + path=artifact_path, + summary=meta.get("summary", ""), + size_bytes=meta.get("size_bytes", 0), + mime_type=meta.get("mime_type", "text/plain"), + checksum=meta.get("checksum", ""), + artifact_id=meta.get("artifact_id", ""), + agent_id=meta.get("agent_id", ""), + run_id=meta.get("run_id", ""), + tool_name=meta.get("tool_name"), + turn_id=meta.get("turn_id", 0) + ) + return _artifact_store.load(ref) + except Exception as e: + logging.error(f"Failed to load artifact: {e}") + return f"Error loading artifact: {str(e)}" + + +@tool("artifact_list") +def artifact_list( + agent_id: Optional[str] = None, + run_id: Optional[str] = None, + tool_name: Optional[str] = None +) -> List[Dict[str, Any]]: + """ + List available artifacts matching the given filters. + + Args: + agent_id: Filter by agent ID + run_id: Filter by run ID + tool_name: Filter by tool that created the artifact + + Returns: + List of artifact metadata dictionaries + """ + _artifact_store = _get_artifact_store() + if _artifact_store is None: + return [{"error": "Artifact store not available"}] + + try: + refs = _artifact_store.list_artifacts( + agent_id=agent_id, + run_id=run_id, + tool_name=tool_name + ) + + result = [] + for ref in refs: + result.append({ + "path": ref.path, + "artifact_id": ref.artifact_id, + "size_bytes": ref.size_bytes, + "created_at": ref.created_at, + "tool_name": ref.tool_name, + "summary": ref.summary, + }) + + return result + except Exception as e: + logging.error(f"Failed to list artifacts: {e}") + return [{"error": f"Error listing artifacts: {str(e)}"}] \ No newline at end of file diff --git a/test_artifact_store.py b/test_artifact_store.py new file mode 100644 index 000000000..cac629a11 --- /dev/null +++ b/test_artifact_store.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python +"""Test artifact storage functionality.""" + +import sys +import os +sys.path.insert(0, 'src/praisonai-agents') + +from praisonaiagents import Agent +from praisonaiagents.config.feature_configs import ToolConfig +from praisonaiagents.context import FileSystemArtifactStore, ArtifactMetadata +from praisonaiagents.tools import tool +import tempfile + +# Create a test tool that generates large output +@tool +def generate_large_output(size: int = 20000) -> str: + """Generate a large string for testing artifact storage.""" + return "X" * size + +def test_artifact_storage(): + """Test that large tool outputs are stored as artifacts.""" + print("Testing artifact storage...") + + # Create agent with artifact storage enabled + agent = Agent( + name="TestAgent", + instructions="You are a test agent", + tools=[generate_large_output], + tool_config=ToolConfig( + output_limit=1000, # Low limit to trigger artifact storage + enable_artifacts=True, + artifact_retention_days=1 + ) + ) + + # Verify artifact store was initialized + assert agent._artifact_store is not None + print("✓ Artifact store initialized") + + # Test storing an artifact directly + store = FileSystemArtifactStore() + metadata = ArtifactMetadata( + agent_id="test_agent", + run_id="test_run", + tool_name="test_tool", + turn_id=1 + ) + + large_content = "TEST" * 5000 # 20KB content + ref = store.store(large_content, metadata) + + assert ref is not None + assert ref.size_bytes == len(large_content) + print(f"✓ Stored artifact: {ref.artifact_id}") + + # Test retrieval methods + head = store.head(ref, lines=2) + assert "TEST" in head + print("✓ Head retrieval works") + + tail = store.tail(ref, lines=2) + assert "TEST" in tail + print("✓ Tail retrieval works") + + # Test grep + matches = store.grep(ref, "TEST", context_lines=1, max_matches=5) + assert len(matches) > 0 + print(f"✓ Grep found {len(matches)} matches") + + # Test chunk + chunk = store.chunk(ref, start_line=1, end_line=3) + assert "TEST" in chunk + print("✓ Chunk retrieval works") + + # Test full load + loaded = store.load(ref) + assert loaded == large_content + print("✓ Full load works") + + # Test cleanup + deleted = store.delete(ref) + assert deleted + print("✓ Artifact deleted") + + print("\n✅ All artifact storage tests passed!") + +if __name__ == "__main__": + test_artifact_storage() \ No newline at end of file