Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions agent_core/core/action/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def __init__(
requirements: Optional[List[str]] = None,
timeout: Optional[int] = None,
parallelizable: bool = True,
irreversible: bool = False,
):
"""
Initialize a new Action definition.
Expand Down Expand Up @@ -101,6 +102,10 @@ def __init__(
parallelizable: Whether this action can be executed in parallel with others.
Defaults to True. Set to False for write operations, GUI actions,
state changes, send_message, etc.
irreversible: Whether the action's side effect cannot be undone once
it reaches the outside world (send email/message, post publicly).
Irreversible actions are guarded by the activity ledger so a
completed run is never silently re-executed after a crash.
"""
self.name = name
self.description = description
Expand All @@ -125,6 +130,7 @@ def __init__(
self.requirements = requirements or []
self.timeout = timeout if timeout is not None else self.DEFAULT_TIMEOUT
self.parallelizable = parallelizable
self.irreversible = irreversible

@property
def display_name(self) -> str:
Expand Down Expand Up @@ -168,6 +174,7 @@ def to_dict(self) -> Dict[str, Any]:
"requirements": self.requirements,
"timeout": self.timeout,
"parallelizable": self.parallelizable,
"irreversible": self.irreversible,
}

@classmethod
Expand Down Expand Up @@ -211,6 +218,7 @@ def from_dict(cls, data: Dict[str, Any]) -> "Action":
requirements=data.get("requirements", []),
timeout=data.get("timeout"),
parallelizable=data.get("parallelizable", True),
irreversible=data.get("irreversible", False),
)

return data_to_return
12 changes: 12 additions & 0 deletions agent_core/core/action_framework/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ class ActionMetadata:
# Whether this action can be executed in parallel with other actions.
# Set to False for: write operations, GUI actions, state changes, send_message, etc.
parallelizable: bool = True
# Whether this action's side effect cannot be undone once it reaches the
# outside world (send email/message, post publicly). Irreversible actions
# are guarded by the activity ledger: intent is recorded
# before execution and a completed run is never silently re-executed.
irreversible: bool = False

@property
def display_name(self) -> str:
Expand Down Expand Up @@ -264,6 +269,7 @@ def _get_action_as_json(self, platform_impls) -> Dict[str, Any]:
"code": main_code_str,
"platform_overrides": {},
"parallelizable": meta.parallelizable,
"irreversible": meta.irreversible,
}

# 3. Handle Platform Overrides
Expand Down Expand Up @@ -405,6 +411,7 @@ def action(
test_payload: Optional[Dict[str, Any]] = None,
action_sets: Optional[List[str]] = None,
parallelizable: bool = True,
irreversible: bool = False,
):
"""
Decorator used by developers to register functions as actions.
Expand All @@ -425,6 +432,10 @@ def action(
(e.g., ["file_operations", "core"])
parallelizable: Whether this action can run in parallel with others.
Set to False for write operations, GUI actions, state changes, etc.
irreversible: Whether the action's side effect cannot be undone once
it reaches the outside world (send email/message, post
publicly). Guarded by the activity ledger: a completed
run is never silently re-executed after a crash.
"""
# Normalize platforms input to a list of lowercase strings
if platforms is None:
Expand All @@ -449,6 +460,7 @@ def decorator_factory(func: Callable):
test_payload=test_payload,
action_sets=action_sets or [],
parallelizable=parallelizable,
irreversible=irreversible,
)

# 2. Create the full registration object
Expand Down
66 changes: 66 additions & 0 deletions agent_core/core/impl/action/idempotency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# -*- coding: utf-8 -*-
"""
core.impl.action.idempotency

Idempotency guard protocol for irreversible actions.

Actions flagged ``irreversible=True`` (send email/message, post publicly)
have a real crash window today: the side effect happens first, the
``action_end`` record is persisted after. A crash in between leaves the
effect done with no durable record — and on resume the LLM, re-reading the
event stream, sees no completion and may re-execute it.

The guard turns "did this already run?" into a hard database check:

- ``begin()`` is called BEFORE execution. It durably records intent and
decides: proceed (fresh work), short-circuit with the stored output
(this exact run already completed), or refuse with a warning (a previous
attempt was interrupted and the side effect MAY have happened — verify
or confirm with the user instead of blindly re-sending).
- ``complete()`` is called AFTER execution with the outcome.

The implementation (the activity ledger on sessions.db) lives in the app
layer; ActionManager only knows this protocol.
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Dict, Optional, Protocol, runtime_checkable


@dataclass
class GuardDecision:
"""begin()'s verdict for one irreversible action execution."""

proceed: bool
idem_key: Optional[str] = None
# Set when this exact run already completed: returned as the action's
# output instead of re-executing the side effect.
stored_output: Optional[Dict[str, Any]] = None
# Set when a previous attempt was interrupted mid-flight: a warning the
# LLM sees as the action result instead of a blind re-execution.
note: Optional[str] = None


@runtime_checkable
class IdempotencyGuard(Protocol):
"""Pre/post execution hooks for irreversible actions."""

def begin(
self,
action_name: str,
input_data: Dict[str, Any],
session_id: Optional[str],
) -> GuardDecision:
"""Record intent durably and decide whether execution may proceed."""
...

def complete(
self,
idem_key: str,
status: str,
outputs: Optional[Dict[str, Any]],
) -> None:
"""Record the outcome of an execution begin() allowed."""
...
61 changes: 61 additions & 0 deletions agent_core/core/impl/action/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from agent_core.core.protocols.context import ContextEngineProtocol
from agent_core.core.protocols.state import StateManagerProtocol
from agent_core.core.impl.action.executor import ActionExecutor
from agent_core.core.impl.action.idempotency import IdempotencyGuard
from agent_core.utils.logger import logger

# ============================================================================
Expand Down Expand Up @@ -120,6 +121,7 @@ def __init__(
on_action_start: Optional[OnActionStartHook] = None,
on_action_end: Optional[OnActionEndHook] = None,
get_parent_id: Optional[GetParentIdHook] = None,
idempotency_guard: Optional["IdempotencyGuard"] = None,
):
"""
Build an ActionManager that can execute and track actions.
Expand All @@ -134,6 +136,10 @@ def __init__(
on_action_start: Optional hook called when action starts.
on_action_end: Optional hook called when action ends.
get_parent_id: Optional hook to resolve parent_id from task context.
idempotency_guard: Optional guard consulted before/after actions
flagged ``irreversible=True``: records intent
before the side effect and prevents a completed run from
being silently re-executed after a crash.
"""
self.action_library = action_library
self.llm_interface = llm_interface
Expand All @@ -150,6 +156,7 @@ def __init__(
self._on_action_start = on_action_start
self._on_action_end = on_action_end
self._get_parent_id = get_parent_id
self._idempotency_guard = idempotency_guard

def _generate_unique_session_id(self) -> str:
"""Generate a unique 6-character session ID.
Expand Down Expand Up @@ -234,6 +241,50 @@ async def execute_action(
input_data["_session_id"] = session_id

logger.debug(f"[INPUT DATA] {input_data}")

# ── Idempotency guard for irreversible actions ──
# BEFORE the side effect: record intent durably, and refuse to
# re-execute work the ledger shows as already completed (or as
# interrupted mid-flight, where the effect may have happened).
idem_key = None
if getattr(action, "irreversible", False) and self._idempotency_guard:
try:
decision = self._idempotency_guard.begin(
action.name, input_data, session_id
)
except Exception as exc:
logger.warning(f"Idempotency guard begin() failed: {exc}")
decision = None
if decision is not None:
idem_key = decision.idem_key
if not decision.proceed:
if decision.stored_output is not None:
skip_outputs = decision.stored_output
skip_message = (
f"Action {action.name} skipped — an identical run "
f"already completed; returning its stored output."
)
else:
skip_outputs = {
"status": "error",
"error": decision.note,
"error_code": "irreversible_uncertain",
}
skip_message = (
f"Action {action.name} blocked — a previous "
f"attempt was interrupted and its side effect may "
f"already have happened. See the action output."
)
self._log_event_stream(
is_gui_task=is_gui_task,
event_type="action_end",
event=skip_message,
display_message=f"{action.display_name} → skipped (idempotent)",
action_name=action.name,
session_id=session_id,
)
return skip_outputs

run_id = str(uuid.uuid4())
started_at = datetime.utcnow().isoformat()

Expand Down Expand Up @@ -365,6 +416,16 @@ async def execute_action(

ended_at = datetime.utcnow().isoformat()

# ── Idempotency guard: record the outcome durably ──
# AFTER the side effect, BEFORE anything that could fail below — a
# crash between the side effect and this line is the §4.2 window the
# ledger exists to shrink.
if idem_key and self._idempotency_guard:
try:
self._idempotency_guard.complete(idem_key, status, outputs)
except Exception as exc:
logger.warning(f"Idempotency guard complete() failed: {exc}")

# Re-resolve parent_id after execution if hook provided
if not parent_id and self._get_parent_id:
parent_id = self._get_parent_id()
Expand Down
40 changes: 40 additions & 0 deletions agent_core/core/impl/trigger/listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
"""
core.impl.trigger.listener

Lifecycle listener protocol for TriggerQueue.

The queue is an in-memory ordering primitive; when it discards triggers
(same-session "prefer newest" replacement, session removal, full clear) a
durable store layered on top must be told so the corresponding rows are
settled instead of silently resurrecting on the next rehydration. The queue
only knows this protocol — the store implementation lives in the app layer.
"""

from __future__ import annotations

from typing import List, Optional, Protocol, runtime_checkable

from agent_core.core.trigger import Trigger


@runtime_checkable
class TriggerLifecycleListener(Protocol):
"""Receives notifications when the queue discards triggers.

Implementations must be synchronous and non-raising; the queue calls
them while holding its internal lock.
"""

def on_evicted(
self, evicted: List[Trigger], replacement: Optional[Trigger]
) -> None:
"""Triggers were removed from the queue without being consumed.

Args:
evicted: The discarded triggers.
replacement: The newer same-session trigger that superseded them,
or None when they were removed outright (session cleanup,
queue clear).
"""
...
Loading