Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 0 additions & 60 deletions crates/aionui-ai-agent/src/agent_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,21 +110,9 @@ pub trait IMockAgent: IAgentTask {
initialized: false,
})
}
async fn set_mode(&self, _mode: &str) -> Result<(), AgentError> {
Err(AgentError::bad_request("Mode switching is not supported for this mock"))
}
async fn get_model(&self) -> Result<GetModelInfoResponse, AgentError> {
Ok(GetModelInfoResponse { model_info: None })
}
async fn set_model(&self, _model_id: &str) -> Result<(), AgentError> {
Err(AgentError::bad_request(
"Model switching is not supported for this mock",
))
}
async fn set_model_confirmed(&self, model_id: &str) -> Result<GetModelInfoResponse, AgentError> {
self.set_model(model_id).await?;
self.get_model().await
}
async fn get_config_options(&self) -> Result<GetConfigOptionsResponse, AgentError> {
Ok(GetConfigOptionsResponse {
config_options: Vec::new(),
Expand Down Expand Up @@ -316,18 +304,6 @@ impl AgentInstance {
}
}

/// Set the session mode. Unsupported for variants other than ACP /
/// Aionrs — returns a `BadRequest` so the caller can surface an
/// actionable error rather than silently no-op.
pub async fn set_mode(&self, mode: &str) -> Result<(), AgentError> {
match self {
Self::Acp(m) => m.set_mode(mode).await,
Self::Aionrs(m) => m.set_mode(mode).await,
#[cfg(any(test, feature = "test-support"))]
Self::Mock(m) => m.set_mode(mode).await,
}
}

/// Get the current session model info. Only ACP exposes a model
/// catalog; other variants report `model_info = None` so the UI can
/// hide the model picker without an error.
Expand All @@ -350,42 +326,6 @@ impl AgentInstance {
}
}

/// Switch the active model. Unsupported for variants other than ACP —
/// returns a `BadRequest` so the caller can surface an actionable
/// error rather than silently no-op.
pub async fn set_model(&self, model_id: &str) -> Result<(), AgentError> {
if model_id.trim().is_empty() {
return Err(AgentError::bad_request("model_id must not be empty"));
}
match self {
Self::Acp(m) => m.set_model(model_id).await,
Self::Aionrs(_) => Err(AgentError::bad_request(
"Model switching is not supported for this agent type",
)),
#[cfg(any(test, feature = "test-support"))]
Self::Mock(m) => m.set_model(model_id).await,
}
}

/// Switch the active model and return the confirmed model payload for
/// this specific mutation, rather than re-reading potentially stale
/// cached state.
pub async fn set_model_confirmed(&self, model_id: &str) -> Result<GetModelInfoResponse, AgentError> {
if model_id.trim().is_empty() {
return Err(AgentError::bad_request("model_id must not be empty"));
}
match self {
Self::Acp(m) => Ok(GetModelInfoResponse {
model_info: Some(map_sdk_model_to_payload(m.set_model_confirmed(model_id).await?)),
}),
Self::Aionrs(_) => Err(AgentError::bad_request(
"Model switching is not supported for this agent type",
)),
#[cfg(any(test, feature = "test-support"))]
Self::Mock(m) => m.set_model_confirmed(model_id).await,
}
}

pub async fn get_config_options(&self) -> Result<GetConfigOptionsResponse, AgentError> {
match self {
Self::Acp(m) => m.config_options().await,
Expand Down
244 changes: 70 additions & 174 deletions crates/aionui-ai-agent/src/manager/acp/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,25 @@ fn matched_slash_command(raw_user_input: &str, commands: &[AvailableCommand]) ->
/// (Claude, Qwen, CodeBuddy, Codex, etc.). Communication now happens via
/// the `agent-client-protocol` SDK's JSON-RPC transport, replacing the
/// previous hand-crafted JSON-over-stdin/stdout approach.
fn mark_session_opened_after_protocol_ready(
session: &mut AcpSession,
sid: String,
protocol_connected: bool,
conversation_id: &str,
backend: Option<&str>,
) -> Result<String, AgentError> {
if !protocol_connected {
warn!(
conversation_id = %conversation_id,
backend = backend.unwrap_or("-"),
"ACP session open returned after protocol disconnected; rejecting opened transition"
);
return Err(AcpError::NotConnected.into());
}
session.mark_opened();
Ok(sid)
}

pub struct AcpAgentManager {
/// Pre-computed, immutable session parameters assembled by the factory.
pub(super) params: Arc<AcpSessionParams>,
Expand Down Expand Up @@ -465,6 +484,19 @@ impl AcpAgentManager {
runtime.bump_activity();
}

fn ensure_protocol_connected_for_operation(&self, operation: &'static str) -> Result<(), AgentError> {
if self.protocol.is_connected() {
return Ok(());
}
warn!(
conversation_id = %self.params.conversation_id,
agent_backend = ?self.params.metadata.backend,
operation,
"ACP operation rejected because protocol is disconnected"
);
Err(AcpError::NotConnected.into())
}

pub(crate) async fn mode(&self) -> Result<aionui_api_types::AgentModeResponse, AgentError> {
let desired = self
.session
Expand Down Expand Up @@ -551,6 +583,8 @@ impl AcpAgentManager {
option_id: &str,
value: &str,
) -> Result<SetConfigOptionResponse, AgentError> {
self.ensure_protocol_connected_for_operation("set_config_option")?;

let (session_id, set_path, is_mode_option) = {
let session = self.session.read().await;
let snapshot = session.config_snapshot();
Expand Down Expand Up @@ -761,176 +795,6 @@ impl AcpAgentManager {
}
}

/// Set the mode for the current session.
pub(crate) async fn set_mode(&self, mode: &str) -> Result<(), AgentError> {
let normalized_mode = normalize_requested_mode(&self.params.metadata, mode);
if normalized_mode.is_empty() {
return Err(AgentError::bad_request("mode must not be empty"));
}

let session_id = {
let session = self.session.read().await;
if !session.can_select_mode(&normalized_mode) {
warn!(
conversation_id = %self.params.conversation_id,
agent_backend = ?self.params.metadata.backend,
requested_mode_id = %normalized_mode,
"acp_set_mode_rejected_unavailable"
);
return Err(AgentError::bad_request(format!(
"Mode '{normalized_mode}' is not available for this ACP session"
)));
}
session.session_id().map(ToOwned::to_owned)
}
.ok_or_else(|| {
warn!(
conversation_id = %self.params.conversation_id,
agent_backend = ?self.params.metadata.backend,
requested_mode_id = %normalized_mode,
"acp_set_command_missing_session"
);
AgentError::bad_request("No active session")
})?;

info!(
conversation_id = %self.params.conversation_id,
agent_backend = ?self.params.metadata.backend,
requested_mode_id = %normalized_mode,
"acp_set_mode_requested"
);
codex_sandbox::sync_for_agent(&self.params.metadata, Some(&normalized_mode)).await;

if let Err(e) = self
.protocol
.set_mode(SetSessionModeRequest::new(
SessionId::new(session_id.clone()),
normalized_mode.clone(),
))
.await
{
warn!(
conversation_id = %self.params.conversation_id,
agent_backend = ?self.params.metadata.backend,
requested_mode_id = %normalized_mode,
error = %e,
"acp_set_mode_failed"
);
return Err(AgentError::from(e));
}

let mut session = self.session.write().await;
if session.session_id() != Some(session_id.as_str()) {
warn!(
conversation_id = %self.params.conversation_id,
agent_backend = ?self.params.metadata.backend,
requested_mode_id = %normalized_mode,
confirmed_session_id = %session_id,
active_session_id = ?session.session_id(),
"acp_set_mode_session_changed"
);
return Err(AgentError::conflict("Active ACP session changed while applying mode"));
}
session.confirm_mode(ModeId::new(&normalized_mode));
self.commit_session_changes(&mut session).await;
info!(
conversation_id = %self.params.conversation_id,
agent_backend = ?self.params.metadata.backend,
confirmed_mode_id = %normalized_mode,
"acp_set_mode_confirmed"
);
Ok(())
}

async fn apply_confirmed_model_selection(&self, model_id: &str) -> Result<SessionModelState, AgentError> {
let session_id = {
let session = self.session.read().await;
if !session.can_select_model(model_id) {
warn!(
conversation_id = %self.params.conversation_id,
agent_backend = ?self.params.metadata.backend,
requested_model_id = %model_id,
"acp_set_model_rejected_unavailable"
);
return Err(AgentError::bad_request(format!(
"Model '{model_id}' is not available for this ACP session"
)));
}
session.session_id().map(ToOwned::to_owned)
}
.ok_or_else(|| {
warn!(
conversation_id = %self.params.conversation_id,
agent_backend = ?self.params.metadata.backend,
requested_model_id = %model_id,
"acp_set_command_missing_session"
);
AgentError::bad_request("No active session")
})?;

info!(
conversation_id = %self.params.conversation_id,
agent_backend = ?self.params.metadata.backend,
requested_model_id = %model_id,
"acp_set_model_requested"
);
if let Err(e) = self
.protocol
.set_model(SetSessionModelRequest::new(
SessionId::new(session_id.clone()),
model_id.to_owned(),
))
.await
{
warn!(
conversation_id = %self.params.conversation_id,
agent_backend = ?self.params.metadata.backend,
requested_model_id = %model_id,
error = %e,
"acp_set_model_failed"
);
return Err(AgentError::from(e));
}

let mut session = self.session.write().await;
if session.session_id() != Some(session_id.as_str()) {
warn!(
conversation_id = %self.params.conversation_id,
agent_backend = ?self.params.metadata.backend,
requested_model_id = %model_id,
confirmed_session_id = %session_id,
active_session_id = ?session.session_id(),
"acp_set_model_session_changed"
);
return Err(AgentError::conflict("Active ACP session changed while applying model"));
}
session.confirm_model(ModelId::new(model_id));
let confirmed_model = session
.model_info()
.cloned()
.unwrap_or_else(|| SessionModelState::new(model_id.to_owned(), Vec::new()));
self.commit_session_changes(&mut session).await;
info!(
conversation_id = %self.params.conversation_id,
agent_backend = ?self.params.metadata.backend,
confirmed_model_id = %model_id,
"acp_set_model_confirmed"
);
Ok(confirmed_model)
}

/// Set the model for the current session.
pub(crate) async fn set_model(&self, model_id: &str) -> Result<(), AgentError> {
self.apply_confirmed_model_selection(model_id).await?;
Ok(())
}

/// Set the model and return the confirmed model state from this write,
/// without re-reading the asynchronously mutable session cache.
pub(crate) async fn set_model_confirmed(&self, model_id: &str) -> Result<SessionModelState, AgentError> {
self.apply_confirmed_model_selection(model_id).await
}

/// Return available slash commands from the session aggregate.
pub(crate) async fn load_slash_commands(&self) -> Result<Vec<SlashCommandItem>, AgentError> {
let session = self.session.read().await;
Expand Down Expand Up @@ -990,6 +854,7 @@ impl AcpAgentManager {
async fn ensure_session_opened(&self) -> Result<String, AgentError> {
debug!("Ensuring ACP session is opened");
let _lock = self.session_lock.lock().await;
self.ensure_protocol_connected_for_operation("ensure_session_opened")?;

let (session_id, opened) = {
let s = self.session.read().await;
Expand All @@ -1004,10 +869,16 @@ impl AcpAgentManager {

{
let mut s = self.session.write().await;
s.mark_opened();
let sid = mark_session_opened_after_protocol_ready(
&mut s,
sid,
self.protocol.is_connected(),
&self.params.conversation_id,
self.backend(),
)?;
self.commit_session_changes(&mut s).await;
Ok(sid)
}
Ok(sid)
}

/// Initialize or resume a session, then send the user message.
Expand Down Expand Up @@ -1312,8 +1183,8 @@ mod tests {
use crate::agent_runtime::AgentRuntime;
use crate::error::AgentError;
use crate::manager::acp::{AcpAgentManager, AcpSession};
use crate::protocol::error::CloseReason;
use crate::shared_kernel::{ConfigKey, ConfigValue};
use crate::protocol::error::{AcpError, CloseReason};
use crate::shared_kernel::{ConfigKey, ConfigValue, SessionId as DomainSessionId};
use agent_client_protocol::schema::{AvailableCommand, SessionConfigOptionCategory};
use serde_json::json;
use std::collections::HashMap;
Expand Down Expand Up @@ -1354,6 +1225,31 @@ mod tests {
assert_eq!(user_facing_message(&err), "Rate limited");
}

#[test]
fn warmup_does_not_mark_opened_when_protocol_disconnected_after_open() {
let mut session = AcpSession::new(None, None, Default::default());
session.set_session_id(DomainSessionId::new("sess-disconnected"));

let err = super::mark_session_opened_after_protocol_ready(
&mut session,
"sess-disconnected".to_owned(),
false,
"conv-test",
Some("codex"),
)
.expect_err("disconnected protocol must reject the opened transition");

assert!(
matches!(err, AgentError::Acp(AcpError::NotConnected)),
"expected AcpError::NotConnected, got {err:?}"
);
assert_eq!(session.session_id(), Some("sess-disconnected"));
assert!(
!session.is_opened(),
"warmup must not mark the aggregate opened when the protocol is already disconnected"
);
}

#[test]
fn nested_colons_only_strip_first() {
// "Bad gateway: Internal error: API Error: ..." → keep everything after the first ": "
Expand Down
Loading
Loading