diff --git a/Cargo.toml b/Cargo.toml index 69b6f01ae..2ecee6a20 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,11 +24,11 @@ async-trait = "0.1" serenity = { version = "0.12", default-features = false, features = ["client", "gateway", "model", "rustls_backend", "cache"], optional = true } [features] -# Default: core only (Discord + Slack). Gateway ships as separate binary. +# Default: core only (Discord + Slack). Platform adapters are opt-in. default = ["discord", "slack", "secrets-aws", "agentcore", "config-s3", "pre-seed"] -# Opt-in: compile all gateway adapters into a single unified binary -unified = ["telegram", "line", "feishu", "googlechat", "wecom", "teams"] +# Opt-in: compile all platform adapters into a single unified binary +unified = ["telegram", "line", "feishu", "googlechat", "wecom", "teams", "vtuber"] # Core adapters discord = ["dep:serenity", "openab-core/discord"] @@ -40,13 +40,14 @@ agentcore = ["openab-core/agentcore"] config-s3 = ["openab-core/config-s3"] pre-seed = ["openab-core/pre-seed"] -# Gateway adapters (each pulls in the gateway crate + axum for embedded server) +# Platform adapters (each pulls in the gateway crate + axum for embedded server) telegram = ["dep:openab-gateway", "dep:axum", "openab-gateway/telegram"] line = ["dep:openab-gateway", "dep:axum", "openab-gateway/line"] feishu = ["dep:openab-gateway", "dep:axum", "openab-gateway/feishu"] googlechat = ["dep:openab-gateway", "dep:axum", "openab-gateway/googlechat"] wecom = ["dep:openab-gateway", "dep:axum", "openab-gateway/wecom"] teams = ["dep:openab-gateway", "dep:axum", "openab-gateway/teams"] +vtuber = ["dep:openab-gateway", "dep:axum", "openab-gateway/vtuber"] [dev-dependencies] tempfile = "3.27.0" diff --git a/crates/openab-gateway/Cargo.toml b/crates/openab-gateway/Cargo.toml index 26ee00db9..c9f499f9c 100644 --- a/crates/openab-gateway/Cargo.toml +++ b/crates/openab-gateway/Cargo.toml @@ -35,10 +35,11 @@ urlencoding = "2" wiremock = "0.6" [features] -default = ["telegram", 
"line", "feishu", "googlechat", "wecom", "teams"] +default = ["telegram", "line", "feishu", "googlechat", "wecom", "teams", "vtuber"] telegram = [] line = [] feishu = [] googlechat = [] wecom = [] teams = [] +vtuber = [] diff --git a/crates/openab-gateway/src/adapters/mod.rs b/crates/openab-gateway/src/adapters/mod.rs index f58f870a2..7c58c2135 100644 --- a/crates/openab-gateway/src/adapters/mod.rs +++ b/crates/openab-gateway/src/adapters/mod.rs @@ -12,3 +12,5 @@ pub mod googlechat; pub mod wecom; #[cfg(feature = "teams")] pub mod teams; +#[cfg(feature = "vtuber")] +pub mod vtuber; diff --git a/crates/openab-gateway/src/adapters/teams.rs b/crates/openab-gateway/src/adapters/teams.rs index c8dec8509..3eb9fd71b 100644 --- a/crates/openab-gateway/src/adapters/teams.rs +++ b/crates/openab-gateway/src/adapters/teams.rs @@ -275,7 +275,9 @@ impl TeamsAdapter { } // B2: Validate channel endorsements — key must endorse the activity's channelId - let channel_id = activity.channel_id.as_deref() + let channel_id = activity + .channel_id + .as_deref() .ok_or_else(|| anyhow::anyhow!("activity missing channelId"))?; if key.endorsements.is_empty() { anyhow::bail!("JWK has no endorsements — cannot verify channelId={channel_id}"); @@ -301,9 +303,13 @@ impl TeamsAdapter { let token_data = decode::(token, &decoding_key, &validation)?; // B1: Validate serviceUrl claim matches activity's serviceUrl - let activity_service_url = activity.service_url.as_deref() + let activity_service_url = activity + .service_url + .as_deref() .ok_or_else(|| anyhow::anyhow!("activity missing serviceUrl"))?; - let token_service_url = token_data.claims.get("serviceurl") + let token_service_url = token_data + .claims + .get("serviceurl") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow::anyhow!("JWT missing serviceurl claim"))?; if token_service_url != activity_service_url { @@ -799,7 +805,9 @@ mod tests { async fn jwt_rejects_garbage_token() { let adapter = TeamsAdapter::new(make_config(vec![])); let activity 
= make_activity_with_tenant(Some("t1")); - let result = adapter.validate_jwt("Bearer not.a.valid.jwt", &activity).await; + let result = adapter + .validate_jwt("Bearer not.a.valid.jwt", &activity) + .await; assert!(result.is_err()); } diff --git a/crates/openab-gateway/src/adapters/vtuber.rs b/crates/openab-gateway/src/adapters/vtuber.rs new file mode 100644 index 000000000..82c1a4fcf --- /dev/null +++ b/crates/openab-gateway/src/adapters/vtuber.rs @@ -0,0 +1,1408 @@ +//! VTuber platform adapter. +//! +//! Tier-1: OpenAI-compatible POST /v1/chat/completions (SSE) — streams agent +//! replies as `chat.completion.chunk` deltas. +//! Tier-2: WebSocket /v1/vtuber/ws — pushes agent_state, tool_status, emotion, +//! and notification events derived from GatewayReply commands. + +use crate::schema::*; +use axum::extract::ws::{Message, WebSocket}; +use axum::extract::{Query, State, WebSocketUpgrade}; +use axum::http::{header::AUTHORIZATION, HeaderMap, StatusCode}; +use axum::response::sse::{Event as SseEvent, KeepAlive, Sse}; +use axum::response::IntoResponse; +use axum::Json; +use futures_util::stream::Stream; +use futures_util::{SinkExt, StreamExt}; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use std::collections::{HashMap, HashSet}; +use std::convert::Infallible; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::{mpsc, Mutex}; +use tracing::{info, warn}; +use uuid::Uuid; + +// --------------------------------------------------------------------------- +// Tier-2 WS event types +// --------------------------------------------------------------------------- + +#[derive(Clone, Debug, Serialize)] +#[serde(tag = "type")] +pub enum WsEvent { + #[serde(rename = "agent_state")] + AgentState { + ts: i64, + state: &'static str, + session_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + detail: Option, + }, + #[serde(rename = "emotion")] + Emotion { + ts: i64, + session_id: String, + 
/// Settings for the periodic "ambient" notification pushed to WS clients.
///
/// Built exclusively from environment variables via [`AmbientConfig::from_env`].
pub struct AmbientConfig {
    /// Pause between two ambient notifications.
    interval: Duration,
    /// Text carried by every ambient notification.
    prompt: String,
    /// One of `"low"`, `"normal"` or `"high"`.
    urgency: &'static str,
}

impl AmbientConfig {
    /// Reads the ambient-mode configuration from the environment.
    ///
    /// Returns `None` unless `VTUBER_AMBIENT_ENABLED` is exactly `"true"` or `"1"`.
    ///
    /// * `VTUBER_AMBIENT_INTERVAL_SECS` — seconds between notifications; values that do
    ///   not parse or are below 60 fall back to 1800.
    /// * `VTUBER_AMBIENT_PROMPT` — notification text; a built-in default is used when unset.
    /// * `VTUBER_AMBIENT_URGENCY` — case-insensitive `low` / `high`; anything else
    ///   (including unset) becomes `normal`.
    pub fn from_env() -> Option<Self> {
        use std::env::var;

        const MIN_INTERVAL_SECS: u64 = 60;
        const FALLBACK_INTERVAL_SECS: u64 = 1800;

        // Guard: the feature is strictly opt-in.
        match var("VTUBER_AMBIENT_ENABLED") {
            Ok(flag) if flag == "true" || flag == "1" => {}
            _ => return None,
        }

        let parsed_secs = var("VTUBER_AMBIENT_INTERVAL_SECS")
            .ok()
            .and_then(|raw| raw.parse::<u64>().ok());
        let secs = match parsed_secs {
            Some(n) if n >= MIN_INTERVAL_SECS => n,
            _ => FALLBACK_INTERVAL_SECS,
        };

        let prompt = match var("VTUBER_AMBIENT_PROMPT") {
            Ok(custom) => custom,
            Err(_) => String::from(
                "Ambient check-in: say one brief, natural message as 小光. If there is no urgent context, keep it warm and non-intrusive.",
            ),
        };

        // Normalise to one of the three static labels used on the wire.
        let requested = var("VTUBER_AMBIENT_URGENCY")
            .map(|level| level.to_lowercase())
            .unwrap_or_else(|_| "normal".into());
        let urgency: &'static str = if requested == "low" {
            "low"
        } else if requested == "high" {
            "high"
        } else {
            "normal"
        };

        Some(Self {
            interval: Duration::from_secs(secs),
            prompt,
            urgency,
        })
    }
}
/// Collects every `[tag]` marker found in `text`, in order of appearance.
///
/// A tag is accepted when it is non-empty, shorter than 30 bytes and consists only of
/// alphanumeric characters or `_`. Each closing `]` is paired with the *nearest*
/// preceding `[`, so a stray, unmatched `[` earlier in the text does not swallow a
/// well-formed tag that follows it (e.g. `"oops [ then [happy]"` yields `happy`).
fn extract_emotion_tags(text: &str) -> Vec<String> {
    let mut tags = Vec::new();
    let mut rest = text;

    while let Some(close) = rest.find(']') {
        // `[` and `]` are ASCII, so these byte offsets are always char boundaries.
        if let Some(open) = rest[..close].rfind('[') {
            let candidate = &rest[open + 1..close];
            let well_formed = !candidate.is_empty()
                && candidate.len() < 30
                && candidate.chars().all(|c| c.is_alphanumeric() || c == '_');
            if well_formed {
                tags.push(candidate.to_string());
            }
        }
        rest = &rest[close + 1..];
    }

    tags
}

/// Recognises a message that consists of nothing but a tool-status line.
///
/// Surrounding whitespace, backticks and a leading check mark (`✅`, `✔`, `✓`) are
/// stripped. The remainder must be at most 64 bytes, start with an ASCII letter and
/// contain only ASCII alphanumerics, `_`, `-` or spaces. It counts as a tool status
/// when, case-insensitively, it ends with `search` or `tool` (this covers
/// `ToolSearch` and `Tool Search`).
///
/// Returns the cleaned-up tool name together with the status label `"done"`, or
/// `None` when the text is ordinary message content.
fn parse_pure_tool_status(text: &str) -> Option<(String, &'static str)> {
    let normalized = text
        .trim()
        .trim_matches('`')
        .trim()
        .trim_start_matches('✅')
        .trim_start_matches('✔')
        .trim_start_matches('✓')
        .trim()
        .trim_matches('`')
        .trim();

    if normalized.is_empty() || normalized.len() > 64 {
        return None;
    }

    let mut chars = normalized.chars();
    if !chars.next()?.is_ascii_alphabetic() {
        return None;
    }
    let body_ok = chars.all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | ' '));
    if !body_ok {
        return None;
    }

    let lower = normalized.to_ascii_lowercase();
    if lower.ends_with("search") || lower.ends_with("tool") {
        Some((normalized.to_string(), "done"))
    } else {
        None
    }
}
--------------------------------------------------------------------------- +// Derive Tier-2 WS events from a GatewayReply +// --------------------------------------------------------------------------- + +pub fn derive_events(reply: &GatewayReply) -> Vec { + let ts = chrono::Utc::now().timestamp(); + let session_id = reply.channel.id.clone(); + let mut events = Vec::new(); + + match reply.command.as_deref() { + Some("add_reaction") => { + if let Some(mapping) = reaction_to_state(&reply.content.text) { + events.push(WsEvent::AgentState { + ts, + state: mapping.state, + session_id, + detail: mapping.tool_class.map(|tc| AgentStateDetail { + tool_class: Some(tc), + subagent_count: None, + }), + }); + } + } + Some("remove_reaction") => { + // Reaction cleared — no state change pushed; next add_reaction or + // send_message will update the state. + } + Some("edit_message") => { + if let Some((tool_name, status)) = parse_pure_tool_status(&reply.content.text) { + events.push(WsEvent::ToolStatus { + ts, + session_id, + tool_name, + status, + }); + return events; + } + + for tag in extract_emotion_tags(&reply.content.text) { + events.push(WsEvent::Emotion { + ts, + session_id: session_id.clone(), + tag, + intensity: 1.0, + }); + } + } + None | Some("send_message") => { + // Final message — extract any trailing emotions, then go idle. 
+ for tag in extract_emotion_tags(&reply.content.text) { + events.push(WsEvent::Emotion { + ts, + session_id: session_id.clone(), + tag, + intensity: 1.0, + }); + } + events.push(WsEvent::AgentState { + ts, + state: "idle", + session_id, + detail: None, + }); + } + _ => {} + } + + events +} + +// --------------------------------------------------------------------------- +// Broadcast events to connected WS clients +// --------------------------------------------------------------------------- + +pub async fn broadcast(clients: &WsClients, events: &[WsEvent]) { + if events.is_empty() { + return; + } + let mut dead = Vec::new(); + let mut guard = clients.lock().await; + for (&client_id, client) in guard.iter() { + for event in events { + let event_type = match event { + WsEvent::AgentState { .. } => "agent_state", + WsEvent::Emotion { .. } => "emotion", + WsEvent::Notification { .. } => "notification", + WsEvent::ToolStatus { .. } => "tool_status", + WsEvent::Pong { .. } => continue, + }; + if let Some(ref subs) = client.subscribed { + if !subs.contains(event_type) { + continue; + } + } + if let Ok(json) = serde_json::to_string(event) { + if client.tx.send(json).is_err() { + dead.push(client_id); + break; + } + } + } + } + for client_id in dead { + guard.remove(&client_id); + } +} + +// --------------------------------------------------------------------------- +// WS upgrade handler: GET /v1/vtuber/ws +// --------------------------------------------------------------------------- + +pub async fn ws_upgrade( + State(state): State>, + query: Query>, + headers: axum::http::HeaderMap, + ws: WebSocketUpgrade, +) -> axum::response::Response { + // Auth: Bearer token from Authorization header or ?token= query param + let token = headers + .get("authorization") + .and_then(|v| v.to_str().ok()) + .and_then(|v| v.strip_prefix("Bearer ")) + .or_else(|| query.get("token").map(|s| s.as_str())); + + let expected = state.vtuber.as_ref().and_then(|c| c.auth_key.as_ref()); + if let 
Some(expected) = expected { + if token != Some(expected.as_str()) { + warn!("vtuber WS rejected: invalid or missing token"); + return axum::http::StatusCode::UNAUTHORIZED.into_response(); + } + } + + ws.on_upgrade(move |socket| handle_ws(state, socket)) +} + +async fn handle_ws(state: Arc, socket: WebSocket) { + let (mut ws_tx, mut ws_rx) = socket.split(); + let (tx, mut rx) = mpsc::unbounded_channel::(); + let client_id = NEXT_CLIENT_ID.fetch_add(1, Ordering::Relaxed); + + info!(client_id, "vtuber WS client connected"); + + // Register client + let clients = match &state.vtuber_ws_clients { + Some(c) => c.clone(), + None => return, + }; + clients.lock().await.insert( + client_id, + WsClient { + tx, + subscribed: None, + }, + ); + + // Forward events → client + let send_task = tokio::spawn(async move { + while let Some(msg) = rx.recv().await { + if ws_tx.send(Message::Text(msg.into())).await.is_err() { + break; + } + } + }); + + // Receive commands from client + let clients_for_recv = clients.clone(); + let recv_task = tokio::spawn(async move { + while let Some(Ok(msg)) = ws_rx.next().await { + if let Message::Text(text) = msg { + match serde_json::from_str::(&text) { + Ok(WsCommand::Subscribe { events }) => { + let mut guard = clients_for_recv.lock().await; + if let Some(client) = guard.get_mut(&client_id) { + client.subscribed = Some(events.into_iter().collect()); + info!(events = ?client.subscribed, "vtuber WS subscribe updated"); + } + } + Ok(WsCommand::Ping) => { + let pong = WsEvent::Pong { + ts: chrono::Utc::now().timestamp(), + }; + if let Ok(json) = serde_json::to_string(&pong) { + let guard = clients_for_recv.lock().await; + if let Some(client) = guard.get(&client_id) { + let _ = client.tx.send(json); + } + } + } + Err(_) => { + warn!(raw = %text, "vtuber WS unknown command"); + } + } + } + } + }); + + tokio::select! 
{ + _ = send_task => {}, + _ = recv_task => {}, + } + + // Cleanup + clients.lock().await.remove(&client_id); + info!(client_id, "vtuber WS client disconnected"); +} + +// --------------------------------------------------------------------------- +// Tier-1: OpenAI-compatible /v1/chat/completions (SSE) +// --------------------------------------------------------------------------- + +pub enum ReplyChunk { + Snapshot(String), + Done, +} + +pub type ReplyRegistry = Arc>>>; + +const REPLY_FIRST_TIMEOUT: Duration = Duration::from_secs(180); +const DEFAULT_REPLY_TAIL_IDLE: Duration = Duration::from_millis(1500); + +fn reply_tail_idle_timeout() -> Duration { + std::env::var("VTUBER_REPLY_TAIL_IDLE_MS") + .ok() + .and_then(|v| v.parse::().ok()) + .filter(|v| *v > 0) + .map(Duration::from_millis) + .unwrap_or(DEFAULT_REPLY_TAIL_IDLE) +} + +pub struct VtuberConfig { + pub auth_key: Option, + pub default_model: String, +} + +impl VtuberConfig { + pub fn from_env() -> Option { + let enabled = std::env::var("VTUBER_ENABLED") + .map(|v| v == "true" || v == "1") + .unwrap_or(false); + if !enabled { + return None; + } + let auth_key = std::env::var("VTUBER_AUTH_KEY").ok(); + if auth_key.is_none() { + warn!("VTUBER_AUTH_KEY not set — /v1/chat/completions is UNAUTHENTICATED"); + } + let default_model = + std::env::var("VTUBER_DEFAULT_MODEL").unwrap_or_else(|_| "openab".into()); + Some(Self { + auth_key, + default_model, + }) + } +} + +#[derive(Deserialize)] +pub struct ChatMessage { + #[serde(default)] + pub role: String, + #[serde(default)] + pub content: String, +} + +#[derive(Deserialize)] +pub struct ChatRequest { + #[serde(default)] + pub model: Option, + #[serde(default)] + pub messages: Vec, + #[serde(default)] + pub stream: Option, +} + +fn flatten_messages(messages: &[ChatMessage]) -> String { + let mut out = String::new(); + for m in messages { + if m.content.trim().is_empty() { + continue; + } + let label = match m.role.as_str() { + "system" => "System", + "assistant" 
=> "Assistant", + "user" | "" => "User", + other => other, + }; + if !out.is_empty() { + out.push_str("\n\n"); + } + out.push_str(label); + out.push_str(": "); + out.push_str(&m.content); + } + out +} + +fn delta_suffix(full: &str, sent_len: usize) -> (String, usize) { + match full.get(sent_len..) { + Some(suffix) => (suffix.to_string(), full.len()), + None => (full.to_string(), full.len()), + } +} + +pub async fn chat_completions( + State(state): State>, + headers: HeaderMap, + Json(req): Json, +) -> axum::response::Response { + let Some(ref cfg) = state.vtuber else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + "vtuber adapter not configured", + ) + .into_response(); + }; + + if let Some(expected) = cfg.auth_key.as_ref() { + let provided = headers + .get(AUTHORIZATION) + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.strip_prefix("Bearer ")); + if provided != Some(expected.as_str()) { + return (StatusCode::UNAUTHORIZED, "invalid api key").into_response(); + } + } + + if req.stream != Some(true) { + return ( + StatusCode::BAD_REQUEST, + "only streaming mode is supported; set stream: true", + ) + .into_response(); + } + + let prompt = flatten_messages(&req.messages); + if prompt.trim().is_empty() { + return ( + StatusCode::BAD_REQUEST, + "messages must contain non-empty content", + ) + .into_response(); + } + let model = req + .model + .clone() + .unwrap_or_else(|| cfg.default_model.clone()); + + let channel_id = format!("vtb_{}", Uuid::new_v4()); + let (tx, rx) = mpsc::unbounded_channel::(); + state + .vtuber_pending + .lock() + .await + .insert(channel_id.clone(), tx); + + let event = GatewayEvent::new( + "vtuber", + ChannelInfo { + id: channel_id.clone(), + channel_type: "dm".into(), + thread_id: None, + }, + SenderInfo { + id: "vtuber".into(), + name: "vtuber".into(), + display_name: "VTuber".into(), + is_bot: false, + }, + &prompt, + &format!("vtbmsg_{}", Uuid::new_v4()), + Vec::new(), + ); + match serde_json::to_string(&event) { + Ok(json) => { + let _ = 
state.event_tx.send(json); + } + Err(e) => { + state.vtuber_pending.lock().await.remove(&channel_id); + warn!("vtuber: failed to serialize event: {e}"); + return (StatusCode::INTERNAL_SERVER_ERROR, "internal error").into_response(); + } + } + info!(channel = %channel_id, "vtuber: chat request dispatched"); + + let stream = reply_stream( + rx, + model, + channel_id, + state.vtuber_pending.clone(), + reply_tail_idle_timeout(), + ); + Sse::new(stream) + .keep_alive(KeepAlive::default()) + .into_response() +} + +fn chunk_event( + id: &str, + created: i64, + model: &str, + delta: serde_json::Value, + finish: Option<&str>, +) -> SseEvent { + let payload = json!({ + "id": id, "object": "chat.completion.chunk", "created": created, "model": model, + "choices": [{ "index": 0, "delta": delta, "finish_reason": finish }], + }); + SseEvent::default().data(payload.to_string()) +} + +struct StreamState { + rx: mpsc::UnboundedReceiver, + sent_len: usize, + phase: u8, + id: String, + created: i64, + model: String, + channel_id: String, + registry: ReplyRegistry, + seen_snapshot: bool, + tail_idle: Duration, +} + +fn reply_stream( + rx: mpsc::UnboundedReceiver, + model: String, + channel_id: String, + registry: ReplyRegistry, + tail_idle: Duration, +) -> impl Stream> { + let init = StreamState { + rx, + sent_len: 0, + phase: 0, + id: format!("chatcmpl-{}", Uuid::new_v4()), + created: chrono::Utc::now().timestamp(), + model, + channel_id, + registry, + seen_snapshot: false, + tail_idle, + }; + futures_util::stream::unfold(init, |mut s| async move { + loop { + match s.phase { + 0 => { + s.phase = 1; + let ev = chunk_event( + &s.id, + s.created, + &s.model, + json!({"role":"assistant"}), + None, + ); + return Some((Ok(ev), s)); + } + 1 => match tokio::time::timeout( + if s.seen_snapshot { + s.tail_idle + } else { + REPLY_FIRST_TIMEOUT + }, + s.rx.recv(), + ) + .await + { + Ok(Some(ReplyChunk::Snapshot(full))) => { + let (delta, new_len) = delta_suffix(&full, s.sent_len); + if 
delta.is_empty() { + continue; + } + s.sent_len = new_len; + s.seen_snapshot = true; + let ev = chunk_event( + &s.id, + s.created, + &s.model, + json!({"content": delta}), + None, + ); + return Some((Ok(ev), s)); + } + Ok(Some(ReplyChunk::Done)) | Ok(None) => { + s.phase = 2; + continue; + } + Err(_) => { + if s.seen_snapshot { + info!(channel = %s.channel_id, "vtuber: reply stream idle, closing"); + } else { + warn!(channel = %s.channel_id, "vtuber: reply timed out"); + } + s.phase = 2; + continue; + } + }, + 2 => { + s.phase = 3; + let ev = chunk_event(&s.id, s.created, &s.model, json!({}), Some("stop")); + return Some((Ok(ev), s)); + } + 3 => { + s.phase = 4; + s.registry.lock().await.remove(&s.channel_id); + return Some((Ok(SseEvent::default().data("[DONE]")), s)); + } + _ => return None, + } + } + }) +} + +pub async fn handle_reply(reply: &GatewayReply, registry: &ReplyRegistry) { + let key = reply.channel.id.as_str(); + let full = reply.content.text.clone(); + if full == "…" || full == "draft" { + return; + } + let is_pure_tool_status = parse_pure_tool_status(&full).is_some(); + + let mut map = registry.lock().await; + let Some(tx) = map.get(key) else { + return; + }; + + match reply.command.as_deref() { + Some("edit_message") => { + if is_pure_tool_status { + return; + } + if tx.send(ReplyChunk::Snapshot(full)).is_err() { + map.remove(key); + } + } + None => { + if !is_pure_tool_status { + let _ = tx.send(ReplyChunk::Snapshot(full)); + } + let _ = tx.send(ReplyChunk::Done); + map.remove(key); + } + _ => {} + } +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + // ----------------------------------------------------------------------- + // Helper: spin up gateway, return (addr, oab_ws_url, vtuber_ws_url) + // ----------------------------------------------------------------------- + async 
fn start_gateway() -> (String, String, String) { + let (event_tx, _) = tokio::sync::broadcast::channel::(256); + let ws_clients = new_ws_clients(); + let state = Arc::new(crate::AppState { + telegram_bot_token: None, + telegram_secret_token: None, + telegram_rich_messages: true, + line_channel_secret: None, + line_access_token: None, + teams: None, + teams_service_urls: Mutex::new(HashMap::new()), + feishu: None, + google_chat: None, + wecom: None, + telegram_trusted_source_only: false, + vtuber: Some(VtuberConfig { + auth_key: Some("test-key".into()), + default_model: "openab".into(), + }), + vtuber_pending: Arc::new(Mutex::new(HashMap::new())), + vtuber_ws_clients: Some(ws_clients), + ws_token: Some("oab-token".into()), + event_tx, + reply_token_cache: Arc::new(std::sync::Mutex::new(HashMap::new())), + line_webhook_semaphore: Arc::new(tokio::sync::Semaphore::new(8)), + client: reqwest::Client::new(), + }); + + let app = axum::Router::new() + .route("/ws", axum::routing::get(crate::ws_handler)) + .route("/v1/vtuber/ws", axum::routing::get(ws_upgrade)) + .route( + "/v1/chat/completions", + axum::routing::post(chat_completions), + ) + .with_state(state); + + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap().to_string(); + tokio::spawn(async move { axum::serve(listener, app).await.unwrap() }); + + let oab_url = format!("ws://{}/ws?token=oab-token", addr); + let vtb_url = format!("ws://{}/v1/vtuber/ws?token=test-key", addr); + (addr, oab_url, vtb_url) + } + + // ----------------------------------------------------------------------- + // Integration: OAB sends add_reaction → vtuber WS receives agent_state + // ----------------------------------------------------------------------- + #[tokio::test] + async fn ws_e2e_reaction_to_agent_state() { + let (_addr, oab_url, vtb_url) = start_gateway().await; + + // Connect vtuber WS client + let (mut vtb_ws, _) = 
tokio_tungstenite::connect_async(&vtb_url).await.unwrap(); + + // Connect OAB client and send a vtuber add_reaction reply + let (mut oab_ws, _) = tokio_tungstenite::connect_async(&oab_url).await.unwrap(); + let reply = serde_json::json!({ + "schema": "openab.gateway.reply.v1", + "reply_to": "evt_1", + "platform": "vtuber", + "channel": {"id": "ch_test"}, + "content": {"type": "text", "text": "🤔"}, + "command": "add_reaction" + }); + oab_ws + .send(tokio_tungstenite::tungstenite::Message::Text( + reply.to_string(), + )) + .await + .unwrap(); + + // Receive event on vtuber WS + let msg = tokio::time::timeout(std::time::Duration::from_secs(2), vtb_ws.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + + let event: serde_json::Value = serde_json::from_str(&msg.to_string()).unwrap(); + assert_eq!(event["type"], "agent_state"); + assert_eq!(event["state"], "thinking"); + assert_eq!(event["session_id"], "ch_test"); + } + + // ----------------------------------------------------------------------- + // Integration: OAB sends edit_message with [emotion] → vtuber WS gets it + // ----------------------------------------------------------------------- + #[tokio::test] + async fn ws_e2e_emotion_from_edit_message() { + let (_addr, oab_url, vtb_url) = start_gateway().await; + + let (mut vtb_ws, _) = tokio_tungstenite::connect_async(&vtb_url).await.unwrap(); + let (mut oab_ws, _) = tokio_tungstenite::connect_async(&oab_url).await.unwrap(); + + let reply = serde_json::json!({ + "schema": "openab.gateway.reply.v1", + "reply_to": "evt_1", + "platform": "vtuber", + "channel": {"id": "ch_test"}, + "content": {"type": "text", "text": "[happy] Hello world!"}, + "command": "edit_message", + "request_id": "req_1" + }); + oab_ws + .send(tokio_tungstenite::tungstenite::Message::Text( + reply.to_string(), + )) + .await + .unwrap(); + + let msg = tokio::time::timeout(std::time::Duration::from_secs(2), vtb_ws.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + + let event: 
serde_json::Value = serde_json::from_str(&msg.to_string()).unwrap(); + assert_eq!(event["type"], "emotion"); + assert_eq!(event["tag"], "happy"); + assert_eq!(event["intensity"], 1.0); + } + + // ----------------------------------------------------------------------- + // Integration: full agent lifecycle (queued → thinking → working → done → idle) + // ----------------------------------------------------------------------- + #[tokio::test] + async fn ws_e2e_full_lifecycle() { + let (_addr, oab_url, vtb_url) = start_gateway().await; + + let (mut vtb_ws, _) = tokio_tungstenite::connect_async(&vtb_url).await.unwrap(); + let (mut oab_ws, _) = tokio_tungstenite::connect_async(&oab_url).await.unwrap(); + + // Lifecycle: 👀 → 🤔 → 👨‍💻 → 🆗 → send_message + let sequence = [ + ("👀", "add_reaction", "thinking"), + ("🤔", "add_reaction", "thinking"), + ("👨\u{200d}💻", "add_reaction", "working"), + ("🆗", "add_reaction", "attention"), + ]; + + for (emoji, cmd, expected_state) in &sequence { + let reply = serde_json::json!({ + "schema": "openab.gateway.reply.v1", + "reply_to": "evt_1", + "platform": "vtuber", + "channel": {"id": "ch_lifecycle"}, + "content": {"type": "text", "text": emoji}, + "command": cmd + }); + oab_ws + .send(tokio_tungstenite::tungstenite::Message::Text( + reply.to_string(), + )) + .await + .unwrap(); + + let msg = tokio::time::timeout(std::time::Duration::from_secs(2), vtb_ws.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + let event: serde_json::Value = serde_json::from_str(&msg.to_string()).unwrap(); + assert_eq!(event["type"], "agent_state", "for emoji {emoji}"); + assert_eq!(event["state"], *expected_state, "for emoji {emoji}"); + } + + // Final send_message → idle + let final_reply = serde_json::json!({ + "schema": "openab.gateway.reply.v1", + "reply_to": "evt_1", + "platform": "vtuber", + "channel": {"id": "ch_lifecycle"}, + "content": {"type": "text", "text": "All done!"} + }); + oab_ws + .send(tokio_tungstenite::tungstenite::Message::Text( + 
final_reply.to_string(), + )) + .await + .unwrap(); + + let msg = tokio::time::timeout(std::time::Duration::from_secs(2), vtb_ws.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + let event: serde_json::Value = serde_json::from_str(&msg.to_string()).unwrap(); + assert_eq!(event["type"], "agent_state"); + assert_eq!(event["state"], "idle"); + } + + // ----------------------------------------------------------------------- + // Integration: subscribe filtering + // ----------------------------------------------------------------------- + #[tokio::test] + async fn ws_e2e_subscribe_filters_events() { + let (_addr, oab_url, vtb_url) = start_gateway().await; + + let (mut vtb_ws, _) = tokio_tungstenite::connect_async(&vtb_url).await.unwrap(); + let (mut oab_ws, _) = tokio_tungstenite::connect_async(&oab_url).await.unwrap(); + + // Subscribe only to emotion events + vtb_ws + .send(tokio_tungstenite::tungstenite::Message::Text( + r#"{"type":"subscribe","events":["emotion"]}"#.into(), + )) + .await + .unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Send add_reaction (agent_state) — should be filtered out + let reaction = serde_json::json!({ + "schema": "openab.gateway.reply.v1", + "reply_to": "evt_1", "platform": "vtuber", + "channel": {"id": "ch_test"}, + "content": {"type": "text", "text": "🤔"}, + "command": "add_reaction" + }); + oab_ws + .send(tokio_tungstenite::tungstenite::Message::Text( + reaction.to_string(), + )) + .await + .unwrap(); + + // Send edit_message with [emotion] — should pass through + let edit = serde_json::json!({ + "schema": "openab.gateway.reply.v1", + "reply_to": "evt_1", "platform": "vtuber", + "channel": {"id": "ch_test"}, + "content": {"type": "text", "text": "[joy] yay"}, + "command": "edit_message", "request_id": "req_1" + }); + oab_ws + .send(tokio_tungstenite::tungstenite::Message::Text( + edit.to_string(), + )) + .await + .unwrap(); + + // Should receive emotion, NOT agent_state + let msg = 
tokio::time::timeout(std::time::Duration::from_secs(2), vtb_ws.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + let event: serde_json::Value = serde_json::from_str(&msg.to_string()).unwrap(); + assert_eq!(event["type"], "emotion"); + assert_eq!(event["tag"], "joy"); + } + + // ----------------------------------------------------------------------- + // Regression: subscription updates stay attached to the same client after + // another client disconnects. + // ----------------------------------------------------------------------- + #[tokio::test] + async fn ws_e2e_subscribe_after_disconnect_uses_same_client() { + let (_addr, oab_url, vtb_url) = start_gateway().await; + + let (mut first_ws, _) = tokio_tungstenite::connect_async(&vtb_url).await.unwrap(); + let (_middle_ws, _) = tokio_tungstenite::connect_async(&vtb_url).await.unwrap(); + let (mut target_ws, _) = tokio_tungstenite::connect_async(&vtb_url).await.unwrap(); + let (mut oab_ws, _) = tokio_tungstenite::connect_async(&oab_url).await.unwrap(); + + first_ws.close(None).await.unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + target_ws + .send(tokio_tungstenite::tungstenite::Message::Text( + r#"{"type":"subscribe","events":["emotion"]}"#.into(), + )) + .await + .unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let reaction = serde_json::json!({ + "schema": "openab.gateway.reply.v1", + "reply_to": "evt_1", "platform": "vtuber", + "channel": {"id": "ch_test"}, + "content": {"type": "text", "text": "🤔"}, + "command": "add_reaction" + }); + oab_ws + .send(tokio_tungstenite::tungstenite::Message::Text( + reaction.to_string(), + )) + .await + .unwrap(); + + let filtered = + tokio::time::timeout(std::time::Duration::from_millis(300), target_ws.next()).await; + assert!( + filtered.is_err(), + "target client received agent_state despite emotion-only subscription" + ); + + let edit = serde_json::json!({ + "schema": "openab.gateway.reply.v1", + 
"reply_to": "evt_1", "platform": "vtuber", + "channel": {"id": "ch_test"}, + "content": {"type": "text", "text": "[joy] yay"}, + "command": "edit_message", "request_id": "req_1" + }); + oab_ws + .send(tokio_tungstenite::tungstenite::Message::Text( + edit.to_string(), + )) + .await + .unwrap(); + + let msg = tokio::time::timeout(std::time::Duration::from_secs(2), target_ws.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + let event: serde_json::Value = serde_json::from_str(&msg.to_string()).unwrap(); + assert_eq!(event["type"], "emotion"); + assert_eq!(event["tag"], "joy"); + } + + // ----------------------------------------------------------------------- + // Integration: ping → pong + // ----------------------------------------------------------------------- + #[tokio::test] + async fn ws_e2e_ping_pong() { + let (_addr, _oab_url, vtb_url) = start_gateway().await; + + let (mut vtb_ws, _) = tokio_tungstenite::connect_async(&vtb_url).await.unwrap(); + vtb_ws + .send(tokio_tungstenite::tungstenite::Message::Text( + r#"{"type":"ping"}"#.into(), + )) + .await + .unwrap(); + + let msg = tokio::time::timeout(std::time::Duration::from_secs(2), vtb_ws.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + let event: serde_json::Value = serde_json::from_str(&msg.to_string()).unwrap(); + assert_eq!(event["type"], "pong"); + assert!(event["ts"].is_number()); + } + + // ----------------------------------------------------------------------- + // Integration: bad auth → 401 + // ----------------------------------------------------------------------- + #[tokio::test] + async fn ws_e2e_bad_auth_rejected() { + let (addr, _, _) = start_gateway().await; + let bad_url = format!("ws://{}/v1/vtuber/ws?token=wrong", addr); + let result = tokio_tungstenite::connect_async(&bad_url).await; + assert!( + result.is_err() || { + let (_, resp) = result.unwrap(); + resp.status() == reqwest::StatusCode::UNAUTHORIZED + } + ); + } + + // 
----------------------------------------------------------------------- + // Integration: Tier-1 explicitly requires streaming mode. + // ----------------------------------------------------------------------- + #[tokio::test] + async fn chat_completions_rejects_non_streaming_requests() { + let (addr, _, _) = start_gateway().await; + let url = format!("http://{}/v1/chat/completions", addr); + let client = reqwest::Client::new(); + let bodies = [ + serde_json::json!({ + "model": "openab", + "messages": [{"role": "user", "content": "hello"}] + }), + serde_json::json!({ + "model": "openab", + "stream": false, + "messages": [{"role": "user", "content": "hello"}] + }), + ]; + + for body in bodies { + let resp = client + .post(&url) + .bearer_auth("test-key") + .json(&body) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + let text = resp.text().await.unwrap(); + assert!(text.contains("only streaming mode is supported")); + } + } + + // ----------------------------------------------------------------------- + // Unit tests + // ----------------------------------------------------------------------- + + #[test] + fn reaction_mapping_covers_all_oab_emojis() { + assert_eq!(reaction_to_state("👀").unwrap().state, "thinking"); + assert_eq!(reaction_to_state("🤔").unwrap().state, "thinking"); + assert_eq!(reaction_to_state("🔥").unwrap().state, "working"); + assert_eq!(reaction_to_state("👨\u{200d}💻").unwrap().state, "working"); + assert_eq!( + reaction_to_state("👨\u{200d}💻").unwrap().tool_class, + Some("coding") + ); + assert_eq!(reaction_to_state("⚡").unwrap().state, "working"); + assert_eq!(reaction_to_state("⚡").unwrap().tool_class, Some("web")); + assert_eq!(reaction_to_state("🆗").unwrap().state, "attention"); + assert_eq!(reaction_to_state("😱").unwrap().state, "error"); + assert_eq!(reaction_to_state("🥱").unwrap().state, "error"); + assert_eq!(reaction_to_state("😨").unwrap().state, "error"); + assert!(reaction_to_state("😊").is_none()); 
// mood face, not a state + } + + #[test] + fn extract_emotion_tags_basic() { + assert_eq!(extract_emotion_tags("Hello [happy] world"), vec!["happy"]); + assert_eq!( + extract_emotion_tags("[excited] Hi [joy] there"), + vec!["excited", "joy"] + ); + assert!(extract_emotion_tags("no tags here").is_empty()); + assert!(extract_emotion_tags("[with spaces]").is_empty()); + assert!(extract_emotion_tags("[]").is_empty()); + } + + #[test] + fn derive_events_add_reaction() { + let reply = GatewayReply { + schema: "openab.gateway.reply.v1".into(), + reply_to: "evt_1".into(), + platform: "vtuber".into(), + channel: ReplyChannel { + id: "ch_1".into(), + thread_id: None, + }, + content: Content { + content_type: "text".into(), + text: "🤔".into(), + attachments: vec![], + }, + command: Some("add_reaction".into()), + request_id: None, + quote_message_id: None, + }; + let events = derive_events(&reply); + assert_eq!(events.len(), 1); + match &events[0] { + WsEvent::AgentState { + state, session_id, .. + } => { + assert_eq!(*state, "thinking"); + assert_eq!(session_id, "ch_1"); + } + _ => panic!("expected AgentState"), + } + } + + #[test] + fn derive_events_edit_message_with_emotion() { + let reply = GatewayReply { + schema: "openab.gateway.reply.v1".into(), + reply_to: "evt_1".into(), + platform: "vtuber".into(), + channel: ReplyChannel { + id: "ch_1".into(), + thread_id: None, + }, + content: Content { + content_type: "text".into(), + text: "[excited] Hello!".into(), + attachments: vec![], + }, + command: Some("edit_message".into()), + request_id: Some("req_1".into()), + quote_message_id: None, + }; + let events = derive_events(&reply); + assert_eq!(events.len(), 1); + match &events[0] { + WsEvent::Emotion { tag, intensity, .. 
} => { + assert_eq!(tag, "excited"); + assert_eq!(*intensity, 1.0); + } + _ => panic!("expected Emotion"), + } + } + + #[test] + fn derive_events_send_message_idle() { + let reply = GatewayReply { + schema: "openab.gateway.reply.v1".into(), + reply_to: "evt_1".into(), + platform: "vtuber".into(), + channel: ReplyChannel { + id: "ch_1".into(), + thread_id: None, + }, + content: Content { + content_type: "text".into(), + text: "Done!".into(), + attachments: vec![], + }, + command: None, + request_id: None, + quote_message_id: None, + }; + let events = derive_events(&reply); + assert_eq!(events.len(), 1); + match &events[0] { + WsEvent::AgentState { state, .. } => assert_eq!(*state, "idle"), + _ => panic!("expected AgentState idle"), + } + } + + #[tokio::test] + async fn reply_stream_finishes_after_snapshot_idle() { + let (tx, rx) = mpsc::unbounded_channel::<ReplyChunk>(); + let registry: ReplyRegistry = Arc::new(Mutex::new(HashMap::new())); + registry.lock().await.insert("ch_idle".into(), tx.clone()); + + let mut stream = Box::pin(reply_stream( + rx, + "openab".into(), + "ch_idle".into(), + registry.clone(), + Duration::from_millis(10), + )); + + assert!( + stream.next().await.is_some(), + "role chunk should be emitted first" + ); + tx.send(ReplyChunk::Snapshot("hello".into())).unwrap(); + assert!( + stream.next().await.is_some(), + "content chunk should be emitted" + ); + + let finish = tokio::time::timeout(Duration::from_secs(1), stream.next()).await; + assert!(finish.is_ok(), "finish chunk should arrive after tail idle"); + assert!(finish.unwrap().is_some()); + + let done = tokio::time::timeout(Duration::from_secs(1), stream.next()).await; + assert!(done.is_ok(), "[DONE] should arrive after finish chunk"); + assert!(done.unwrap().is_some()); + + assert!( + !registry.lock().await.contains_key("ch_idle"), + "stream completion should remove pending registry entry" + ); + } +} diff --git a/crates/openab-gateway/src/lib.rs b/crates/openab-gateway/src/lib.rs index
92317175b..dae7a726d 100644 --- a/crates/openab-gateway/src/lib.rs +++ b/crates/openab-gateway/src/lib.rs @@ -40,6 +40,14 @@ pub struct AppState { pub google_chat: Option, #[cfg(feature = "wecom")] pub wecom: Option, + #[cfg(feature = "vtuber")] + pub vtuber: Option, + /// In-flight OpenAI-compatible requests awaiting their streamed reply, + /// keyed by the per-request `channel.id`. See `adapters::vtuber`. + #[cfg(feature = "vtuber")] + pub vtuber_pending: adapters::vtuber::ReplyRegistry, + #[cfg(feature = "vtuber")] + pub vtuber_ws_clients: Option, pub ws_token: Option, pub event_tx: broadcast::Sender, pub reply_token_cache: ReplyTokenCache, @@ -75,6 +83,12 @@ impl AppState { google_chat: None, #[cfg(feature = "wecom")] wecom: None, + #[cfg(feature = "vtuber")] + vtuber: None, + #[cfg(feature = "vtuber")] + vtuber_pending: Arc::new(Mutex::new(HashMap::new())), + #[cfg(feature = "vtuber")] + vtuber_ws_clients: None, ws_token: None, event_tx, reply_token_cache: Arc::new(std::sync::Mutex::new(HashMap::new())), @@ -156,6 +170,10 @@ impl AppState { let wecom = adapters::wecom::WecomConfig::from_env() .map(adapters::wecom::WecomAdapter::new); + // VTuber (OpenAI-compatible) + #[cfg(feature = "vtuber")] + let vtuber = adapters::vtuber::VtuberConfig::from_env(); + let client = reqwest::Client::builder() .timeout(std::time::Duration::from_secs(30)) .build() @@ -177,6 +195,12 @@ impl AppState { google_chat, #[cfg(feature = "wecom")] wecom, + #[cfg(feature = "vtuber")] + vtuber, + #[cfg(feature = "vtuber")] + vtuber_pending: Arc::new(Mutex::new(HashMap::new())), + #[cfg(feature = "vtuber")] + vtuber_ws_clients: Some(adapters::vtuber::new_ws_clients()), ws_token, event_tx, reply_token_cache: Arc::new(std::sync::Mutex::new(HashMap::new())), @@ -390,6 +414,18 @@ pub async fn serve(config: ServeConfig) -> anyhow::Result<()> { #[cfg(not(feature = "wecom"))] let wecom: Option<()> = None; + // VTuber (OpenAI-compatible) adapter + #[cfg(feature = "vtuber")] + let vtuber = 
adapters::vtuber::VtuberConfig::from_env(); + #[cfg(feature = "vtuber")] + if vtuber.is_some() { + let path = std::env::var("VTUBER_PATH").unwrap_or_else(|_| "/v1/chat/completions".into()); + info!("vtuber adapter enabled (Tier-1 at {path}, Tier-2 WS at /v1/vtuber/ws)"); + app = app + .route(&path, post(adapters::vtuber::chat_completions)) + .route("/v1/vtuber/ws", get(adapters::vtuber::ws_upgrade)); + } + let client = reqwest::Client::builder() .timeout(std::time::Duration::from_secs(30)) .build() @@ -413,6 +449,12 @@ pub async fn serve(config: ServeConfig) -> anyhow::Result<()> { google_chat, #[cfg(feature = "wecom")] wecom, + #[cfg(feature = "vtuber")] + vtuber, + #[cfg(feature = "vtuber")] + vtuber_pending: Arc::new(Mutex::new(HashMap::new())), + #[cfg(feature = "vtuber")] + vtuber_ws_clients: Some(adapters::vtuber::new_ws_clients()), ws_token, event_tx, reply_token_cache, @@ -420,6 +462,13 @@ pub async fn serve(config: ServeConfig) -> anyhow::Result<()> { client, }); + #[cfg(feature = "vtuber")] + if state.vtuber.is_some() { + if let Some(ref clients) = state.vtuber_ws_clients { + adapters::vtuber::spawn_ambient_task(clients.clone()); + } + } + // Background: sweep expired reply tokens { let cache_state = state.clone(); @@ -620,6 +669,18 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: warn!("reply for wecom but adapter not configured"); } } + #[cfg(feature = "vtuber")] + "vtuber" => { + adapters::vtuber::handle_reply( + &reply, + &state_for_recv.vtuber_pending, + ) + .await; + if let Some(ref ws_clients) = state_for_recv.vtuber_ws_clients { + let events = adapters::vtuber::derive_events(&reply); + adapters::vtuber::broadcast(ws_clients, &events).await; + } + } other => warn!(platform = other, "unknown reply platform"), } } diff --git a/docs/config-reference.md b/docs/config-reference.md index 2f5509a14..5eb289e6f 100644 --- a/docs/config-reference.md +++ b/docs/config-reference.md @@ -692,6 +692,7 @@ Each platform is auto-enabled when 
its env vars are present: | Google Chat | `GOOGLE_CHAT_ENABLED=true` | `GOOGLE_CHAT_SA_KEY_JSON`, `GOOGLE_CHAT_SA_KEY_FILE`, `GOOGLE_CHAT_ACCESS_TOKEN`, `GOOGLE_CHAT_AUDIENCE`, `GOOGLE_CHAT_WEBHOOK_PATH` | | WeCom | `WECOM_CORP_ID` | _(see wecom config)_ | | Teams | `TEAMS_APP_ID` | `TEAMS_WEBHOOK_PATH` | +| VTuber | `VTUBER_ENABLED=true` | `VTUBER_AUTH_KEY`, `VTUBER_DEFAULT_MODEL`, `VTUBER_PATH`, `VTUBER_REPLY_TAIL_IDLE_MS`, `VTUBER_AMBIENT_*` | > ⚠️ **Production checklist**: Set `GATEWAY_ALLOW_ALL_CHANNELS=false` and `GATEWAY_ALLOW_ALL_USERS=false` with explicit allowlists. The defaults are permissive for development convenience. > diff --git a/docs/vtuber.md b/docs/vtuber.md new file mode 100644 index 000000000..e8f75cc23 --- /dev/null +++ b/docs/vtuber.md @@ -0,0 +1,172 @@ +# VTuber (OpenAI-compatible) Setup + +Expose an **OpenAI-compatible `/v1/chat/completions` (SSE)** endpoint from the +unified OpenAB binary, so any character "skin" that already speaks OpenAI chat +completions (AniCompanion, Open-LLM-VTuber, ChatVRM, ...) gets a real agent: +tool use, code, MCP, memory, and the same configured ACP backend. + +The skin connects directly to the unified OpenAB HTTP listener in the same +OpenAB process: + +- Tier-1 chat: `POST /v1/chat/completions` streams SSE responses. +- Tier-2 UI events: `GET /v1/vtuber/ws` is optional and uses the same process. +- Agent work: both routes dispatch to the configured ACP agent in OpenAB. + +Unlike chat-platform adapters (LINE, Telegram, ...), this is **not a webhook**: +the skin opens an HTTP request and the reply streams back on that same +connection. The optional Tier-2 WebSocket is only for side-channel UI events +such as agent state, emotions, tool status, and ambient notifications. + +## Prerequisites + +- An OpenAB image/binary built with the `unified` feature. +- An ACP agent configured in the same OpenAB process. +- A public URL, tunnel, or localhost endpoint reachable by the skin. 
+- A skin that supports an OpenAI-compatible backend, for example + AniCompanion -> Settings -> Agent backend -> OpenAI-compatible. + +## 1. Enable the VTuber Adapter + +Set these environment variables on the OpenAB process: + +```bash +VTUBER_ENABLED=true +VTUBER_AUTH_KEY="$(openssl rand -hex 32)" +VTUBER_DEFAULT_MODEL=openab +GATEWAY_LISTEN=0.0.0.0:8080 +``` + +Example Docker run with a Kiro-backed OpenAB image: + +```bash +docker run -d --name openab-vtuber \ + -e VTUBER_ENABLED=true \ + -e VTUBER_AUTH_KEY="$VTUBER_AUTH_KEY" \ + -e VTUBER_DEFAULT_MODEL=openab \ + -e KIRO_API_KEY="$KIRO_API_KEY" \ + -p 8080:8080 \ + ghcr.io/openabdev/openab:beta-kiro +``` + +For Kubernetes or Zeabur, put the same environment variables on the OpenAB +service. No companion container or adapter config block is needed in unified +mode. + +## 2. Configure the Agent + +Use the normal OpenAB agent configuration. The VTuber adapter submits incoming +skin messages directly to OpenAB's in-process dispatcher. + +```toml +[agent] +command = "kiro-cli" +args = ["acp", "--trust-all-tools"] +``` + +Streaming is handled by the VTuber adapter's SSE endpoint; it does not require +separate streaming settings. + +## 3. Point the Skin at OpenAB + +In the skin's OpenAI-compatible backend settings: + +- **Endpoint / Base URL**: `https://your-openab-host` (the adapter serves + `/v1/chat/completions`) +- **API Key**: the `VTUBER_AUTH_KEY` value (sent as `Authorization: Bearer <key>`) +- **Model**: anything; OpenAB routes to the configured ACP agent and echoes the + model name back in OpenAI-compatible chunks + +If the skin supports the Tier-2 side channel, connect it to the same base URL +at `/v1/vtuber/ws` with the same bearer key. + +## 4.
Test + +```bash +curl -N https://your-openab-host/v1/chat/completions \ + -H "Authorization: Bearer $VTUBER_AUTH_KEY" \ + -H "Content-Type: application/json" \ + -d '{"model":"openab","stream":true, + "messages":[{"role":"user","content":"hi 小光"}]}' +``` + +You should see `data: {...chat.completion.chunk...}` lines ending with +`data: [DONE]`. + +## Tier-2 WebSocket + +`GET /v1/vtuber/ws` is optional. It pushes structured UI events: + +- `agent_state` +- `emotion` +- `tool_status` +- `notification` + +Clients can send `{"type":"subscribe","events":[...]}` to filter event types +and `{"type":"ping"}` for keepalive. + +Ambient notifications are opt-in: + +```bash +VTUBER_AMBIENT_ENABLED=true +VTUBER_AMBIENT_INTERVAL_SECS=1800 +VTUBER_AMBIENT_URGENCY=normal +VTUBER_AMBIENT_PROMPT="Ambient check-in prompt..." +``` + +## Emotion Tags + +Inline `[emotion]` tags (for example AniCompanion's `[happy]`, `[sad]`, +`[curious]`, ...) are the skin's own convention. The skin's persona/system +prompt instructs the agent to emit them, and the skin parses and strips them +before TTS. Tier-1 passes them through verbatim; Tier-2 also extracts recognized +tags as `emotion` events for clients that subscribe to the side channel. + +## Environment Variables + +| Variable | Required | Description | +|---|---|---| +| `VTUBER_ENABLED` | Yes | `true`/`1` to enable the adapter in the unified binary | +| `VTUBER_AUTH_KEY` | Recommended | Bearer key required on Tier-1 and Tier-2 requests. 
If unset, the endpoint is unauthenticated and logs a warning | +| `VTUBER_DEFAULT_MODEL` | No | Model name echoed back when the request omits one (default `openab`) | +| `VTUBER_PATH` | No | Tier-1 route path (default `/v1/chat/completions`) | +| `VTUBER_REPLY_TAIL_IDLE_MS` | No | Tail-idle close delay after the first content snapshot (default `1500`) | +| `VTUBER_AMBIENT_ENABLED` | No | `true`/`1` to enable Tier-2 ambient notifications | +| `VTUBER_AMBIENT_INTERVAL_SECS` | No | Ambient notification interval, minimum 60 seconds (default `1800`) | +| `VTUBER_AMBIENT_URGENCY` | No | `low`, `normal`, or `high` (default `normal`) | +| `VTUBER_AMBIENT_PROMPT` | No | Prompt text sent to Tier-2 clients as a `notification` event | +| `GATEWAY_LISTEN` | No | Bind address for the unified HTTP listener (default `0.0.0.0:8080`) | + +## Notes & Limitations + +- **One session per request.** Each call mints a fresh agent session; the full + conversation history must be carried in `messages[]`, which OpenAI clients + already do. +- **No agent output => no chat output.** If the configured ACP agent cannot + answer, the SSE request eventually closes after the reply timeout. +- **Tier-2 is optional.** Skins that only support OpenAI chat completions still + get full Tier-1 chat. Tier-2 adds richer UI state when the skin supports it. +- **Tags are not motion.** Mapping `[emotion]` or Tier-2 `emotion` events to VRM + expressions, Live2D parameters, or VTube Studio actions is the skin's job. + +## Troubleshooting + +**No response / stream hangs then closes:** +- Confirm `VTUBER_ENABLED=true` is set on the OpenAB process. +- Confirm the ACP agent is configured and authenticated. +- Check OpenAB logs for `unified: vtuber adapter enabled`. + +**`401 invalid api key`:** +- The `Authorization: Bearer <key>` value must match `VTUBER_AUTH_KEY`. + +**Reply arrives all at once instead of streaming:** +- Confirm the skin is calling `/v1/chat/completions` with `stream: true`.
+- Confirm no proxy in front of OpenAB buffers SSE responses. + +**Tier-2 events do not arrive:** +- Confirm the client connects to `/v1/vtuber/ws` on the same OpenAB host. +- Confirm it uses the same bearer key as Tier-1. +- Check OpenAB logs for `vtuber WS client connected`. + +## References + +- [RFC: VTuber adapter](https://github.com/openabdev/openab/issues/1233) diff --git a/src/main.rs b/src/main.rs index ecc5ce2c0..51a5e3baa 100644 --- a/src/main.rs +++ b/src/main.rs @@ -123,7 +123,14 @@ fn has_unified_platform_env() -> bool { || (cfg!(feature = "feishu") && std::env::var("FEISHU_APP_ID").is_ok()) || (cfg!(feature = "wecom") && std::env::var("WECOM_CORP_ID").is_ok()) || (cfg!(feature = "teams") && std::env::var("TEAMS_APP_ID").is_ok()) - || (cfg!(feature = "googlechat") && std::env::var("GOOGLE_CHAT_ENABLED").map(|v| v == "true" || v == "1").unwrap_or(false)) + || (cfg!(feature = "googlechat") + && std::env::var("GOOGLE_CHAT_ENABLED") + .map(|v| v == "true" || v == "1") + .unwrap_or(false)) + || (cfg!(feature = "vtuber") + && std::env::var("VTUBER_ENABLED") + .map(|v| v == "true" || v == "1") + .unwrap_or(false)) } #[tokio::main] @@ -145,7 +152,11 @@ async fn main() -> anyhow::Result<()> { return Ok(()); } #[cfg(feature = "agentcore")] - Commands::AgentcoreBridge { runtime_arn, region, command } => { + Commands::AgentcoreBridge { + runtime_arn, + region, + command, + } => { return acp::agentcore::run_bridge(&runtime_arn, &region, &command).await; } Commands::Set { key, value, thread } => { @@ -199,11 +210,13 @@ async fn main() -> anyhow::Result<()> { "config loaded" ); - if cfg.discord.is_none() && cfg.slack.is_none() && cfg.gateway.is_none() + if cfg.discord.is_none() + && cfg.slack.is_none() + && cfg.gateway.is_none() && !has_unified_platform_env() { anyhow::bail!( - "no adapter configured — add [discord], [slack], or [gateway] to config, or set platform env vars (TELEGRAM_BOT_TOKEN, etc.)" + "no adapter configured — add [discord], [slack], a legacy
[gateway] config, or set unified platform env vars (TELEGRAM_BOT_TOKEN, VTUBER_ENABLED, etc.)" ); } @@ -495,11 +508,11 @@ async fn main() -> anyhow::Result<()> { feature = "teams", ))] let _unified_handle = { - use openab_core::gateway::{GatewayEventContext, process_gateway_event}; + use openab_core::gateway::{process_gateway_event, GatewayEventContext}; if has_unified_platform_env() { - let listen_addr = std::env::var("GATEWAY_LISTEN") - .unwrap_or_else(|_| "0.0.0.0:8080".into()); + let listen_addr = + std::env::var("GATEWAY_LISTEN").unwrap_or_else(|_| "0.0.0.0:8080".into()); // Create a dedicated dispatcher for unified gateway events let unified_dispatcher = Arc::new(dispatch::Dispatcher::with_idle_timeout( @@ -520,21 +533,27 @@ async fn main() -> anyhow::Result<()> { let gw_state = Arc::new(openab_gateway::AppState::from_env(event_tx.clone(), None)); // Build axum router with platform webhook routes - let mut app = axum::Router::new() - .route("/health", axum::routing::get(|| async { "ok" })); + let mut app = + axum::Router::new().route("/health", axum::routing::get(|| async { "ok" })); #[cfg(feature = "telegram")] if gw_state.telegram_bot_token.is_some() { let path = std::env::var("TELEGRAM_WEBHOOK_PATH") .unwrap_or_else(|_| "/webhook/telegram".into()); info!(path = %path, "unified: telegram adapter enabled"); - app = app.route(&path, axum::routing::post(openab_gateway::adapters::telegram::webhook)); + app = app.route( + &path, + axum::routing::post(openab_gateway::adapters::telegram::webhook), + ); } #[cfg(feature = "line")] { info!("unified: line adapter enabled"); - app = app.route("/webhook/line", axum::routing::post(openab_gateway::adapters::line::webhook)); + app = app.route( + "/webhook/line", + axum::routing::post(openab_gateway::adapters::line::webhook), + ); } #[cfg(feature = "feishu")] @@ -542,23 +561,35 @@ async fn main() -> anyhow::Result<()> { let path = std::env::var("FEISHU_WEBHOOK_PATH") .unwrap_or_else(|_| "/webhook/feishu".into()); info!(path 
= %path, "unified: feishu adapter enabled"); - app = app.route(&path, axum::routing::post(openab_gateway::adapters::feishu::webhook)); + app = app.route( + &path, + axum::routing::post(openab_gateway::adapters::feishu::webhook), + ); } #[cfg(feature = "wecom")] if let Some(ref w) = gw_state.wecom { info!(path = %w.config.webhook_path, "unified: wecom adapter enabled"); app = app - .route(&w.config.webhook_path, axum::routing::get(openab_gateway::adapters::wecom::verify)) - .route(&w.config.webhook_path, axum::routing::post(openab_gateway::adapters::wecom::webhook)); + .route( + &w.config.webhook_path, + axum::routing::get(openab_gateway::adapters::wecom::verify), + ) + .route( + &w.config.webhook_path, + axum::routing::post(openab_gateway::adapters::wecom::webhook), + ); } #[cfg(feature = "teams")] if gw_state.teams.is_some() { - let path = std::env::var("TEAMS_WEBHOOK_PATH") - .unwrap_or_else(|_| "/webhook/teams".into()); + let path = + std::env::var("TEAMS_WEBHOOK_PATH").unwrap_or_else(|_| "/webhook/teams".into()); info!(path = %path, "unified: teams adapter enabled"); - app = app.route(&path, axum::routing::post(openab_gateway::adapters::teams::webhook)); + app = app.route( + &path, + axum::routing::post(openab_gateway::adapters::teams::webhook), + ); } #[cfg(feature = "googlechat")] @@ -566,17 +597,39 @@ async fn main() -> anyhow::Result<()> { let path = std::env::var("GOOGLE_CHAT_WEBHOOK_PATH") .unwrap_or_else(|_| "/webhook/googlechat".into()); info!(path = %path, "unified: googlechat adapter enabled"); - app = app.route(&path, axum::routing::post(openab_gateway::adapters::googlechat::webhook)); + app = app.route( + &path, + axum::routing::post(openab_gateway::adapters::googlechat::webhook), + ); + } + + #[cfg(feature = "vtuber")] + if gw_state.vtuber.is_some() { + let path = + std::env::var("VTUBER_PATH").unwrap_or_else(|_| "/v1/chat/completions".into()); + info!(path = %path, "unified: vtuber adapter enabled"); + app = app + .route( + &path, + 
axum::routing::post(openab_gateway::adapters::vtuber::chat_completions), + ) + .route( + "/v1/vtuber/ws", + axum::routing::get(openab_gateway::adapters::vtuber::ws_upgrade), + ); + if let Some(ref clients) = gw_state.vtuber_ws_clients { + openab_gateway::adapters::vtuber::spawn_ambient_task(clients.clone()); + } } let app = app.with_state(gw_state.clone()); // Bridge task: receive events from adapters via event_tx, dispatch to core let unified_adapter: Arc = Arc::new( - unified_adapter::UnifiedGatewayAdapter::new(gw_state.clone()) + unified_adapter::UnifiedGatewayAdapter::new(gw_state.clone()), ); - // Read security gating from env (mirrors [gateway] config section) + // Read security gating from env for unified platform adapters. let gw_allow_all_channels = std::env::var("GATEWAY_ALLOW_ALL_CHANNELS") .map(|v| v != "0" && !v.eq_ignore_ascii_case("false")) .unwrap_or(true); @@ -774,14 +827,17 @@ async fn main() -> anyhow::Result<()> { let reminder_store = remind::ReminderStore::load(reminder_path); // Construct ambient dispatcher if enabled and channels configured. 
- let ambient_dispatcher = if cfg.ambient.enabled && !cfg.ambient.discord.channels.is_empty() { + let ambient_dispatcher = if cfg.ambient.enabled && !cfg.ambient.discord.channels.is_empty() + { info!( channels = ?cfg.ambient.discord.channels, flush_interval = cfg.ambient.flush_interval_seconds, flush_max_messages = cfg.ambient.flush_max_messages, "ambient mode enabled" ); - Some(Arc::new(openab_core::ambient::AmbientDispatcher::new(cfg.ambient.clone()))) + Some(Arc::new(openab_core::ambient::AmbientDispatcher::new( + cfg.ambient.clone(), + ))) } else { None }; @@ -981,6 +1037,7 @@ mod tests { std::env::remove_var("WECOM_CORP_ID"); std::env::remove_var("TEAMS_APP_ID"); std::env::remove_var("GOOGLE_CHAT_ENABLED"); + std::env::remove_var("VTUBER_ENABLED"); } // Case 1: no env vars → false @@ -1002,6 +1059,16 @@ mod tests { std::env::set_var("TELEGRAM_BOT_TOKEN", "test-token"); assert_eq!(has_unified_platform_env(), cfg!(feature = "telegram")); + // Case 5: VTUBER_ENABLED=true → true only if feature compiled + clear_all(); + std::env::set_var("VTUBER_ENABLED", "true"); + assert_eq!(has_unified_platform_env(), cfg!(feature = "vtuber")); + + // Case 6: VTUBER_ENABLED=yes (invalid) → false + clear_all(); + std::env::set_var("VTUBER_ENABLED", "yes"); + assert!(!has_unified_platform_env()); + // Cleanup clear_all(); } diff --git a/src/unified_adapter.rs b/src/unified_adapter.rs index 123cce2d1..8ebb79ed5 100644 --- a/src/unified_adapter.rs +++ b/src/unified_adapter.rs @@ -89,8 +89,23 @@ impl UnifiedGatewayAdapter { .await; } } + #[cfg(feature = "vtuber")] + "vtuber" => { + openab_gateway::adapters::vtuber::handle_reply( + reply, + &self.gw_state.vtuber_pending, + ) + .await; + if let Some(ref clients) = self.gw_state.vtuber_ws_clients { + let events = openab_gateway::adapters::vtuber::derive_events(reply); + openab_gateway::adapters::vtuber::broadcast(clients, &events).await; + } + } other => { - tracing::warn!(platform = other, "unified adapter: unknown platform, cannot 
route reply"); + tracing::warn!( + platform = other, + "unified adapter: unknown platform, cannot route reply" + ); } } } @@ -138,8 +153,13 @@ impl ChatAdapter for UnifiedGatewayAdapter { self.dispatch_reply(&reply).await; Ok(MessageRef { channel: channel.clone(), - message_id: format!("unified_{:x}", std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_nanos()), + message_id: format!( + "unified_{:x}", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() + ), }) } @@ -195,8 +215,13 @@ impl ChatAdapter for UnifiedGatewayAdapter { self.dispatch_reply(&reply).await; Ok(MessageRef { channel: channel.clone(), - message_id: format!("unified_{:x}", std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_nanos()), + message_id: format!( + "unified_{:x}", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() + ), }) }