Skip to content

Three production gaps in src/praisonai/praisonai: unsafe-by-default approvals, silent backend mis-routing, and unsafe multi-loop concurrency #2121

Description

@MervinPraison

Scope

In-depth architectural review of src/praisonai/praisonai/ only. Excludes docs, tests, coverage, file-size, and style. Every claim below was validated by reading the actual code at the cited lines.

Three systemic gaps stand out — each one violates a stated principle in the project philosophy ("safe by default", "production-ready", "multi-agent + async safe by default"), each one bites in real production scenarios, and each one is fixable with a contained patch.


Gap 1 — Approval & permission defaults are unsafe by default; LLM-driven tool calls bypass approval entirely

Stated principle violated: "Safe by default", "hard to misuse".

Four collaborating defects across _approval_spec.py, cli_backends/claude.py, and mcp_server/transports/http_stream.py mean that a brand-new user installing PraisonAI runs untrusted prompts through an autonomous tool-call loop with no approval gate in front of dangerous tools. Worse, the one CLI backend the project ships with explicitly disables every permission check.

1a. ApprovalSpec(enabled=False) is the default and there is no enforcement layer

src/praisonai/praisonai/_approval_spec.py:35-73

@dataclass(frozen=True)
class ApprovalSpec:
    enabled: bool = False                  # <-- default OFF
    backend: Backend = "console"
    approve_all_tools: bool = False        # <-- only knob is all-or-nothing
    timeout: Optional[float] = None
    approve_level: Optional[ApprovalLevel] = None
    guardrails: Optional[str] = None

    @classmethod
    def from_cli(cls, args):
        enabled = bool(
            getattr(args, 'trust', False) or
            getattr(args, 'approval', None) or
            getattr(args, 'approve_all_tools', False) or
            getattr(args, 'approve_level', None)
        )
        if getattr(args, 'trust', False):
            backend = "auto"               # <-- "auto" silently means "approve everything"

The dataclass holds config. The file contains no enforce(), no before_tool hook installer, no check_approval(). Searching the whole package confirms nothing in the tool-execution path (praisonaiagents.hooks.add_hook("before_tool", ...)) ever consults ApprovalSpec for autonomous LLM-driven tool calls. That means:

  • A user invoking praisonai run config.yaml with no approval flag gets enabled=False → zero gating.
  • A user passing --trust gets backend="auto" (silent auto-approve) with no warning logged.
  • There is no per-tool granularity — approve_all_tools: bool is binary; the user cannot say "approve shell, auto-approve calculator".
  • The Python API and any HTTP entry point construct agents without ever going through from_cli, so ApprovalSpec simply doesn't exist on those paths.

Production scenario: A web app routes user-controlled prompts to agent.start(). The agent has a shell tool wired in. There is no path by which ApprovalSpec can gate that call, because the spec was never constructed — and even if it had been, no module registers a before_tool hook that consults it.

1b. ClaudeCodeBackend ships --permission-mode bypassPermissions

src/praisonai/praisonai/cli_backends/claude.py:24-33

DEFAULT_CONFIG = CliBackendConfig(
    command="claude",
    args=[
        "-p",
        "--output-format", "stream-json",
        "--include-partial-messages",
        "--verbose",
        "--setting-sources", "user",
        "--permission-mode", "bypassPermissions"   # <-- shell/edit/write ALL bypassed
    ],
    ...
)

Any YAML or Python user picking cli_backend: claude-code silently launches the Claude CLI with all permission checks disabled. There is no env-var guard, no warning at startup, no audit-log entry. The flag is buried in a 30-line DEFAULT_CONFIG.

1c. MCP HTTP transport authenticates POST but not GET (SSE) or DELETE

src/praisonai/praisonai/mcp_server/transports/http_stream.py — only mcp_post (~line 199-205) does the bearer-token check; mcp_get (~line 284-320, the SSE stream) and mcp_delete (~line 322-337, session termination) never call self.api_key. So an attacker who learns any MCP-Session-Id (leaked via logs, sniffed, predicted) can subscribe to the event stream or terminate sessions with no token — two of three HTTP verbs on a nominally "authenticated" server require no auth.

Proposed fix (combined)

  1. Flip the default in ApprovalSpec to enabled=True, backend="console" and add a default_policy: Literal["deny","prompt","allow"] = "prompt". Add approve_tools: Dict[str, ApprovalLevel] | None for per-tool granularity.
  2. Add ApprovalSpec.install_hook() that registers a before_tool hook returning a block decision until approval is granted. Expose praisonai.security.enable_approval(spec) as a single global entry point used by CLI, YAML, and Python — restoring the 3-way feature surface.
  3. Change ClaudeCodeBackend default --permission-mode to "default". Require an explicit unsafe: true field on CliBackendConfig (or env PRAISONAI_CLAUDE_BYPASS_PERMISSIONS=1) before injecting bypassPermissions. Log a loud warning when bypass is active.
  4. In http_stream.py, extract _check_auth(request) -> Optional[Response] and call it at the top of mcp_get, mcp_post, and mcp_delete. Use hmac.compare_digest for constant-time compare.
  5. Persist an ApprovalDecision record ({timestamp, tool, args_sha256, decision, user}) to ~/.praisonai/approvals.jsonl. At tool execution time, recompute sha256(args) and refuse on mismatch to close the TOCTOU window between approval and call.

Gap 2 — Multiple subsystems silently fall back to the wrong backend instead of failing loudly

Stated principle violated: "Production-ready", "hard to misuse".

Across four independent subsystems — DB routing, LLM provider routing, sandbox selection, and lazy import caching — unknown inputs silently fall through to a default that is almost certainly not what the user intended. Users get "success" responses while the request was routed to the wrong place, leaking data, billing the wrong vendor, or returning canned output that masquerades as real.

2a. PraisonAIDB._detect_backend collapses any unknown HTTPS URL to SQLite

src/praisonai/praisonai/db/adapter.py:106-143

def _detect_backend(self, url: str) -> str:
    ...
    elif url_lower.startswith("http://") or url_lower.startswith("https://"):
        if ".supabase.co" in url_lower or ".supabase.com" in url_lower:
            return "supabase"
        if "qdrant" in url_lower or ":6333" in url_lower:
            return "qdrant"
        elif "weaviate" in url_lower:
            return "weaviate"
    # Default fallback
    return "sqlite"

database_url="https://my-pg-proxy.example.com", a typo'd .supbase.co, Pinecone, Chroma cloud, or any unrecognised remote endpoint silently routes to local SQLite. The user believes they configured a remote DB; conversation history lives in a per-pod file and is lost on every redeploy.

2b. parse_model_string routes every unknown model to OpenAI

src/praisonai/praisonai/llm/registry.py:311-339

if model_lower.startswith("gpt-") or model_lower.startswith("o1") or model_lower.startswith("o3"):
    return {"provider_id": "openai", "model_id": model}
if model_lower.startswith("claude-"):
    return {"provider_id": "anthropic", "model_id": model}
if model_lower.startswith("gemini-"):
    return {"provider_id": "google", "model_id": model}
# Default to openai
return {"provider_id": "openai", "model_id": model}

A user passing "llama3", "mistral-large", "command-r", "deepseek-chat", or even "o4-mini" (which does not match the o1/o3 prefix) is silently dispatched to OpenAI with a bogus model id. The error returned references OpenAI; the user's OPENAI_API_KEY is billed for a model they never picked.

2c. DaytonaSandbox is a hardcoded simulation that always returns success

src/praisonai/praisonai/sandbox/daytona.py:319-356, 414-464

async def _execute_in_workspace(self, code, language, ...):
    if language.lower() == "python":
        if "import" in code and "numpy" in code:
            return {"exit_code": 0, "stdout": "1.24.3", "stderr": ""}
        elif "print" in code:
            return {"exit_code": 0, "stdout": "Hello from Daytona!", "stderr": ""}
    return {"exit_code": 0, "stdout": f"Executed {language} code in Daytona workspace", "stderr": ""}

async def write_file(self, path, content):
    logger.info(f"Writing file to Daytona workspace: {path}")
    return True   # <-- returns True without writing anything

This backend is registered in _BUILTIN_SANDBOXES and exported from __init__.py. A user picks "daytona" expecting cloud isolation and gets canned stdout, fake write_file success, and a fixed list_files payload. Workflows that branch on result.exit_code == 0 route privileged downstream actions through this no-op believing they succeeded.

2d. LazyCache caches ImportError as None permanently and tuple-unpacks blow up downstream

src/praisonai/praisonai/_lazy_cache.py:15-26

def get(self, key: str, loader: Callable[[], T]) -> Optional[T]:
    if key in self._cache:
        return self._cache[key]
    with self._lock:
        if key in self._cache:
            return self._cache[key]
        try:
            value: object = loader()
        except ImportError:
            value = None              # <-- ALL ImportErrors → cached None forever
        self._cache[key] = value
        return value

Three compounding issues:

  1. Permanent negative cache. If a transient ImportError happens once (partial venv, missing LD_LIBRARY_PATH, sibling re-import during test bootstrap), the None is cached for the entire process lifetime. There is no reset() call from any consumer.
  2. Cascading ImportError masking. Loader is lambda: __import__("crewai", fromlist=["Agent", "Task", "Crew"]). If crewai is installed but its own transitive import langchain_core is broken, the user sees None returned, identical to "crewai not installed".
  3. Wrong return-type contract. _get_crewai() callers in auto.py tuple-unpack the return value (Agent, Task, Crew = _get_crewai()) without checking None, so users get TypeError: cannot unpack non-iterable NoneType object instead of ImportError: pip install crewai.

Proposed fix (combined)

In each fallback path, raise instead of fall through. The fix shape is the same across all four:

# db/adapter.py
elif url_lower.startswith(("http://", "https://")):
    raise ValueError(
        f"Unable to infer DB backend from URL {url!r}; "
        "pass backend=... explicitly or use a recognised scheme (sqlite://, postgres://, ...)"
    )

# llm/registry.py
raise ValueError(
    f"Cannot infer provider from model {model!r}. "
    "Use the 'provider/model' form, e.g. 'ollama/llama3', 'bedrock/anthropic.claude-3-sonnet'."
)

# sandbox/daytona.py — until a real Daytona SDK integration exists
class DaytonaSandbox:
    is_available = False
    async def start(self):
        raise NotImplementedError("Daytona backend not yet implemented")

# _lazy_cache.py — cache the exception, re-raise on next access
try:
    value = loader()
except ImportError as e:
    self._cache[key] = e
    raise

For SandlockSandbox falling back to SubprocessSandbox (which has no kernel isolation) in sandbox/sandlock.py:310-322, invert the default: require allow_unsafe_fallback=True rather than require_landlock=True.


Gap 3 — Concurrency primitives are unsafe for multi-tenant / multi-loop / multi-agent use, and one workflow path is dead on arrival

Stated principle violated: "Multi-agent + async safe by default".

The wrapper ships three primitives (AsyncBridge, LockMap, ToolRegistry) plus an incomplete migration in auto.py that together make multi-loop or multi-tenant workloads brittle or outright broken. A FastAPI app reusing this module across requests, or a pytest-asyncio test suite running multiple loops, hits all of these.

3a. AsyncBridge.shutdown() cancels its own driver coroutine and holds the bridge lock across the wait

src/praisonai/praisonai/_async_bridge.py:87-107

def shutdown(self, timeout: float = 5.0) -> None:
    with self._lock:                           # <-- lock held for up to `timeout` s
        loop, thread = self._loop, self._thread
        if loop is None:
            return
        async def _cancel_all() -> None:
            tasks = [t for t in asyncio.all_tasks(loop) if not t.done()]
            for t in tasks:                    # <-- includes _cancel_all itself
                t.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
        try:
            asyncio.run_coroutine_threadsafe(_cancel_all(), loop).result(timeout)
        finally:
            loop.call_soon_threadsafe(loop.stop)
            ...

asyncio.run_coroutine_threadsafe wraps _cancel_all in a Task on the loop, so the task appears in asyncio.all_tasks(loop). The body cancels every task including itself, then hits await asyncio.gather(...) on a cancelled task — _cancel_all raises CancelledError immediately, the gather never runs, and outstanding cancellations are never awaited. The concurrent.futures.Future resolves as cancelled, so .result(timeout) raises CancelledError. In-flight DB cursors / HTTP connections in cancelled tasks may leak because they never got a chance to finalize.

Additionally, the entire body holds self._lock. During shutdown, any concurrent submit() / run_sync() blocks for up to timeout seconds.

Fix:

async def _cancel_all() -> None:
    self_task = asyncio.current_task()
    tasks = [t for t in asyncio.all_tasks(loop)
             if not t.done() and t is not self_task]
    for t in tasks:
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)

And release self._lock before the .result(timeout) call by snapshotting loop/thread and exiting the with block first.

3b. LockMap.get() returns asyncio.Lock instances bound to whichever loop happened to be running on first await

src/praisonai/praisonai/_lockmap.py:35-57

def get(self, key: Hashable) -> asyncio.Lock:
    now = time.monotonic()
    entry = self._locks.get(key)
    if entry is not None:
        lock, _ = entry
        self._locks.move_to_end(key)
        self._locks[key] = (lock, now)
        return lock
    lock = asyncio.Lock()           # <-- binds lazily to current running loop
    self._locks[key] = (lock, now)
    self._evict_stale(now)
    return lock

The class docstring actively encourages module-scope use ("use this anywhere you need 'a lock per (user_id|chat_id|key)'"). But asyncio.Lock in modern Python binds to the running event loop on first __aenter__. If a LockMap is cached at module scope and used from two different loops — asyncio.run() called twice, pytest-asyncio creating a fresh loop per test, or any pattern that spins a new loop after the first — the second async with lockmap.get(k): raises RuntimeError: ... attached to a different loop. Recovery requires restarting the process.

Fix: Bucket locks by id(asyncio.get_running_loop()) and prune buckets whose loop is closed:

def get(self, key):
    loop = asyncio.get_running_loop()
    bucket = self._buckets.setdefault(id(loop), OrderedDict())
    ...  # rest unchanged but scoped to bucket

3c. ToolRegistry._resolver keeps only the LAST attached resolver — sibling resolvers get permanent stale negative-cache entries

src/praisonai/praisonai/tool_registry.py:23, 32-35, 108-118

def __init__(self):
    ...
    self._resolver = None  # Will be set by AgentsGenerator to enable cache invalidation

def register_function(self, name: str, func: Callable) -> None:
    ...
    if self._resolver is not None:
        self._resolver.invalidate(name)

def set_resolver(self, resolver) -> None:
    self._resolver = resolver        # <-- last-writer-wins

ToolResolver.resolve() caches negative lookups (self._resolve_cache[name] = None at tool_resolver.py:371). Combined with the single-reference invalidation above, two ToolResolver instances sharing the same ToolRegistry (a normal pattern for multi-tenant services that create per-request AgentsGenerator instances against a process-wide registry) produce this bug:

  1. Resolver A is constructed, calls set_resolver(A). Resolver A misses on "my_tool" → cache {"my_tool": None}.
  2. Resolver B is constructed, calls set_resolver(B) → registry now only knows about B.
  3. User calls registry.register_function("my_tool", ...). Registry invalidates B; A's negative cache is never touched.
  4. Resolver A forever returns None for my_tool, despite the tool being registered.

Fix: Use a list of weakrefs:

import weakref
self._resolvers: list[weakref.ref] = []

def set_resolver(self, resolver):
    self._resolvers.append(weakref.ref(resolver))

def _notify_invalidate(self, name=None):
    alive = []
    for ref in self._resolvers:
        r = ref()
        if r is not None:
            r.invalidate(name)
            alive.append(ref)
    self._resolvers = alive

Then call _notify_invalidate(name) from register_function and _notify_invalidate() from clear.

3d. WorkflowAutoGenerator references an undefined _models_cache — the workflow auto-generation feature crashes on first call

src/praisonai/praisonai/auto.py:1053-1103, 1511-1536

def _get_workflow_models():
    """Get workflow structure models, creating them on first use."""
    if 'workflow_models' in _models_cache:                    # NameError
        return _models_cache['workflow_models']
    with _models_lock:                                        # NameError
        if 'workflow_models' in _models_cache:
            return _models_cache['workflow_models']
        ...
        _models_cache['workflow_models'] = { ... }
        return _models_cache['workflow_models']

Neither _models_cache nor _models_lock is defined in auto.py (verified by grep '^_models_cache\|_models_cache\s*=' returning zero matches). It looks like _get_team_models() was migrated to use lazy_get(...) from _lazy_cache, but _get_workflow_models() and _get_job_workflow_models() were left referencing the old module-level cache that no longer exists. Any caller invoking WorkflowAutoGenerator.generate() or JobWorkflowAutoGenerator.generate() raises NameError: name '_models_cache' is not defined on the very first call — both publicly documented features are dead code.

Fix: Migrate to the same lazy_get pattern used by _get_team_models:

def _get_workflow_models():
    return lazy_get('workflow_models', _create_workflow_models)

def _create_workflow_models():
    from pydantic import BaseModel
    class WorkflowStepDetails(BaseModel): ...
    ...
    return {
        'WorkflowStepDetails': WorkflowStepDetails,
        ...
    }

Or, minimally, restore the module-level state at the top of auto.py:

import threading
_models_cache: dict = {}
_models_lock = threading.Lock()

Summary

# Gap Where it bites Effort
1 Approval/permission unsafe defaults; LLM tool-call path has no enforcement hook Any user running an agent with a shell-style tool — --trust, cli_backend: claude-code, and bare Python API all skip approval Medium (one new install_hook + flip 4 defaults + fix MCP auth + add --unsafe flag for bypassPermissions)
2 Silent fallback routing across DB, LLM, sandbox, lazy-cache Users believe they configured Postgres/Ollama/Daytona; actually get SQLite/OpenAI/canned output Small (raise instead of fall through in 4 spots; invert sandlock fallback policy)
3 AsyncBridge self-cancellation; LockMap cross-loop; ToolRegistry last-resolver-wins; _models_cache undefined Web servers / pytest-asyncio / multi-tenant deployments; WorkflowAutoGenerator 100% broken Small per item; all four are surgical patches

All findings validated against the current main-side code on claude/bold-bohr-qrzjpq. Happy to split this into three PRs (or one) and start with whichever the maintainers prefer.

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't workingclaudeAuto-trigger Claude analysisdocumentationImprovements or additions to documentationsecurity

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions