Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions gently/app/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,7 @@ async def handle_message(self, user_message: str) -> str:
user_message, cached_prompt, tools, self.mode, self._auto_save
)

async def handle_message_stream(self, user_message: str):
async def handle_message_stream(self, user_message: str, autonomous: bool = False):
"""
Handle message with streaming response.

Expand All @@ -929,6 +929,10 @@ async def handle_message_stream(self, user_message: str):
----------
user_message : str
Message from user
autonomous : bool
When True (wake turns only), sets _autonomous_active after the
turn-lock is acquired so the registry backstop never fires while a
user turn is still holding the lock.

Yields
------
Expand All @@ -949,6 +953,8 @@ async def handle_message_stream(self, user_message: str):
if lock is not None:
await lock.acquire()
acquired = True
if autonomous:
self._autonomous_active = True
try:
context_summary = await self.prompts.get_cached_context_summary(
self.experiment, self.timelapse_orchestrator, self.timeline_manager
Expand Down Expand Up @@ -980,6 +986,8 @@ async def handle_message_stream(self, user_message: str):
except StopAsyncIteration:
return
finally:
if autonomous:
self._autonomous_active = False
if acquired:
lock.release()

Expand All @@ -989,8 +997,10 @@ async def run_wake_turn(self, wake_note: str, trigger: str = None, interactive:
Runs through the normal streaming pipeline (so it acquires the turn-lock
and is recorded to conversation history / auto-saved). Brackets the turn
with an 'autonomous_start' (carrying the wake trigger) and a synthesized
'stream_end' so it streams to the web chat distinctly. Sets
_autonomous_active so the registry backstop refuses irreversible tools.
'stream_end' so it streams to the web chat distinctly. Passes
autonomous=True to handle_message_stream, which sets _autonomous_active
after acquiring the turn-lock so the registry backstop refuses
irreversible tools only while this turn holds the lock.
When interactive (ASK mode) a choice_request round-trips through the
operator; otherwise it is auto-cancelled. Run mode only.
"""
Expand All @@ -1011,8 +1021,7 @@ async def _emit(chunk):

await _emit({"type": "autonomous_start", "trigger": trigger or ""})
text_parts = []
self._autonomous_active = True
agen = self.handle_message_stream(wake_note)
agen = self.handle_message_stream(wake_note, autonomous=True)
sent_value = None
try:
while True:
Expand All @@ -1035,9 +1044,10 @@ async def _emit(chunk):
except Exception:
logger.exception("run_wake_turn error")
finally:
self._autonomous_active = False
try:
# Release the turn-lock even if a picker hung / timed out.
# If the lock was acquired, closing the generator triggers
# handle_message_stream's finally, resetting _autonomous_active
# and releasing the turn-lock.
await agen.aclose()
except Exception:
pass
Expand Down