From 4ad775b7fe1661adb0a749e1f00a1debd1c6a2fb Mon Sep 17 00:00:00 2001 From: Can Yu Date: Sun, 28 Jun 2026 23:51:50 +0800 Subject: [PATCH 01/10] feat(gateway): add vtuber OpenAI-compatible adapter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Expose POST /v1/chat/completions (SSE) backed by the OAB agent, so any OpenAI-compatible character skin (AniCompanion, Open-LLM-VTuber, …) gets a real agent with zero client changes. messages[] is flattened into the agent prompt; the agent's streamed reply is re-emitted as OpenAI chat.completion.chunk deltas via a per-request channel.id registry drained by the /ws recv loop. Inline [emotion] tags pass through untouched. Tier 1 of RFC #1233. Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_01NXngQyvmJwQPNYsiRUNU2m --- crates/openab-gateway/Cargo.toml | 3 +- crates/openab-gateway/src/adapters/line.rs | 4 + crates/openab-gateway/src/adapters/mod.rs | 2 + crates/openab-gateway/src/adapters/teams.rs | 4 + crates/openab-gateway/src/adapters/vtuber.rs | 424 +++++++++++++++++++ crates/openab-gateway/src/lib.rs | 36 ++ docs/vtuber.md | 133 ++++++ 7 files changed, 605 insertions(+), 1 deletion(-) create mode 100644 crates/openab-gateway/src/adapters/vtuber.rs create mode 100644 docs/vtuber.md diff --git a/crates/openab-gateway/Cargo.toml b/crates/openab-gateway/Cargo.toml index 26ee00db9..c9f499f9c 100644 --- a/crates/openab-gateway/Cargo.toml +++ b/crates/openab-gateway/Cargo.toml @@ -35,10 +35,11 @@ urlencoding = "2" wiremock = "0.6" [features] -default = ["telegram", "line", "feishu", "googlechat", "wecom", "teams"] +default = ["telegram", "line", "feishu", "googlechat", "wecom", "teams", "vtuber"] telegram = [] line = [] feishu = [] googlechat = [] wecom = [] teams = [] +vtuber = [] diff --git a/crates/openab-gateway/src/adapters/line.rs b/crates/openab-gateway/src/adapters/line.rs index c0981f6c5..403f0c482 100644 --- a/crates/openab-gateway/src/adapters/line.rs +++ b/crates/openab-gateway/src/adapters/line.rs @@ -1157,6 +1157,10 @@ mod tests { feishu: None, google_chat: None, wecom: None, + #[cfg(feature = "vtuber")] + vtuber: None, + #[cfg(feature = "vtuber")] + vtuber_pending: Arc::new(Mutex::new(HashMap::new())), ws_token: None, event_tx, reply_token_cache: Arc::new(std::sync::Mutex::new(HashMap::new())), diff --git a/crates/openab-gateway/src/adapters/mod.rs b/crates/openab-gateway/src/adapters/mod.rs index f58f870a2..7c58c2135 100644 --- a/crates/openab-gateway/src/adapters/mod.rs +++ b/crates/openab-gateway/src/adapters/mod.rs @@ -12,3 +12,5 @@ pub mod googlechat; pub mod wecom; #[cfg(feature = "teams")] pub mod teams; +#[cfg(feature = "vtuber")] +pub mod vtuber; diff --git a/crates/openab-gateway/src/adapters/teams.rs b/crates/openab-gateway/src/adapters/teams.rs index 09ac09df8..6a1d232c7 100644 --- a/crates/openab-gateway/src/adapters/teams.rs +++ b/crates/openab-gateway/src/adapters/teams.rs @@ -662,6 +662,10 @@ mod tests { feishu: None, google_chat: None, wecom: None, + #[cfg(feature = "vtuber")] + vtuber: None, + #[cfg(feature = "vtuber")] + vtuber_pending: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())), ws_token: None, event_tx, reply_token_cache: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())), diff --git a/crates/openab-gateway/src/adapters/vtuber.rs b/crates/openab-gateway/src/adapters/vtuber.rs new file mode 100644 index 000000000..9de819b14 --- /dev/null +++ b/crates/openab-gateway/src/adapters/vtuber.rs @@ -0,0 +1,424 @@ +//! VTuber adapter — an OpenAI-compatible `/v1/chat/completions` (SSE) front door +//! backed by an OAB ACP agent. +//! +//! Any "skin" that already speaks OpenAI chat completions (AniCompanion, +//! Open-LLM-VTuber, …) points at this endpoint and gets a real agent with zero +//! client changes. Inline `[emotion]` tags emitted by the agent pass through the +//! stream untouched; the skin parses + maps them to its own motion system. +//! +//! The gateway has no embedded agent — it relays events to an OAB process over +//! `/ws` and receives replies asynchronously on a different task. So an in-flight +//! HTTP request parks an `mpsc` sender in [`ReplyRegistry`] keyed by a per-request +//! `channel.id`; [`handle_reply`] (called from the `/ws` recv loop) feeds reply +//! snapshots back into it, and the SSE stream re-emits them as OpenAI deltas. + +use std::collections::HashMap; +use std::convert::Infallible; +use std::sync::Arc; +use std::time::Duration; + +use axum::extract::State; +use axum::http::{header::AUTHORIZATION, HeaderMap, StatusCode}; +use axum::response::sse::{Event, KeepAlive, Sse}; +use axum::response::{IntoResponse, Response}; +use axum::Json; +use futures_util::stream::Stream; +use serde::Deserialize; +use serde_json::json; +use tokio::sync::{mpsc, Mutex}; +use tracing::{info, warn}; +use uuid::Uuid; + +use crate::schema::{ChannelInfo, GatewayEvent, GatewayReply, SenderInfo}; + +/// A piece of a streamed reply, handed from the `/ws` recv task to the SSE response. +/// +/// The agent streams *full accumulated snapshots* (not deltas) every ~1.5s, plus a +/// final snapshot; the SSE side diffs them into OpenAI content deltas. +pub enum ReplyChunk { + Snapshot(String), + Done, +} + +/// Per-request `channel.id` → the SSE response awaiting that request's reply. +pub type ReplyRegistry = Arc>>>; + +/// Max wait for the next reply snapshot before closing the stream. Guards the case +/// where no OAB agent is connected to `/ws` (otherwise the request would hang). +const REPLY_IDLE_TIMEOUT: Duration = Duration::from_secs(180); + +// --- Config --- + +pub struct VtuberConfig { + /// Bearer key required on inbound requests. `None` = unauthenticated (warned). + pub auth_key: Option, + /// Model name echoed back in chunks when the request omits one. + pub default_model: String, +} + +impl VtuberConfig { + pub fn from_env() -> Option { + Self::from_reader(|k| std::env::var(k).ok()) + } + + /// Build from an arbitrary reader so tests avoid `env::set_var` races under + /// cargo's parallel runner (same pattern as the other adapters). + fn from_reader Option>(read: F) -> Option { + let enabled = read("VTUBER_ENABLED") + .map(|v| v == "true" || v == "1") + .unwrap_or(false); + if !enabled { + return None; + } + let auth_key = read("VTUBER_AUTH_KEY"); + if auth_key.is_none() { + warn!("VTUBER_AUTH_KEY not set — /v1/chat/completions is UNAUTHENTICATED (insecure)"); + } + let default_model = read("VTUBER_DEFAULT_MODEL").unwrap_or_else(|| "openab".into()); + info!( + default_model = %default_model, + authenticated = auth_key.is_some(), + "vtuber adapter configured" + ); + Some(Self { + auth_key, + default_model, + }) + } +} + +// --- OpenAI request DTOs (subset we honor) --- + +#[derive(Deserialize)] +pub struct ChatMessage { + #[serde(default)] + pub role: String, + #[serde(default)] + pub content: String, +} + +#[derive(Deserialize)] +pub struct ChatRequest { + #[serde(default)] + pub model: Option, + #[serde(default)] + pub messages: Vec, + #[serde(default)] + pub stream: Option, +} + +/// Flatten OpenAI `messages[]` (incl. the skin's own persona/system prompt) into a +/// single prompt string. Each request mints a fresh session, so the full history +/// carried in `messages` is the agent's only context. +fn flatten_messages(messages: &[ChatMessage]) -> String { + let mut out = String::new(); + for m in messages { + if m.content.trim().is_empty() { + continue; + } + let label = match m.role.as_str() { + "system" => "System", + "assistant" => "Assistant", + "user" | "" => "User", + other => other, + }; + if !out.is_empty() { + out.push_str("\n\n"); + } + out.push_str(label); + out.push_str(": "); + out.push_str(&m.content); + } + out +} + +/// Newly-appended suffix of `full` beyond `sent_len`, and the new sent length. +/// Snapshots grow monotonically; if the prefix ever changed (shouldn't happen) we +/// resend the whole string rather than panic on a non-char-boundary slice. +fn delta_suffix(full: &str, sent_len: usize) -> (String, usize) { + match full.get(sent_len..) { + Some(suffix) => (suffix.to_string(), full.len()), + None => (full.to_string(), full.len()), + } +} + +// --- HTTP handler --- + +pub async fn chat_completions( + State(state): State>, + headers: HeaderMap, + Json(req): Json, +) -> Response { + let Some(cfg) = state.vtuber.as_ref() else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + "vtuber adapter not configured", + ) + .into_response(); + }; + + // Bearer auth + if let Some(expected) = cfg.auth_key.as_ref() { + let provided = headers + .get(AUTHORIZATION) + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.strip_prefix("Bearer ")); + if provided != Some(expected.as_str()) { + return (StatusCode::UNAUTHORIZED, "invalid api key").into_response(); + } + } + + let prompt = flatten_messages(&req.messages); + if prompt.trim().is_empty() { + return ( + StatusCode::BAD_REQUEST, + "messages must contain non-empty content", + ) + .into_response(); + } + let model = req + .model + .clone() + .unwrap_or_else(|| cfg.default_model.clone()); + + // Per-request session id. The reply's `channel.id` echoes this, routing the + // agent's reply chunks back to this exact request. + let channel_id = format!("vtb_{}", Uuid::new_v4()); + let (tx, rx) = mpsc::unbounded_channel::(); + state + .vtuber_pending + .lock() + .await + .insert(channel_id.clone(), tx); + + let event = GatewayEvent::new( + "vtuber", + ChannelInfo { + id: channel_id.clone(), + channel_type: "dm".into(), + thread_id: None, + }, + SenderInfo { + id: "vtuber".into(), + name: "vtuber".into(), + display_name: "VTuber".into(), + is_bot: false, + }, + &prompt, + &format!("vtbmsg_{}", Uuid::new_v4()), + Vec::new(), + ); + match serde_json::to_string(&event) { + Ok(json) => { + let _ = state.event_tx.send(json); + } + Err(e) => { + state.vtuber_pending.lock().await.remove(&channel_id); + warn!("vtuber: failed to serialize event: {e}"); + return (StatusCode::INTERNAL_SERVER_ERROR, "internal error").into_response(); + } + } + info!(channel = %channel_id, "vtuber: chat request dispatched"); + + let stream = reply_stream(rx, model, channel_id, state.vtuber_pending.clone()); + Sse::new(stream) + .keep_alive(KeepAlive::default()) + .into_response() +} + +/// OpenAI `chat.completion.chunk` SSE event. +fn chunk_event( + id: &str, + created: i64, + model: &str, + delta: serde_json::Value, + finish: Option<&str>, +) -> Event { + let payload = json!({ + "id": id, + "object": "chat.completion.chunk", + "created": created, + "model": model, + "choices": [{ "index": 0, "delta": delta, "finish_reason": finish }], + }); + Event::default().data(payload.to_string()) +} + +struct StreamState { + rx: mpsc::UnboundedReceiver, + sent_len: usize, + phase: u8, // 0=role, 1=stream snapshots, 2=finish, 3=[DONE], 4=end + id: String, + created: i64, + model: String, + channel_id: String, + registry: ReplyRegistry, +} + +/// Turn the reply-chunk receiver into an OpenAI SSE stream: +/// `role` chunk → content deltas → `finish_reason:"stop"` → `data: [DONE]`. +fn reply_stream( + rx: mpsc::UnboundedReceiver, + model: String, + channel_id: String, + registry: ReplyRegistry, +) -> impl Stream> { + let init = StreamState { + rx, + sent_len: 0, + phase: 0, + id: format!("chatcmpl-{}", Uuid::new_v4()), + created: chrono::Utc::now().timestamp(), + model, + channel_id, + registry, + }; + + futures_util::stream::unfold(init, |mut s| async move { + loop { + match s.phase { + 0 => { + s.phase = 1; + let ev = chunk_event( + &s.id, + s.created, + &s.model, + json!({ "role": "assistant" }), + None, + ); + return Some((Ok(ev), s)); + } + 1 => match tokio::time::timeout(REPLY_IDLE_TIMEOUT, s.rx.recv()).await { + Ok(Some(ReplyChunk::Snapshot(full))) => { + let (delta, new_len) = delta_suffix(&full, s.sent_len); + if delta.is_empty() { + continue; + } + s.sent_len = new_len; + let ev = chunk_event( + &s.id, + s.created, + &s.model, + json!({ "content": delta }), + None, + ); + return Some((Ok(ev), s)); + } + Ok(Some(ReplyChunk::Done)) | Ok(None) => { + s.phase = 2; + continue; + } + Err(_) => { + warn!(channel = %s.channel_id, "vtuber: reply timed out (no agent connected?)"); + s.phase = 2; + continue; + } + }, + 2 => { + s.phase = 3; + let ev = chunk_event(&s.id, s.created, &s.model, json!({}), Some("stop")); + return Some((Ok(ev), s)); + } + 3 => { + s.phase = 4; + s.registry.lock().await.remove(&s.channel_id); + return Some((Ok(Event::default().data("[DONE]")), s)); + } + _ => return None, + } + } + }) +} + +// --- Reply ingestion (called from the `/ws` recv loop) --- + +/// Route a `GatewayReply` (platform = "vtuber") back to its waiting SSE response. +pub async fn handle_reply(reply: &GatewayReply, registry: &ReplyRegistry) { + let key = reply.channel.id.as_str(); + let full = reply.content.text.clone(); + // Streaming-placeholder bodies are not real content. + if full == "…" || full == "draft" { + return; + } + + let mut map = registry.lock().await; + let Some(tx) = map.get(key) else { + return; + }; + + match reply.command.as_deref() { + // Partial: an in-progress edit carrying the full accumulated text. + Some("edit_message") => { + if tx.send(ReplyChunk::Snapshot(full)).is_err() { + map.remove(key); // SSE side hung up + } + } + // Final: the turn's last message (command = None). + None => { + let _ = tx.send(ReplyChunk::Snapshot(full)); + let _ = tx.send(ReplyChunk::Done); + map.remove(key); + } + // Other commands (reactions, create_topic, …) are irrelevant to the chat stream. + _ => {} + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn config_disabled_by_default() { + assert!(VtuberConfig::from_reader(|_| None).is_none()); + } + + #[test] + fn config_enabled_with_defaults() { + let cfg = VtuberConfig::from_reader(|k| match k { + "VTUBER_ENABLED" => Some("true".into()), + "VTUBER_AUTH_KEY" => Some("secret".into()), + _ => None, + }) + .expect("enabled"); + assert_eq!(cfg.auth_key.as_deref(), Some("secret")); + assert_eq!(cfg.default_model, "openab"); + } + + #[test] + fn flatten_labels_roles_and_skips_empty() { + let msgs = vec![ + ChatMessage { + role: "system".into(), + content: "be 小光".into(), + }, + ChatMessage { + role: "user".into(), + content: " ".into(), + }, + ChatMessage { + role: "user".into(), + content: "hi".into(), + }, + ]; + assert_eq!(flatten_messages(&msgs), "System: be 小光\n\nUser: hi"); + } + + #[test] + fn delta_suffix_emits_only_new_text() { + let (d1, n1) = delta_suffix("Hello", 0); + assert_eq!((d1.as_str(), n1), ("Hello", 5)); + let (d2, n2) = delta_suffix("Hello world", n1); + assert_eq!((d2.as_str(), n2), (" world", 11)); + let (d3, _) = delta_suffix("Hello world", n2); + assert_eq!(d3, ""); + } + + #[test] + fn delta_suffix_handles_multibyte() { + let (d1, n1) = delta_suffix("小光", 0); + assert_eq!(d1, "小光"); + let (d2, _) = delta_suffix("小光你好", n1); + assert_eq!(d2, "你好"); + } +} diff --git a/crates/openab-gateway/src/lib.rs b/crates/openab-gateway/src/lib.rs index 8a8e93a93..f11fbb481 100644 --- a/crates/openab-gateway/src/lib.rs +++ b/crates/openab-gateway/src/lib.rs @@ -39,6 +39,12 @@ pub struct AppState { pub google_chat: Option, #[cfg(feature = "wecom")] pub wecom: Option, + #[cfg(feature = "vtuber")] + pub vtuber: Option, + /// In-flight OpenAI-compatible requests awaiting their streamed reply, + /// keyed by the per-request `channel.id`. See `adapters::vtuber`. + #[cfg(feature = "vtuber")] + pub vtuber_pending: adapters::vtuber::ReplyRegistry, pub ws_token: Option, pub event_tx: broadcast::Sender, pub reply_token_cache: ReplyTokenCache, @@ -118,6 +124,10 @@ impl AppState { let wecom = adapters::wecom::WecomConfig::from_env() .map(adapters::wecom::WecomAdapter::new); + // VTuber (OpenAI-compatible) + #[cfg(feature = "vtuber")] + let vtuber = adapters::vtuber::VtuberConfig::from_env(); + let client = reqwest::Client::builder() .timeout(std::time::Duration::from_secs(30)) .build() @@ -138,6 +148,10 @@ impl AppState { google_chat, #[cfg(feature = "wecom")] wecom, + #[cfg(feature = "vtuber")] + vtuber, + #[cfg(feature = "vtuber")] + vtuber_pending: Arc::new(Mutex::new(HashMap::new())), ws_token, event_tx, reply_token_cache: Arc::new(std::sync::Mutex::new(HashMap::new())), @@ -351,6 +365,16 @@ pub async fn serve(config: ServeConfig) -> anyhow::Result<()> { #[cfg(not(feature = "wecom"))] let wecom: Option<()> = None; + // VTuber (OpenAI-compatible) adapter + #[cfg(feature = "vtuber")] + let vtuber = adapters::vtuber::VtuberConfig::from_env(); + #[cfg(feature = "vtuber")] + if vtuber.is_some() { + let path = std::env::var("VTUBER_PATH").unwrap_or_else(|_| "/v1/chat/completions".into()); + info!(path = %path, "vtuber adapter enabled"); + app = app.route(&path, post(adapters::vtuber::chat_completions)); + } + let client = reqwest::Client::builder() .timeout(std::time::Duration::from_secs(30)) .build() @@ -371,6 +395,10 @@ pub async fn serve(config: ServeConfig) -> anyhow::Result<()> { google_chat, #[cfg(feature = "wecom")] wecom, + #[cfg(feature = "vtuber")] + vtuber, + #[cfg(feature = "vtuber")] + vtuber_pending: Arc::new(Mutex::new(HashMap::new())), ws_token, event_tx, reply_token_cache, @@ -578,6 +606,14 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: warn!("reply for wecom but adapter not configured"); } } + #[cfg(feature = "vtuber")] + "vtuber" => { + adapters::vtuber::handle_reply( + &reply, + &state_for_recv.vtuber_pending, + ) + .await; + } other => warn!(platform = other, "unknown reply platform"), } } diff --git a/docs/vtuber.md b/docs/vtuber.md new file mode 100644 index 000000000..466a9d5a9 --- /dev/null +++ b/docs/vtuber.md @@ -0,0 +1,133 @@ +# VTuber (OpenAI-compatible) Setup + +Expose an **OpenAI-compatible `/v1/chat/completions` (SSE)** endpoint backed by your +OAB agent, so any character "skin" that already speaks OpenAI chat completions +(AniCompanion, Open-LLM-VTuber, …) gets a real agent — tool use, code, MCP, memory — +with **zero client changes**. + +``` +Skin ──POST /v1/chat/completions (SSE)──▶ Gateway (:8080) ◀──WebSocket── OAB Pod + choices[].delta.content (incl. inline [emotion] tags) (OAB connects out) +``` + +Unlike the chat-platform adapters (LINE, Telegram, …) this is **not a webhook**: the +skin holds an HTTP request open and the reply streams back on the same connection. + +## Prerequisites + +- A running OAB instance (with kiro-cli or any ACP agent authenticated), **connected + to the gateway** — the gateway has no embedded model; without a connected agent the + endpoint returns nothing. +- The Custom Gateway deployed ([gateway/README.md](../gateway/README.md)). +- A skin that supports an OpenAI-compatible backend (e.g. AniCompanion → Settings → + Agent backend → OpenAI-compatible). + +## 1. Configure the Gateway + +```bash +# Docker +docker run -d --name openab-gateway \ + -e VTUBER_ENABLED="true" \ + -e VTUBER_AUTH_KEY="$(openssl rand -hex 32)" \ + -e VTUBER_DEFAULT_MODEL="openab" \ + -p 8080:8080 \ + ghcr.io/openabdev/openab-gateway:latest + +# Kubernetes +kubectl set env deployment/openab-gateway \ + VTUBER_ENABLED=true \ + VTUBER_AUTH_KEY= \ + VTUBER_DEFAULT_MODEL=openab +``` + +## 2. Configure OAB + +Point an OAB gateway connection at the `vtuber` platform. Streaming must use the +**draft** path (no thinking-placeholder) so the gateway can tell a partial edit from +the final message: + +```toml +[gateway] +url = "ws://openab-gateway:8080/ws" +platform = "vtuber" +streaming = true +streaming_placeholder = false # required: avoids the "…" placeholder ambiguity + +[agent] +command = "kiro-cli" +args = ["acp", "--trust-all-tools"] +working_dir = "/home/agent" +``` + +`streaming = false` also works — the whole reply arrives as one chunk + `[DONE]`. + +## 3. Point the Skin at the Gateway + +In the skin's OpenAI-compatible backend settings: + +- **Endpoint / Base URL**: `https://gw.yourdomain.com` (the adapter serves + `/v1/chat/completions`) +- **API Key**: the `VTUBER_AUTH_KEY` value (sent as `Authorization: Bearer `) +- **Model**: anything — the gateway routes to the connected agent regardless and echoes + the name back + +## 4. Test + +```bash +curl -N https://gw.yourdomain.com/v1/chat/completions \ + -H "Authorization: Bearer $VTUBER_AUTH_KEY" \ + -H "Content-Type: application/json" \ + -d '{"model":"openab","stream":true, + "messages":[{"role":"user","content":"hi 小光"}]}' +``` + +You should see `data: {...chat.completion.chunk...}` lines ending with `data: [DONE]`. + +## Emotion tags + +Inline `[emotion]` tags (e.g. AniCompanion's 16: `[happy] [sad] [curious] …`) are the +skin's own convention — the skin's persona/system prompt instructs the agent to emit +them, and the skin parses + strips them before TTS. **The adapter passes them through +the stream verbatim** and does no emotion handling itself. + +## Environment Variables + +| Variable | Required | Description | +|---|---|---| +| `VTUBER_ENABLED` | Yes | `true`/`1` to enable the adapter | +| `VTUBER_AUTH_KEY` | Recommended | Bearer key required on requests. If unset, the endpoint is **unauthenticated** (logged as insecure) | +| `VTUBER_DEFAULT_MODEL` | No | Model name echoed back when the request omits one (default `openab`) | +| `VTUBER_PATH` | No | Route path (default `/v1/chat/completions`) | + +## Notes & Limitations + +- **One session per request.** Each call mints a fresh agent session; the full + conversation history must be carried in `messages[]` (which OpenAI clients already do). +- **No agent connected ⇒ no output.** The request closes after a 180s idle timeout if no + reply arrives. Check that an OAB agent is connected to `/ws`. +- **Pull-only.** OpenAI chat completions cannot push agent-state animation cues or + proactive/ambient messages. Those are a planned Tier-2 WebSocket side-channel + (see the RFC), not part of this adapter. +- **Tags are not motion.** Mapping `[emotion]` → VRM expression / Live2D / VTube Studio + is the skin's job; the adapter stays motion-system-agnostic. + +## Troubleshooting + +**No response / stream hangs then closes:** +- Confirm an OAB agent is connected: check gateway logs for the `/ws` connection. +- Confirm `[gateway] platform = "vtuber"` in OAB config. + +**`401 invalid api key`:** +- The `Authorization: Bearer ` value must match `VTUBER_AUTH_KEY`. + +**Reply arrives all at once instead of streaming:** +- Set `streaming = true` and `streaming_placeholder = false` in OAB's `[gateway]` block. + +**Duplicated/garbled streaming text:** +- Ensure only one OAB agent is connected for the `vtuber` platform; multiple agents + reply on the same request id. + +## References + +- [ADR: Custom Gateway](adr/custom-gateway.md) +- [RFC: VTuber adapter](https://github.com/openabdev/openab/issues/1233) From acd3e38b68f5efb55d330ce3b938eb21255220a5 Mon Sep 17 00:00:00 2001 From: Can Yu Date: Mon, 29 Jun 2026 03:50:11 +0800 Subject: [PATCH 02/10] feat(gateway): add vtuber Tier-2 WebSocket event stream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tier-2 adds GET /v1/vtuber/ws — a persistent WebSocket that pushes agent_state, emotion, and notification events derived from GatewayReply commands. VTuber skins connect once and receive real-time state updates (thinking/working/idle, tool usage, emotion tags) without polling. Co-Authored-By: Claude Opus 4.6 Claude-Session: https://claude.ai/code/session_01NXngQyvmJwQPNYsiRUNU2m --- crates/openab-gateway/src/adapters/line.rs | 2 + crates/openab-gateway/src/adapters/teams.rs | 2 + crates/openab-gateway/src/adapters/vtuber.rs | 418 ++++++++++++++++++- crates/openab-gateway/src/lib.rs | 16 +- 4 files changed, 421 insertions(+), 17 deletions(-) diff --git a/crates/openab-gateway/src/adapters/line.rs b/crates/openab-gateway/src/adapters/line.rs index 403f0c482..5c6ddfbcb 100644 --- a/crates/openab-gateway/src/adapters/line.rs +++ b/crates/openab-gateway/src/adapters/line.rs @@ -1161,6 +1161,8 @@ mod tests { vtuber: None, #[cfg(feature = "vtuber")] vtuber_pending: Arc::new(Mutex::new(HashMap::new())), + #[cfg(feature = "vtuber")] + vtuber_ws_clients: None, ws_token: None, event_tx, reply_token_cache: Arc::new(std::sync::Mutex::new(HashMap::new())), diff --git a/crates/openab-gateway/src/adapters/teams.rs b/crates/openab-gateway/src/adapters/teams.rs index 6a1d232c7..db27c855c 100644 --- a/crates/openab-gateway/src/adapters/teams.rs +++ b/crates/openab-gateway/src/adapters/teams.rs @@ -666,6 +666,8 @@ mod tests { vtuber: None, #[cfg(feature = "vtuber")] vtuber_pending: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())), + #[cfg(feature = "vtuber")] + vtuber_ws_clients: None, ws_token: None, event_tx, reply_token_cache: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())), diff --git a/crates/openab-gateway/src/adapters/vtuber.rs b/crates/openab-gateway/src/adapters/vtuber.rs index 9de819b14..0bdf50601 100644 --- a/crates/openab-gateway/src/adapters/vtuber.rs +++ b/crates/openab-gateway/src/adapters/vtuber.rs @@ -1,29 +1,24 @@ -//! VTuber adapter — an OpenAI-compatible `/v1/chat/completions` (SSE) front door -//! backed by an OAB ACP agent. +//! VTuber platform adapter. //! -//! Any "skin" that already speaks OpenAI chat completions (AniCompanion, -//! Open-LLM-VTuber, …) points at this endpoint and gets a real agent with zero -//! client changes. Inline `[emotion]` tags emitted by the agent pass through the -//! stream untouched; the skin parses + maps them to its own motion system. -//! -//! The gateway has no embedded agent — it relays events to an OAB process over -//! `/ws` and receives replies asynchronously on a different task. So an in-flight -//! HTTP request parks an `mpsc` sender in [`ReplyRegistry`] keyed by a per-request -//! `channel.id`; [`handle_reply`] (called from the `/ws` recv loop) feeds reply -//! snapshots back into it, and the SSE stream re-emits them as OpenAI deltas. +//! Tier-1: OpenAI-compatible POST /v1/chat/completions (SSE) — streams agent +//! replies as `chat.completion.chunk` deltas. +//! Tier-2: WebSocket /v1/vtuber/ws — pushes agent_state, tool_status, emotion, +//! and notification events derived from GatewayReply commands. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::convert::Infallible; use std::sync::Arc; use std::time::Duration; -use axum::extract::State; +use axum::extract::ws::{Message, WebSocket}; +use axum::extract::{Query, State, WebSocketUpgrade}; use axum::http::{header::AUTHORIZATION, HeaderMap, StatusCode}; use axum::response::sse::{Event, KeepAlive, Sse}; use axum::response::{IntoResponse, Response}; use axum::Json; use futures_util::stream::Stream; -use serde::Deserialize; +use futures_util::{SinkExt, StreamExt}; +use serde::{Deserialize, Serialize}; use serde_json::json; use tokio::sync::{mpsc, Mutex}; use tracing::{info, warn}; @@ -364,6 +359,300 @@ pub async fn handle_reply(reply: &GatewayReply, registry: &ReplyRegistry) { } } +// --------------------------------------------------------------------------- +// Tier-2 WS event types +// --------------------------------------------------------------------------- + +#[derive(Clone, Debug, Serialize)] +#[serde(tag = "type")] +pub enum WsEvent { + #[serde(rename = "agent_state")] + AgentState { + ts: i64, + state: &'static str, + session_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + detail: Option, + }, + #[serde(rename = "emotion")] + Emotion { + ts: i64, + session_id: String, + tag: String, + intensity: f32, + }, + #[serde(rename = "notification")] + Notification { + ts: i64, + text: String, + urgency: &'static str, + }, + #[serde(rename = "pong")] + Pong { ts: i64 }, +} + +#[derive(Clone, Debug, Serialize)] +pub struct AgentStateDetail { + #[serde(skip_serializing_if = "Option::is_none")] + pub tool_class: Option<&'static str>, + #[serde(skip_serializing_if = "Option::is_none")] + pub subagent_count: Option, +} + +#[derive(Debug, Deserialize)] +#[serde(tag = "type")] +enum WsCommand { + #[serde(rename = "subscribe")] + Subscribe { events: Vec }, + #[serde(rename = "ping")] + Ping, +} + +// --------------------------------------------------------------------------- +// Connected WS clients registry +// --------------------------------------------------------------------------- + +pub struct WsClient { + pub tx: mpsc::UnboundedSender, + pub subscribed: Option>, +} + +pub type WsClients = Arc>>; + +pub fn new_ws_clients() -> WsClients { + Arc::new(Mutex::new(Vec::new())) +} + +// --------------------------------------------------------------------------- +// Reaction emoji → agent_state mapping +// --------------------------------------------------------------------------- + +struct ReactionMapping { + state: &'static str, + tool_class: Option<&'static str>, +} + +fn reaction_to_state(emoji: &str) -> Option { + let first = emoji.chars().next()?; + Some(match first { + '👀' => ReactionMapping { state: "thinking", tool_class: None }, + '🤔' => ReactionMapping { state: "thinking", tool_class: None }, + '🔥' => ReactionMapping { state: "working", tool_class: Some("tool") }, + '👨' => ReactionMapping { state: "working", tool_class: Some("coding") }, + '⚡' => ReactionMapping { state: "working", tool_class: Some("web") }, + '🆗' => ReactionMapping { state: "attention", tool_class: None }, + '😱' => ReactionMapping { state: "error", tool_class: None }, + '🥱' => ReactionMapping { state: "error", tool_class: None }, + '😨' => ReactionMapping { state: "error", tool_class: None }, + _ => return None, + }) +} + +fn extract_emotion_tags(text: &str) -> Vec { + let mut tags = Vec::new(); + let mut rest = text; + while let Some(start) = rest.find('[') { + if let Some(end) = rest[start..].find(']') { + let tag = &rest[start + 1..start + end]; + if !tag.is_empty() && tag.len() < 30 && tag.chars().all(|c| c.is_alphanumeric() || c == '_') { + tags.push(tag.to_string()); + } + rest = &rest[start + end + 1..]; + } else { + break; + } + } + tags +} + +// --------------------------------------------------------------------------- +// Derive Tier-2 WS events from a GatewayReply +// --------------------------------------------------------------------------- + +pub fn derive_events(reply: &GatewayReply) -> Vec { + let ts = chrono::Utc::now().timestamp(); + let session_id = reply.channel.id.clone(); + let mut events = Vec::new(); + + match reply.command.as_deref() { + Some("add_reaction") => { + if let Some(mapping) = reaction_to_state(&reply.content.text) { + events.push(WsEvent::AgentState { + ts, + state: mapping.state, + session_id, + detail: mapping.tool_class.map(|tc| AgentStateDetail { + tool_class: Some(tc), + subagent_count: None, + }), + }); + } + } + Some("remove_reaction") => {} + Some("edit_message") => { + for tag in extract_emotion_tags(&reply.content.text) { + events.push(WsEvent::Emotion { + ts, + session_id: session_id.clone(), + tag, + intensity: 1.0, + }); + } + } + None | Some("send_message") => { + for tag in extract_emotion_tags(&reply.content.text) { + events.push(WsEvent::Emotion { + ts, + session_id: session_id.clone(), + tag, + intensity: 1.0, + }); + } + events.push(WsEvent::AgentState { + ts, + state: "idle", + session_id, + detail: None, + }); + } + _ => {} + } + + events +} + +// --------------------------------------------------------------------------- +// Broadcast events to connected WS clients +// --------------------------------------------------------------------------- + +pub async fn broadcast(clients: &WsClients, events: &[WsEvent]) { + if events.is_empty() { + return; + } + let mut dead = Vec::new(); + let mut guard = clients.lock().await; + for (i, client) in guard.iter().enumerate() { + for event in events { + let event_type = match event { + WsEvent::AgentState { .. } => "agent_state", + WsEvent::Emotion { .. } => "emotion", + WsEvent::Notification { .. } => "notification", + WsEvent::Pong { .. } => continue, + }; + if let Some(ref subs) = client.subscribed { + if !subs.contains(event_type) { + continue; + } + } + if let Ok(json) = serde_json::to_string(event) { + if client.tx.send(json).is_err() { + dead.push(i); + break; + } + } + } + } + for i in dead.into_iter().rev() { + guard.swap_remove(i); + } +} + +// --------------------------------------------------------------------------- +// WS upgrade handler: GET /v1/vtuber/ws +// --------------------------------------------------------------------------- + +pub async fn ws_upgrade( + State(state): State>, + query: Query>, + headers: HeaderMap, + ws: WebSocketUpgrade, +) -> Response { + let token = headers + .get("authorization") + .and_then(|v| v.to_str().ok()) + .and_then(|v| v.strip_prefix("Bearer ")) + .or_else(|| query.get("token").map(|s| s.as_str())); + + let expected = state.vtuber.as_ref().and_then(|c| c.auth_key.as_ref()); + if let Some(expected) = expected { + if token != Some(expected.as_str()) { + warn!("vtuber WS rejected: invalid or missing token"); + return StatusCode::UNAUTHORIZED.into_response(); + } + } + + ws.on_upgrade(move |socket| handle_ws(state, socket)) +} + +async fn handle_ws(state: Arc, socket: WebSocket) { + let (mut ws_tx, mut ws_rx) = socket.split(); + let (tx, mut rx) = mpsc::unbounded_channel::(); + + info!("vtuber WS client connected"); + + let clients = match &state.vtuber_ws_clients { + Some(c) => c.clone(), + None => return, + }; + let client_idx = { + let mut guard = clients.lock().await; + guard.push(WsClient { tx, subscribed: None }); + guard.len() - 1 + }; + + let send_task = tokio::spawn(async move { + while let Some(msg) = rx.recv().await { + if ws_tx.send(Message::Text(msg.into())).await.is_err() { + break; + } + } + }); + + let clients_for_recv = clients.clone(); + let recv_task = tokio::spawn(async move { + while let Some(Ok(msg)) = ws_rx.next().await { + if let Message::Text(text) = msg { + match serde_json::from_str::(&text) { + Ok(WsCommand::Subscribe { events }) => { + let mut guard = clients_for_recv.lock().await; + let idx = client_idx.min(guard.len().saturating_sub(1)); + if let Some(client) = guard.get_mut(idx) { + client.subscribed = Some(events.into_iter().collect()); + info!(events = ?client.subscribed, "vtuber WS subscribe updated"); + } + } + Ok(WsCommand::Ping) => { + let pong = WsEvent::Pong { + ts: chrono::Utc::now().timestamp(), + }; + if let Ok(json) = serde_json::to_string(&pong) { + let guard = clients_for_recv.lock().await; + let idx = client_idx.min(guard.len().saturating_sub(1)); + if let Some(client) = guard.get(idx) { + let _ = client.tx.send(json); + } + } + } + Err(_) => { + warn!(raw = %text, "vtuber WS unknown command"); + } + } + } + } + }); + + tokio::select! { + _ = send_task => {}, + _ = recv_task => {}, + } + + let mut guard = clients.lock().await; + if client_idx < guard.len() { + guard.swap_remove(client_idx); + } + info!("vtuber WS client disconnected"); +} + #[cfg(test)] mod tests { use super::*; @@ -421,4 +710,103 @@ mod tests { let (d2, _) = delta_suffix("小光你好", n1); assert_eq!(d2, "你好"); } + + // --- Tier-2 unit tests --- + + #[test] + fn reaction_mapping_covers_all_oab_emojis() { + assert_eq!(reaction_to_state("👀").unwrap().state, "thinking"); + assert_eq!(reaction_to_state("🤔").unwrap().state, "thinking"); + assert_eq!(reaction_to_state("🔥").unwrap().state, "working"); + assert_eq!(reaction_to_state("👨\u{200d}💻").unwrap().state, "working"); + assert_eq!(reaction_to_state("👨\u{200d}💻").unwrap().tool_class, Some("coding")); + assert_eq!(reaction_to_state("⚡").unwrap().state, "working"); + assert_eq!(reaction_to_state("⚡").unwrap().tool_class, Some("web")); + assert_eq!(reaction_to_state("🆗").unwrap().state, "attention"); + assert_eq!(reaction_to_state("😱").unwrap().state, "error"); + assert_eq!(reaction_to_state("🥱").unwrap().state, "error"); + assert_eq!(reaction_to_state("😨").unwrap().state, "error"); + assert!(reaction_to_state("😊").is_none()); + } + + #[test] + fn extract_emotion_tags_basic() { + assert_eq!(extract_emotion_tags("Hello [happy] world"), vec!["happy"]); + assert_eq!( + extract_emotion_tags("[excited] Hi [joy] there"), + vec!["excited", "joy"] + ); + assert!(extract_emotion_tags("no tags here").is_empty()); + assert!(extract_emotion_tags("[with spaces]").is_empty()); + assert!(extract_emotion_tags("[]").is_empty()); + } + + #[test] + fn derive_events_add_reaction() { + use crate::schema::{Content, ReplyChannel}; + let reply = GatewayReply { + schema: "openab.gateway.reply.v1".into(), + reply_to: "evt_1".into(), + platform: "vtuber".into(), + channel: ReplyChannel { id: "ch_1".into(), thread_id: None }, + content: Content { content_type: "text".into(), text: "🤔".into(), attachments: vec![] }, + command: Some("add_reaction".into()), + request_id: None, + quote_message_id: None, + }; + let events = derive_events(&reply); + assert_eq!(events.len(), 1); + match &events[0] { + WsEvent::AgentState { state, session_id, .. } => { + assert_eq!(*state, "thinking"); + assert_eq!(session_id, "ch_1"); + } + _ => panic!("expected AgentState"), + } + } + + #[test] + fn derive_events_edit_message_with_emotion() { + use crate::schema::{Content, ReplyChannel}; + let reply = GatewayReply { + schema: "openab.gateway.reply.v1".into(), + reply_to: "evt_1".into(), + platform: "vtuber".into(), + channel: ReplyChannel { id: "ch_1".into(), thread_id: None }, + content: Content { content_type: "text".into(), text: "[excited] Hello!".into(), attachments: vec![] }, + command: Some("edit_message".into()), + request_id: Some("req_1".into()), + quote_message_id: None, + }; + let events = derive_events(&reply); + assert_eq!(events.len(), 1); + match &events[0] { + WsEvent::Emotion { tag, intensity, .. } => { + assert_eq!(tag, "excited"); + assert_eq!(*intensity, 1.0); + } + _ => panic!("expected Emotion"), + } + } + + #[test] + fn derive_events_send_message_idle() { + use crate::schema::{Content, ReplyChannel}; + let reply = GatewayReply { + schema: "openab.gateway.reply.v1".into(), + reply_to: "evt_1".into(), + platform: "vtuber".into(), + channel: ReplyChannel { id: "ch_1".into(), thread_id: None }, + content: Content { content_type: "text".into(), text: "Done!".into(), attachments: vec![] }, + command: None, + request_id: None, + quote_message_id: None, + }; + let events = derive_events(&reply); + assert_eq!(events.len(), 1); + match &events[0] { + WsEvent::AgentState { state, .. } => assert_eq!(*state, "idle"), + _ => panic!("expected AgentState idle"), + } + } } diff --git a/crates/openab-gateway/src/lib.rs b/crates/openab-gateway/src/lib.rs index f11fbb481..79f04d3ca 100644 --- a/crates/openab-gateway/src/lib.rs +++ b/crates/openab-gateway/src/lib.rs @@ -45,6 +45,8 @@ pub struct AppState { /// keyed by the per-request `channel.id`. See `adapters::vtuber`. #[cfg(feature = "vtuber")] pub vtuber_pending: adapters::vtuber::ReplyRegistry, + #[cfg(feature = "vtuber")] + pub vtuber_ws_clients: Option, pub ws_token: Option, pub event_tx: broadcast::Sender, pub reply_token_cache: ReplyTokenCache, @@ -152,6 +154,8 @@ impl AppState { vtuber, #[cfg(feature = "vtuber")] vtuber_pending: Arc::new(Mutex::new(HashMap::new())), + #[cfg(feature = "vtuber")] + vtuber_ws_clients: Some(adapters::vtuber::new_ws_clients()), ws_token, event_tx, reply_token_cache: Arc::new(std::sync::Mutex::new(HashMap::new())), @@ -371,8 +375,10 @@ pub async fn serve(config: ServeConfig) -> anyhow::Result<()> { #[cfg(feature = "vtuber")] if vtuber.is_some() { let path = std::env::var("VTUBER_PATH").unwrap_or_else(|_| "/v1/chat/completions".into()); - info!(path = %path, "vtuber adapter enabled"); - app = app.route(&path, post(adapters::vtuber::chat_completions)); + info!("vtuber adapter enabled (Tier-1 at {path}, Tier-2 WS at /v1/vtuber/ws)"); + app = app + .route(&path, post(adapters::vtuber::chat_completions)) + .route("/v1/vtuber/ws", get(adapters::vtuber::ws_upgrade)); } let client = reqwest::Client::builder() @@ -399,6 +405,8 @@ pub async fn serve(config: ServeConfig) -> anyhow::Result<()> { vtuber, #[cfg(feature = "vtuber")] vtuber_pending: Arc::new(Mutex::new(HashMap::new())), + #[cfg(feature = "vtuber")] + vtuber_ws_clients: Some(adapters::vtuber::new_ws_clients()), ws_token, event_tx, reply_token_cache, @@ -613,6 +621,10 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: &state_for_recv.vtuber_pending, ) .await; + if let Some(ref ws_clients) = state_for_recv.vtuber_ws_clients { + let events = adapters::vtuber::derive_events(&reply); + adapters::vtuber::broadcast(ws_clients, &events).await; + } } other => warn!(platform = other, "unknown reply platform"), } From c154dc162a878f5c87a2eba3719cdccf4b5e3c2e Mon Sep 17 00:00:00 2001 From: Can Yu Date: Mon, 29 Jun 2026 09:28:34 +0800 Subject: [PATCH 03/10] fix(gateway): use keyed HashMap for vtuber WS client registry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace Vec with HashMap and an AtomicU64 counter. The old Vec+index scheme broke when broadcast() called swap_remove on dead clients — surviving clients' stored indices became stale, routing subscribe/pong to the wrong connection. Co-Authored-By: Claude Opus 4.6 Claude-Session: https://claude.ai/code/session_01NXngQyvmJwQPNYsiRUNU2m --- crates/openab-gateway/src/adapters/vtuber.rs | 37 +++++++++----------- 1 file changed, 16 insertions(+), 21 deletions(-) diff --git a/crates/openab-gateway/src/adapters/vtuber.rs b/crates/openab-gateway/src/adapters/vtuber.rs index 0bdf50601..2d46e6089 100644 --- a/crates/openab-gateway/src/adapters/vtuber.rs +++ b/crates/openab-gateway/src/adapters/vtuber.rs @@ -7,6 +7,7 @@ use std::collections::{HashMap, HashSet}; use std::convert::Infallible; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -417,10 +418,12 @@ pub struct WsClient { pub subscribed: Option>, } -pub type WsClients = Arc>>; +static NEXT_CLIENT_ID: AtomicU64 = AtomicU64::new(0); + +pub type WsClients = Arc>>; pub fn new_ws_clients() -> WsClients { - Arc::new(Mutex::new(Vec::new())) + Arc::new(Mutex::new(HashMap::new())) } // --------------------------------------------------------------------------- @@ -531,7 +534,7 @@ pub async fn broadcast(clients: &WsClients, events: &[WsEvent]) { } let mut dead = Vec::new(); let mut guard = clients.lock().await; - for (i, client) in guard.iter().enumerate() { + for (&id, client) in guard.iter() { for event in events { let event_type = match event { WsEvent::AgentState { .. } => "agent_state", @@ -546,14 +549,14 @@ pub async fn broadcast(clients: &WsClients, events: &[WsEvent]) { } if let Ok(json) = serde_json::to_string(event) { if client.tx.send(json).is_err() { - dead.push(i); + dead.push(id); break; } } } } - for i in dead.into_iter().rev() { - guard.swap_remove(i); + for id in dead { + guard.remove(&id); } } @@ -588,17 +591,14 @@ async fn handle_ws(state: Arc, socket: WebSocket) { let (mut ws_tx, mut ws_rx) = socket.split(); let (tx, mut rx) = mpsc::unbounded_channel::(); - info!("vtuber WS client connected"); + let client_id = NEXT_CLIENT_ID.fetch_add(1, Ordering::Relaxed); + info!(client_id, "vtuber WS client connected"); let clients = match &state.vtuber_ws_clients { Some(c) => c.clone(), None => return, }; - let client_idx = { - let mut guard = clients.lock().await; - guard.push(WsClient { tx, subscribed: None }); - guard.len() - 1 - }; + clients.lock().await.insert(client_id, WsClient { tx, subscribed: None }); let send_task = tokio::spawn(async move { while let Some(msg) = rx.recv().await { @@ -615,8 +615,7 @@ async fn handle_ws(state: Arc, socket: WebSocket) { match serde_json::from_str::(&text) { Ok(WsCommand::Subscribe { events }) => { let mut guard = clients_for_recv.lock().await; - let idx = client_idx.min(guard.len().saturating_sub(1)); - if let Some(client) = guard.get_mut(idx) { + if let Some(client) = guard.get_mut(&client_id) { client.subscribed = Some(events.into_iter().collect()); info!(events = ?client.subscribed, "vtuber WS subscribe updated"); } @@ -627,8 +626,7 @@ async fn handle_ws(state: Arc, socket: WebSocket) { }; if let Ok(json) = serde_json::to_string(&pong) { let guard = clients_for_recv.lock().await; - let idx = client_idx.min(guard.len().saturating_sub(1)); - if let Some(client) = guard.get(idx) { + if let Some(client) = guard.get(&client_id) { let _ = client.tx.send(json); } } @@ -646,11 +644,8 @@ async fn handle_ws(state: Arc, socket: WebSocket) { _ = recv_task => {}, } - let mut guard = clients.lock().await; - if client_idx < guard.len() { - guard.swap_remove(client_idx); - } - info!("vtuber WS client disconnected"); + clients.lock().await.remove(&client_id); + info!(client_id, "vtuber WS client disconnected"); } #[cfg(test)] From 50c252ecd07d8057a2098d205558f0181db17920 Mon Sep 17 00:00:00 2001 From: Can Yu Date: Mon, 29 Jun 2026 10:18:04 +0800 Subject: [PATCH 04/10] fix(gateway): add vtuber rate limit and early idle warning MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit F2: Cap in-flight /v1/chat/completions at 32 by checking vtuber_pending size before accepting — returns 429 when full. F3: Emit SSE comment `: waiting for agent` after 10s of silence, giving clients an early signal before the 180s hard timeout. Co-Authored-By: Claude Opus 4.6 Claude-Session: https://claude.ai/code/session_01NXngQyvmJwQPNYsiRUNU2m --- crates/openab-gateway/src/adapters/vtuber.rs | 61 ++++++++++++-------- 1 file changed, 38 insertions(+), 23 deletions(-) diff --git a/crates/openab-gateway/src/adapters/vtuber.rs b/crates/openab-gateway/src/adapters/vtuber.rs index 2d46e6089..7d99d33c3 100644 --- a/crates/openab-gateway/src/adapters/vtuber.rs +++ b/crates/openab-gateway/src/adapters/vtuber.rs @@ -42,6 +42,8 @@ pub type ReplyRegistry = Arc= VTUBER_MAX_INFLIGHT { + return (StatusCode::TOO_MANY_REQUESTS, "too many in-flight requests").into_response(); + } + let prompt = flatten_messages(&req.messages); if prompt.trim().is_empty() { return ( @@ -244,6 +250,7 @@ struct StreamState { rx: mpsc::UnboundedReceiver, sent_len: usize, phase: u8, // 0=role, 1=stream snapshots, 2=finish, 3=[DONE], 4=end + warned: bool, id: String, created: i64, model: String, @@ -263,6 +270,7 @@ fn reply_stream( rx, sent_len: 0, phase: 0, + warned: false, id: format!("chatcmpl-{}", Uuid::new_v4()), created: chrono::Utc::now().timestamp(), model, @@ -284,32 +292,39 @@ fn reply_stream( ); return Some((Ok(ev), s)); } - 1 => match tokio::time::timeout(REPLY_IDLE_TIMEOUT, s.rx.recv()).await { - Ok(Some(ReplyChunk::Snapshot(full))) => { - let (delta, new_len) = delta_suffix(&full, s.sent_len); - if delta.is_empty() { + 1 => { + let wait = if s.warned { REPLY_IDLE_TIMEOUT } else { REPLY_AGENT_WAIT }; + match tokio::time::timeout(wait, s.rx.recv()).await { + Ok(Some(ReplyChunk::Snapshot(full))) => { + let (delta, new_len) = delta_suffix(&full, s.sent_len); + if delta.is_empty() { + continue; + } + s.sent_len = new_len; + let ev = chunk_event( + &s.id, + s.created, + &s.model, + json!({ "content": delta }), + None, + ); + return Some((Ok(ev), s)); + } + Ok(Some(ReplyChunk::Done)) | Ok(None) => { + s.phase = 2; + continue; + } + Err(_) if !s.warned => { + s.warned = true; + return Some((Ok(Event::default().comment("waiting for agent")), s)); + } + Err(_) => { + warn!(channel = %s.channel_id, "vtuber: reply timed out (no agent connected?)"); + s.phase = 2; continue; } - s.sent_len = new_len; - let ev = chunk_event( - &s.id, - s.created, - &s.model, - json!({ "content": delta }), - None, - ); - return Some((Ok(ev), s)); - } - Ok(Some(ReplyChunk::Done)) | Ok(None) => { - s.phase = 2; - continue; - } - Err(_) => { - warn!(channel = %s.channel_id, "vtuber: reply timed out (no agent connected?)"); - s.phase = 2; - continue; } - }, + } 2 => { s.phase = 3; let ev = chunk_event(&s.id, s.created, &s.model, json!({}), Some("stop")); From c1b94953013159776995674a57b20bdb27ea24a8 Mon Sep 17 00:00:00 2001 From: Can Yu Date: Mon, 29 Jun 2026 11:20:02 +0800 Subject: [PATCH 05/10] fix(gateway): satisfy clippy collapsible_match in handle_reply Co-Authored-By: Claude Opus 4.6 Claude-Session: https://claude.ai/code/session_01NXngQyvmJwQPNYsiRUNU2m --- crates/openab-gateway/src/adapters/vtuber.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/crates/openab-gateway/src/adapters/vtuber.rs b/crates/openab-gateway/src/adapters/vtuber.rs index 7d99d33c3..d6b598838 100644 --- a/crates/openab-gateway/src/adapters/vtuber.rs +++ b/crates/openab-gateway/src/adapters/vtuber.rs @@ -358,11 +358,8 @@ pub async fn handle_reply(reply: &GatewayReply, registry: &ReplyRegistry) { }; match reply.command.as_deref() { - // Partial: an in-progress edit carrying the full accumulated text. - Some("edit_message") => { - if tx.send(ReplyChunk::Snapshot(full)).is_err() { - map.remove(key); // SSE side hung up - } + Some("edit_message") if tx.send(ReplyChunk::Snapshot(full.clone())).is_err() => { + map.remove(key); } // Final: the turn's last message (command = None). None => { From 8aae3b4d9790f5f152e205460a6297d4e2ded516 Mon Sep 17 00:00:00 2001 From: Can Yu Date: Mon, 29 Jun 2026 17:29:55 +0800 Subject: [PATCH 06/10] feat(gateway): sync vtuber adapter updates --- crates/openab-gateway/src/adapters/vtuber.rs | 1457 ++++++++++++------ crates/openab-gateway/src/lib.rs | 7 + 2 files changed, 970 insertions(+), 494 deletions(-) diff --git a/crates/openab-gateway/src/adapters/vtuber.rs b/crates/openab-gateway/src/adapters/vtuber.rs index d6b598838..f6b0fd409 100644 --- a/crates/openab-gateway/src/adapters/vtuber.rs +++ b/crates/openab-gateway/src/adapters/vtuber.rs @@ -5,373 +5,25 @@ //! Tier-2: WebSocket /v1/vtuber/ws — pushes agent_state, tool_status, emotion, //! and notification events derived from GatewayReply commands. -use std::collections::{HashMap, HashSet}; -use std::convert::Infallible; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Arc; -use std::time::Duration; - +use crate::schema::*; use axum::extract::ws::{Message, WebSocket}; use axum::extract::{Query, State, WebSocketUpgrade}; use axum::http::{header::AUTHORIZATION, HeaderMap, StatusCode}; -use axum::response::sse::{Event, KeepAlive, Sse}; -use axum::response::{IntoResponse, Response}; +use axum::response::sse::{Event as SseEvent, KeepAlive, Sse}; +use axum::response::IntoResponse; use axum::Json; use futures_util::stream::Stream; use futures_util::{SinkExt, StreamExt}; use serde::{Deserialize, Serialize}; use serde_json::json; +use std::collections::{HashMap, HashSet}; +use std::convert::Infallible; +use std::sync::Arc; +use std::time::Duration; use tokio::sync::{mpsc, Mutex}; use tracing::{info, warn}; use uuid::Uuid; -use crate::schema::{ChannelInfo, GatewayEvent, GatewayReply, SenderInfo}; - -/// A piece of a streamed reply, handed from the `/ws` recv task to the SSE response. -/// -/// The agent streams *full accumulated snapshots* (not deltas) every ~1.5s, plus a -/// final snapshot; the SSE side diffs them into OpenAI content deltas. -pub enum ReplyChunk { - Snapshot(String), - Done, -} - -/// Per-request `channel.id` → the SSE response awaiting that request's reply. -pub type ReplyRegistry = Arc>>>; - -/// Max wait for the next reply snapshot before closing the stream. Guards the case -/// where no OAB agent is connected to `/ws` (otherwise the request would hang). -const REPLY_IDLE_TIMEOUT: Duration = Duration::from_secs(180); -const REPLY_AGENT_WAIT: Duration = Duration::from_secs(10); -const VTUBER_MAX_INFLIGHT: usize = 32; - -// --- Config --- - -pub struct VtuberConfig { - /// Bearer key required on inbound requests. `None` = unauthenticated (warned). - pub auth_key: Option, - /// Model name echoed back in chunks when the request omits one. - pub default_model: String, -} - -impl VtuberConfig { - pub fn from_env() -> Option { - Self::from_reader(|k| std::env::var(k).ok()) - } - - /// Build from an arbitrary reader so tests avoid `env::set_var` races under - /// cargo's parallel runner (same pattern as the other adapters). - fn from_reader Option>(read: F) -> Option { - let enabled = read("VTUBER_ENABLED") - .map(|v| v == "true" || v == "1") - .unwrap_or(false); - if !enabled { - return None; - } - let auth_key = read("VTUBER_AUTH_KEY"); - if auth_key.is_none() { - warn!("VTUBER_AUTH_KEY not set — /v1/chat/completions is UNAUTHENTICATED (insecure)"); - } - let default_model = read("VTUBER_DEFAULT_MODEL").unwrap_or_else(|| "openab".into()); - info!( - default_model = %default_model, - authenticated = auth_key.is_some(), - "vtuber adapter configured" - ); - Some(Self { - auth_key, - default_model, - }) - } -} - -// --- OpenAI request DTOs (subset we honor) --- - -#[derive(Deserialize)] -pub struct ChatMessage { - #[serde(default)] - pub role: String, - #[serde(default)] - pub content: String, -} - -#[derive(Deserialize)] -pub struct ChatRequest { - #[serde(default)] - pub model: Option, - #[serde(default)] - pub messages: Vec, - #[serde(default)] - pub stream: Option, -} - -/// Flatten OpenAI `messages[]` (incl. the skin's own persona/system prompt) into a -/// single prompt string. Each request mints a fresh session, so the full history -/// carried in `messages` is the agent's only context. -fn flatten_messages(messages: &[ChatMessage]) -> String { - let mut out = String::new(); - for m in messages { - if m.content.trim().is_empty() { - continue; - } - let label = match m.role.as_str() { - "system" => "System", - "assistant" => "Assistant", - "user" | "" => "User", - other => other, - }; - if !out.is_empty() { - out.push_str("\n\n"); - } - out.push_str(label); - out.push_str(": "); - out.push_str(&m.content); - } - out -} - -/// Newly-appended suffix of `full` beyond `sent_len`, and the new sent length. -/// Snapshots grow monotonically; if the prefix ever changed (shouldn't happen) we -/// resend the whole string rather than panic on a non-char-boundary slice. -fn delta_suffix(full: &str, sent_len: usize) -> (String, usize) { - match full.get(sent_len..) { - Some(suffix) => (suffix.to_string(), full.len()), - None => (full.to_string(), full.len()), - } -} - -// --- HTTP handler --- - -pub async fn chat_completions( - State(state): State>, - headers: HeaderMap, - Json(req): Json, -) -> Response { - let Some(cfg) = state.vtuber.as_ref() else { - return ( - StatusCode::SERVICE_UNAVAILABLE, - "vtuber adapter not configured", - ) - .into_response(); - }; - - // Bearer auth - if let Some(expected) = cfg.auth_key.as_ref() { - let provided = headers - .get(AUTHORIZATION) - .and_then(|v| v.to_str().ok()) - .and_then(|s| s.strip_prefix("Bearer ")); - if provided != Some(expected.as_str()) { - return (StatusCode::UNAUTHORIZED, "invalid api key").into_response(); - } - } - - if state.vtuber_pending.lock().await.len() >= VTUBER_MAX_INFLIGHT { - return (StatusCode::TOO_MANY_REQUESTS, "too many in-flight requests").into_response(); - } - - let prompt = flatten_messages(&req.messages); - if prompt.trim().is_empty() { - return ( - StatusCode::BAD_REQUEST, - "messages must contain non-empty content", - ) - .into_response(); - } - let model = req - .model - .clone() - .unwrap_or_else(|| cfg.default_model.clone()); - - // Per-request session id. The reply's `channel.id` echoes this, routing the - // agent's reply chunks back to this exact request. - let channel_id = format!("vtb_{}", Uuid::new_v4()); - let (tx, rx) = mpsc::unbounded_channel::(); - state - .vtuber_pending - .lock() - .await - .insert(channel_id.clone(), tx); - - let event = GatewayEvent::new( - "vtuber", - ChannelInfo { - id: channel_id.clone(), - channel_type: "dm".into(), - thread_id: None, - }, - SenderInfo { - id: "vtuber".into(), - name: "vtuber".into(), - display_name: "VTuber".into(), - is_bot: false, - }, - &prompt, - &format!("vtbmsg_{}", Uuid::new_v4()), - Vec::new(), - ); - match serde_json::to_string(&event) { - Ok(json) => { - let _ = state.event_tx.send(json); - } - Err(e) => { - state.vtuber_pending.lock().await.remove(&channel_id); - warn!("vtuber: failed to serialize event: {e}"); - return (StatusCode::INTERNAL_SERVER_ERROR, "internal error").into_response(); - } - } - info!(channel = %channel_id, "vtuber: chat request dispatched"); - - let stream = reply_stream(rx, model, channel_id, state.vtuber_pending.clone()); - Sse::new(stream) - .keep_alive(KeepAlive::default()) - .into_response() -} - -/// OpenAI `chat.completion.chunk` SSE event. -fn chunk_event( - id: &str, - created: i64, - model: &str, - delta: serde_json::Value, - finish: Option<&str>, -) -> Event { - let payload = json!({ - "id": id, - "object": "chat.completion.chunk", - "created": created, - "model": model, - "choices": [{ "index": 0, "delta": delta, "finish_reason": finish }], - }); - Event::default().data(payload.to_string()) -} - -struct StreamState { - rx: mpsc::UnboundedReceiver, - sent_len: usize, - phase: u8, // 0=role, 1=stream snapshots, 2=finish, 3=[DONE], 4=end - warned: bool, - id: String, - created: i64, - model: String, - channel_id: String, - registry: ReplyRegistry, -} - -/// Turn the reply-chunk receiver into an OpenAI SSE stream: -/// `role` chunk → content deltas → `finish_reason:"stop"` → `data: [DONE]`. -fn reply_stream( - rx: mpsc::UnboundedReceiver, - model: String, - channel_id: String, - registry: ReplyRegistry, -) -> impl Stream> { - let init = StreamState { - rx, - sent_len: 0, - phase: 0, - warned: false, - id: format!("chatcmpl-{}", Uuid::new_v4()), - created: chrono::Utc::now().timestamp(), - model, - channel_id, - registry, - }; - - futures_util::stream::unfold(init, |mut s| async move { - loop { - match s.phase { - 0 => { - s.phase = 1; - let ev = chunk_event( - &s.id, - s.created, - &s.model, - json!({ "role": "assistant" }), - None, - ); - return Some((Ok(ev), s)); - } - 1 => { - let wait = if s.warned { REPLY_IDLE_TIMEOUT } else { REPLY_AGENT_WAIT }; - match tokio::time::timeout(wait, s.rx.recv()).await { - Ok(Some(ReplyChunk::Snapshot(full))) => { - let (delta, new_len) = delta_suffix(&full, s.sent_len); - if delta.is_empty() { - continue; - } - s.sent_len = new_len; - let ev = chunk_event( - &s.id, - s.created, - &s.model, - json!({ "content": delta }), - None, - ); - return Some((Ok(ev), s)); - } - Ok(Some(ReplyChunk::Done)) | Ok(None) => { - s.phase = 2; - continue; - } - Err(_) if !s.warned => { - s.warned = true; - return Some((Ok(Event::default().comment("waiting for agent")), s)); - } - Err(_) => { - warn!(channel = %s.channel_id, "vtuber: reply timed out (no agent connected?)"); - s.phase = 2; - continue; - } - } - } - 2 => { - s.phase = 3; - let ev = chunk_event(&s.id, s.created, &s.model, json!({}), Some("stop")); - return Some((Ok(ev), s)); - } - 3 => { - s.phase = 4; - s.registry.lock().await.remove(&s.channel_id); - return Some((Ok(Event::default().data("[DONE]")), s)); - } - _ => return None, - } - } - }) -} - -// --- Reply ingestion (called from the `/ws` recv loop) --- - -/// Route a `GatewayReply` (platform = "vtuber") back to its waiting SSE response. -pub async fn handle_reply(reply: &GatewayReply, registry: &ReplyRegistry) { - let key = reply.channel.id.as_str(); - let full = reply.content.text.clone(); - // Streaming-placeholder bodies are not real content. - if full == "…" || full == "draft" { - return; - } - - let mut map = registry.lock().await; - let Some(tx) = map.get(key) else { - return; - }; - - match reply.command.as_deref() { - Some("edit_message") if tx.send(ReplyChunk::Snapshot(full.clone())).is_err() => { - map.remove(key); - } - // Final: the turn's last message (command = None). - None => { - let _ = tx.send(ReplyChunk::Snapshot(full)); - let _ = tx.send(ReplyChunk::Done); - map.remove(key); - } - // Other commands (reactions, create_topic, …) are irrelevant to the chat stream. - _ => {} - } -} - // --------------------------------------------------------------------------- // Tier-2 WS event types // --------------------------------------------------------------------------- @@ -400,6 +52,13 @@ pub enum WsEvent { text: String, urgency: &'static str, }, + #[serde(rename = "tool_status")] + ToolStatus { + ts: i64, + session_id: String, + tool_name: String, + status: &'static str, + }, #[serde(rename = "pong")] Pong { ts: i64 }, } @@ -412,6 +71,10 @@ pub struct AgentStateDetail { pub subagent_count: Option, } +// --------------------------------------------------------------------------- +// Client → Server commands +// --------------------------------------------------------------------------- + #[derive(Debug, Deserialize)] #[serde(tag = "type")] enum WsCommand { @@ -430,16 +93,90 @@ pub struct WsClient { pub subscribed: Option>, } -static NEXT_CLIENT_ID: AtomicU64 = AtomicU64::new(0); - -pub type WsClients = Arc>>; +pub type WsClients = Arc>>; pub fn new_ws_clients() -> WsClients { - Arc::new(Mutex::new(HashMap::new())) + Arc::new(Mutex::new(Vec::new())) +} + +// --------------------------------------------------------------------------- +// Ambient notification loop +// --------------------------------------------------------------------------- + +pub struct AmbientConfig { + interval: Duration, + prompt: String, + urgency: &'static str, +} + +impl AmbientConfig { + pub fn from_env() -> Option { + let enabled = std::env::var("VTUBER_AMBIENT_ENABLED") + .map(|v| v == "true" || v == "1") + .unwrap_or(false); + if !enabled { + return None; + } + + let interval_secs = std::env::var("VTUBER_AMBIENT_INTERVAL_SECS") + .ok() + .and_then(|v| v.parse::().ok()) + .filter(|v| *v >= 60) + .unwrap_or(1800); + let prompt = std::env::var("VTUBER_AMBIENT_PROMPT").unwrap_or_else(|_| { + "Ambient check-in: say one brief, natural message as 小光. If there is no urgent context, keep it warm and non-intrusive.".into() + }); + let urgency = match std::env::var("VTUBER_AMBIENT_URGENCY") + .unwrap_or_else(|_| "normal".into()) + .to_lowercase() + .as_str() + { + "low" => "low", + "high" => "high", + _ => "normal", + }; + + Some(Self { + interval: Duration::from_secs(interval_secs), + prompt, + urgency, + }) + } +} + +pub fn spawn_ambient_task(clients: WsClients) { + let Some(config) = AmbientConfig::from_env() else { + return; + }; + + info!( + interval_secs = config.interval.as_secs(), + urgency = config.urgency, + "vtuber ambient mode enabled" + ); + + tokio::spawn(async move { + loop { + tokio::time::sleep(config.interval).await; + + let client_count = clients.lock().await.len(); + if client_count == 0 { + continue; + } + + let event = WsEvent::Notification { + ts: chrono::Utc::now().timestamp(), + text: config.prompt.clone(), + urgency: config.urgency, + }; + broadcast(&clients, &[event]).await; + info!(client_count, "vtuber ambient notification broadcast"); + } + }); } // --------------------------------------------------------------------------- -// Reaction emoji → agent_state mapping +// Reaction emoji → agent_state mapping (OAB core reactions.rs) // --------------------------------------------------------------------------- struct ReactionMapping { @@ -448,28 +185,63 @@ struct ReactionMapping { } fn reaction_to_state(emoji: &str) -> Option { + // ponytail: match on first char for multi-codepoint emojis (👨‍💻 = U+1F468 ZWJ U+1F4BB) let first = emoji.chars().next()?; Some(match first { - '👀' => ReactionMapping { state: "thinking", tool_class: None }, - '🤔' => ReactionMapping { state: "thinking", tool_class: None }, - '🔥' => ReactionMapping { state: "working", tool_class: Some("tool") }, - '👨' => ReactionMapping { state: "working", tool_class: Some("coding") }, - '⚡' => ReactionMapping { state: "working", tool_class: Some("web") }, - '🆗' => ReactionMapping { state: "attention", tool_class: None }, - '😱' => ReactionMapping { state: "error", tool_class: None }, - '🥱' => ReactionMapping { state: "error", tool_class: None }, - '😨' => ReactionMapping { state: "error", tool_class: None }, + '👀' => ReactionMapping { + state: "thinking", + tool_class: None, + }, + '🤔' => ReactionMapping { + state: "thinking", + tool_class: None, + }, + '🔥' => ReactionMapping { + state: "working", + tool_class: Some("tool"), + }, + '👨' => ReactionMapping { + state: "working", + tool_class: Some("coding"), + }, // 👨‍💻 + '⚡' => ReactionMapping { + state: "working", + tool_class: Some("web"), + }, + '🆗' => ReactionMapping { + state: "attention", + tool_class: None, + }, + '😱' => ReactionMapping { + state: "error", + tool_class: None, + }, + '🥱' => ReactionMapping { + state: "error", + tool_class: None, + }, + '😨' => ReactionMapping { + state: "error", + tool_class: None, + }, _ => return None, }) } +// --------------------------------------------------------------------------- +// Extract [emotion] tags from streamed text +// --------------------------------------------------------------------------- + fn extract_emotion_tags(text: &str) -> Vec { let mut tags = Vec::new(); let mut rest = text; while let Some(start) = rest.find('[') { if let Some(end) = rest[start..].find(']') { let tag = &rest[start + 1..start + end]; - if !tag.is_empty() && tag.len() < 30 && tag.chars().all(|c| c.is_alphanumeric() || c == '_') { + if !tag.is_empty() + && tag.len() < 30 + && tag.chars().all(|c| c.is_alphanumeric() || c == '_') + { tags.push(tag.to_string()); } rest = &rest[start + end + 1..]; @@ -480,6 +252,43 @@ fn extract_emotion_tags(text: &str) -> Vec { tags } +fn parse_pure_tool_status(text: &str) -> Option<(String, &'static str)> { + let normalized = text + .trim() + .trim_matches('`') + .trim() + .trim_start_matches('✅') + .trim_start_matches('✔') + .trim_start_matches('✓') + .trim() + .trim_matches('`') + .trim(); + + if normalized.is_empty() || normalized.len() > 64 { + return None; + } + + let mut chars = normalized.chars(); + let first = chars.next()?; + if !first.is_ascii_alphabetic() { + return None; + } + if !chars.all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == ' ') { + return None; + } + + let lower = normalized.to_ascii_lowercase(); + if lower == "toolsearch" + || lower == "tool search" + || lower.ends_with("search") + || lower.ends_with("tool") + { + return Some((normalized.to_string(), "done")); + } + + None +} + // --------------------------------------------------------------------------- // Derive Tier-2 WS events from a GatewayReply // --------------------------------------------------------------------------- @@ -503,8 +312,21 @@ pub fn derive_events(reply: &GatewayReply) -> Vec { }); } } - Some("remove_reaction") => {} + Some("remove_reaction") => { + // Reaction cleared — no state change pushed; next add_reaction or + // send_message will update the state. + } Some("edit_message") => { + if let Some((tool_name, status)) = parse_pure_tool_status(&reply.content.text) { + events.push(WsEvent::ToolStatus { + ts, + session_id, + tool_name, + status, + }); + return events; + } + for tag in extract_emotion_tags(&reply.content.text) { events.push(WsEvent::Emotion { ts, @@ -515,6 +337,7 @@ pub fn derive_events(reply: &GatewayReply) -> Vec { } } None | Some("send_message") => { + // Final message — extract any trailing emotions, then go idle. for tag in extract_emotion_tags(&reply.content.text) { events.push(WsEvent::Emotion { ts, @@ -546,12 +369,13 @@ pub async fn broadcast(clients: &WsClients, events: &[WsEvent]) { } let mut dead = Vec::new(); let mut guard = clients.lock().await; - for (&id, client) in guard.iter() { + for (i, client) in guard.iter().enumerate() { for event in events { let event_type = match event { WsEvent::AgentState { .. } => "agent_state", WsEvent::Emotion { .. } => "emotion", WsEvent::Notification { .. } => "notification", + WsEvent::ToolStatus { .. } => "tool_status", WsEvent::Pong { .. } => continue, }; if let Some(ref subs) = client.subscribed { @@ -561,164 +385,745 @@ pub async fn broadcast(clients: &WsClients, events: &[WsEvent]) { } if let Ok(json) = serde_json::to_string(event) { if client.tx.send(json).is_err() { - dead.push(id); + dead.push(i); break; } } } } - for id in dead { - guard.remove(&id); - } + for i in dead.into_iter().rev() { + guard.swap_remove(i); + } +} + +// --------------------------------------------------------------------------- +// WS upgrade handler: GET /v1/vtuber/ws +// --------------------------------------------------------------------------- + +pub async fn ws_upgrade( + State(state): State>, + query: Query>, + headers: axum::http::HeaderMap, + ws: WebSocketUpgrade, +) -> axum::response::Response { + // Auth: Bearer token from Authorization header or ?token= query param + let token = headers + .get("authorization") + .and_then(|v| v.to_str().ok()) + .and_then(|v| v.strip_prefix("Bearer ")) + .or_else(|| query.get("token").map(|s| s.as_str())); + + let expected = state.vtuber.as_ref().and_then(|c| c.auth_key.as_ref()); + if let Some(expected) = expected { + if token != Some(expected.as_str()) { + warn!("vtuber WS rejected: invalid or missing token"); + return axum::http::StatusCode::UNAUTHORIZED.into_response(); + } + } + + ws.on_upgrade(move |socket| handle_ws(state, socket)) +} + +async fn handle_ws(state: Arc, socket: WebSocket) { + let (mut ws_tx, mut ws_rx) = socket.split(); + let (tx, mut rx) = mpsc::unbounded_channel::(); + + info!("vtuber WS client connected"); + + // Register client + let clients = match &state.vtuber_ws_clients { + Some(c) => c.clone(), + None => return, + }; + let client_idx = { + let mut guard = clients.lock().await; + guard.push(WsClient { + tx, + subscribed: None, + }); + guard.len() - 1 + }; + + // Forward events → client + let send_task = tokio::spawn(async move { + while let Some(msg) = rx.recv().await { + if ws_tx.send(Message::Text(msg.into())).await.is_err() { + break; + } + } + }); + + // Receive commands from client + let clients_for_recv = clients.clone(); + let recv_task = tokio::spawn(async move { + while let Some(Ok(msg)) = ws_rx.next().await { + if let Message::Text(text) = msg { + match serde_json::from_str::(&text) { + Ok(WsCommand::Subscribe { events }) => { + let mut guard = clients_for_recv.lock().await; + let idx = client_idx.min(guard.len().saturating_sub(1)); + if let Some(client) = guard.get_mut(idx) { + client.subscribed = Some(events.into_iter().collect()); + info!(events = ?client.subscribed, "vtuber WS subscribe updated"); + } + } + Ok(WsCommand::Ping) => { + let pong = WsEvent::Pong { + ts: chrono::Utc::now().timestamp(), + }; + if let Ok(json) = serde_json::to_string(&pong) { + let guard = clients_for_recv.lock().await; + let idx = client_idx.min(guard.len().saturating_sub(1)); + if let Some(client) = guard.get(idx) { + let _ = client.tx.send(json); + } + } + } + Err(_) => { + warn!(raw = %text, "vtuber WS unknown command"); + } + } + } + } + }); + + tokio::select! { + _ = send_task => {}, + _ = recv_task => {}, + } + + // Cleanup + let mut guard = clients.lock().await; + if client_idx < guard.len() { + guard.swap_remove(client_idx); + } + info!("vtuber WS client disconnected"); +} + +// --------------------------------------------------------------------------- +// Tier-1: OpenAI-compatible /v1/chat/completions (SSE) +// --------------------------------------------------------------------------- + +pub enum ReplyChunk { + Snapshot(String), + Done, +} + +pub type ReplyRegistry = Arc>>>; + +const REPLY_FIRST_TIMEOUT: Duration = Duration::from_secs(180); +const DEFAULT_REPLY_TAIL_IDLE: Duration = Duration::from_millis(1500); + +fn reply_tail_idle_timeout() -> Duration { + std::env::var("VTUBER_REPLY_TAIL_IDLE_MS") + .ok() + .and_then(|v| v.parse::().ok()) + .filter(|v| *v > 0) + .map(Duration::from_millis) + .unwrap_or(DEFAULT_REPLY_TAIL_IDLE) +} + +pub struct VtuberConfig { + pub auth_key: Option, + pub default_model: String, +} + +impl VtuberConfig { + pub fn from_env() -> Option { + let enabled = std::env::var("VTUBER_ENABLED") + .map(|v| v == "true" || v == "1") + .unwrap_or(false); + if !enabled { + return None; + } + let auth_key = std::env::var("VTUBER_AUTH_KEY").ok(); + if auth_key.is_none() { + warn!("VTUBER_AUTH_KEY not set — /v1/chat/completions is UNAUTHENTICATED"); + } + let default_model = + std::env::var("VTUBER_DEFAULT_MODEL").unwrap_or_else(|_| "openab".into()); + Some(Self { + auth_key, + default_model, + }) + } +} + +#[derive(Deserialize)] +pub struct ChatMessage { + #[serde(default)] + pub role: String, + #[serde(default)] + pub content: String, +} + +#[derive(Deserialize)] +pub struct ChatRequest { + #[serde(default)] + pub model: Option, + #[serde(default)] + pub messages: Vec, + #[serde(default)] + pub stream: Option, +} + +fn flatten_messages(messages: &[ChatMessage]) -> String { + let mut out = String::new(); + for m in messages { + if m.content.trim().is_empty() { + continue; + } + let label = match m.role.as_str() { + "system" => "System", + "assistant" => "Assistant", + "user" | "" => "User", + other => other, + }; + if !out.is_empty() { + out.push_str("\n\n"); + } + out.push_str(label); + out.push_str(": "); + out.push_str(&m.content); + } + out } -// --------------------------------------------------------------------------- -// WS upgrade handler: GET /v1/vtuber/ws -// --------------------------------------------------------------------------- +fn delta_suffix(full: &str, sent_len: usize) -> (String, usize) { + match full.get(sent_len..) { + Some(suffix) => (suffix.to_string(), full.len()), + None => (full.to_string(), full.len()), + } +} -pub async fn ws_upgrade( +pub async fn chat_completions( State(state): State>, - query: Query>, headers: HeaderMap, - ws: WebSocketUpgrade, -) -> Response { - let token = headers - .get("authorization") - .and_then(|v| v.to_str().ok()) - .and_then(|v| v.strip_prefix("Bearer ")) - .or_else(|| query.get("token").map(|s| s.as_str())); + Json(req): Json, +) -> axum::response::Response { + let Some(ref cfg) = state.vtuber else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + "vtuber adapter not configured", + ) + .into_response(); + }; - let expected = state.vtuber.as_ref().and_then(|c| c.auth_key.as_ref()); - if let Some(expected) = expected { - if token != Some(expected.as_str()) { - warn!("vtuber WS rejected: invalid or missing token"); - return StatusCode::UNAUTHORIZED.into_response(); + if let Some(expected) = cfg.auth_key.as_ref() { + let provided = headers + .get(AUTHORIZATION) + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.strip_prefix("Bearer ")); + if provided != Some(expected.as_str()) { + return (StatusCode::UNAUTHORIZED, "invalid api key").into_response(); } } - ws.on_upgrade(move |socket| handle_ws(state, socket)) -} + let prompt = flatten_messages(&req.messages); + if prompt.trim().is_empty() { + return ( + StatusCode::BAD_REQUEST, + "messages must contain non-empty content", + ) + .into_response(); + } + let model = req + .model + .clone() + .unwrap_or_else(|| cfg.default_model.clone()); -async fn handle_ws(state: Arc, socket: WebSocket) { - let (mut ws_tx, mut ws_rx) = socket.split(); - let (tx, mut rx) = mpsc::unbounded_channel::(); + let channel_id = format!("vtb_{}", Uuid::new_v4()); + let (tx, rx) = mpsc::unbounded_channel::(); + state + .vtuber_pending + .lock() + .await + .insert(channel_id.clone(), tx); - let client_id = NEXT_CLIENT_ID.fetch_add(1, Ordering::Relaxed); - info!(client_id, "vtuber WS client connected"); + let event = GatewayEvent::new( + "vtuber", + ChannelInfo { + id: channel_id.clone(), + channel_type: "dm".into(), + thread_id: None, + }, + SenderInfo { + id: "vtuber".into(), + name: "vtuber".into(), + display_name: "VTuber".into(), + is_bot: false, + }, + &prompt, + &format!("vtbmsg_{}", Uuid::new_v4()), + Vec::new(), + ); + match serde_json::to_string(&event) { + Ok(json) => { + let _ = state.event_tx.send(json); + } + Err(e) => { + state.vtuber_pending.lock().await.remove(&channel_id); + warn!("vtuber: failed to serialize event: {e}"); + return (StatusCode::INTERNAL_SERVER_ERROR, "internal error").into_response(); + } + } + info!(channel = %channel_id, "vtuber: chat request dispatched"); - let clients = match &state.vtuber_ws_clients { - Some(c) => c.clone(), - None => return, - }; - clients.lock().await.insert(client_id, WsClient { tx, subscribed: None }); + let stream = reply_stream(rx, model, channel_id, state.vtuber_pending.clone()); + Sse::new(stream) + .keep_alive(KeepAlive::default()) + .into_response() +} - let send_task = tokio::spawn(async move { - while let Some(msg) = rx.recv().await { - if ws_tx.send(Message::Text(msg.into())).await.is_err() { - break; - } - } +fn chunk_event( + id: &str, + created: i64, + model: &str, + delta: serde_json::Value, + finish: Option<&str>, +) -> SseEvent { + let payload = json!({ + "id": id, "object": "chat.completion.chunk", "created": created, "model": model, + "choices": [{ "index": 0, "delta": delta, "finish_reason": finish }], }); + SseEvent::default().data(payload.to_string()) +} - let clients_for_recv = clients.clone(); - let recv_task = tokio::spawn(async move { - while let Some(Ok(msg)) = ws_rx.next().await { - if let Message::Text(text) = msg { - match serde_json::from_str::(&text) { - Ok(WsCommand::Subscribe { events }) => { - let mut guard = clients_for_recv.lock().await; - if let Some(client) = guard.get_mut(&client_id) { - client.subscribed = Some(events.into_iter().collect()); - info!(events = ?client.subscribed, "vtuber WS subscribe updated"); +struct StreamState { + rx: mpsc::UnboundedReceiver, + sent_len: usize, + phase: u8, + id: String, + created: i64, + model: String, + channel_id: String, + registry: ReplyRegistry, + seen_snapshot: bool, +} + +fn reply_stream( + rx: mpsc::UnboundedReceiver, + model: String, + channel_id: String, + registry: ReplyRegistry, +) -> impl Stream> { + let init = StreamState { + rx, + sent_len: 0, + phase: 0, + id: format!("chatcmpl-{}", Uuid::new_v4()), + created: chrono::Utc::now().timestamp(), + model, + channel_id, + registry, + seen_snapshot: false, + }; + futures_util::stream::unfold(init, |mut s| async move { + loop { + match s.phase { + 0 => { + s.phase = 1; + let ev = chunk_event( + &s.id, + s.created, + &s.model, + json!({"role":"assistant"}), + None, + ); + return Some((Ok(ev), s)); + } + 1 => match tokio::time::timeout( + if s.seen_snapshot { + reply_tail_idle_timeout() + } else { + REPLY_FIRST_TIMEOUT + }, + s.rx.recv(), + ) + .await + { + Ok(Some(ReplyChunk::Snapshot(full))) => { + let (delta, new_len) = delta_suffix(&full, s.sent_len); + if delta.is_empty() { + continue; } + s.sent_len = new_len; + s.seen_snapshot = true; + let ev = chunk_event( + &s.id, + s.created, + &s.model, + json!({"content": delta}), + None, + ); + return Some((Ok(ev), s)); } - Ok(WsCommand::Ping) => { - let pong = WsEvent::Pong { - ts: chrono::Utc::now().timestamp(), - }; - if let Ok(json) = serde_json::to_string(&pong) { - let guard = clients_for_recv.lock().await; - if let Some(client) = guard.get(&client_id) { - let _ = client.tx.send(json); - } - } + Ok(Some(ReplyChunk::Done)) | Ok(None) => { + s.phase = 2; + continue; } Err(_) => { - warn!(raw = %text, "vtuber WS unknown command"); + if s.seen_snapshot { + info!(channel = %s.channel_id, "vtuber: reply stream idle, closing"); + } else { + warn!(channel = %s.channel_id, "vtuber: reply timed out"); + } + s.phase = 2; + continue; } + }, + 2 => { + s.phase = 3; + let ev = chunk_event(&s.id, s.created, &s.model, json!({}), Some("stop")); + return Some((Ok(ev), s)); + } + 3 => { + s.phase = 4; + s.registry.lock().await.remove(&s.channel_id); + return Some((Ok(SseEvent::default().data("[DONE]")), s)); } + _ => return None, } } - }); + }) +} - tokio::select! { - _ = send_task => {}, - _ = recv_task => {}, +pub async fn handle_reply(reply: &GatewayReply, registry: &ReplyRegistry) { + let key = reply.channel.id.as_str(); + let full = reply.content.text.clone(); + if full == "…" || full == "draft" { + return; } + let is_pure_tool_status = parse_pure_tool_status(&full).is_some(); + + let mut map = registry.lock().await; + let Some(tx) = map.get(key) else { + return; + }; - clients.lock().await.remove(&client_id); - info!(client_id, "vtuber WS client disconnected"); + match reply.command.as_deref() { + Some("edit_message") => { + if is_pure_tool_status { + return; + } + if tx.send(ReplyChunk::Snapshot(full)).is_err() { + map.remove(key); + } + } + None => { + if !is_pure_tool_status { + let _ = tx.send(ReplyChunk::Snapshot(full)); + } + let _ = tx.send(ReplyChunk::Done); + map.remove(key); + } + _ => {} + } } +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + #[cfg(test)] mod tests { use super::*; - #[test] - fn config_disabled_by_default() { - assert!(VtuberConfig::from_reader(|_| None).is_none()); + // ----------------------------------------------------------------------- + // Helper: spin up gateway, return (addr, oab_ws_url, vtuber_ws_url) + // ----------------------------------------------------------------------- + async fn start_gateway() -> (String, String, String) { + let (event_tx, _) = tokio::sync::broadcast::channel::(256); + let ws_clients = new_ws_clients(); + let state = Arc::new(crate::AppState { + telegram_bot_token: None, + telegram_secret_token: None, + telegram_rich_messages: true, + line_channel_secret: None, + line_access_token: None, + teams: None, + teams_service_urls: Mutex::new(HashMap::new()), + feishu: None, + google_chat: None, + wecom: None, + vtuber: Some(VtuberConfig { + auth_key: Some("test-key".into()), + default_model: "openab".into(), + }), + vtuber_pending: Arc::new(Mutex::new(HashMap::new())), + vtuber_ws_clients: Some(ws_clients), + ws_token: Some("oab-token".into()), + event_tx, + reply_token_cache: Arc::new(std::sync::Mutex::new(HashMap::new())), + line_webhook_semaphore: Arc::new(tokio::sync::Semaphore::new(8)), + client: reqwest::Client::new(), + }); + + let app = axum::Router::new() + .route("/ws", axum::routing::get(crate::ws_handler)) + .route("/v1/vtuber/ws", axum::routing::get(ws_upgrade)) + .with_state(state); + + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap().to_string(); + tokio::spawn(async move { axum::serve(listener, app).await.unwrap() }); + + let oab_url = format!("ws://{}/ws?token=oab-token", addr); + let vtb_url = format!("ws://{}/v1/vtuber/ws?token=test-key", addr); + (addr, oab_url, vtb_url) } - #[test] - fn config_enabled_with_defaults() { - let cfg = VtuberConfig::from_reader(|k| match k { - "VTUBER_ENABLED" => Some("true".into()), - "VTUBER_AUTH_KEY" => Some("secret".into()), - _ => None, - }) - .expect("enabled"); - assert_eq!(cfg.auth_key.as_deref(), Some("secret")); - assert_eq!(cfg.default_model, "openab"); + // ----------------------------------------------------------------------- + // Integration: OAB sends add_reaction → vtuber WS receives agent_state + // ----------------------------------------------------------------------- + #[tokio::test] + async fn ws_e2e_reaction_to_agent_state() { + let (_addr, oab_url, vtb_url) = start_gateway().await; + + // Connect vtuber WS client + let (mut vtb_ws, _) = tokio_tungstenite::connect_async(&vtb_url).await.unwrap(); + + // Connect OAB client and send a vtuber add_reaction reply + let (mut oab_ws, _) = tokio_tungstenite::connect_async(&oab_url).await.unwrap(); + let reply = serde_json::json!({ + "schema": "openab.gateway.reply.v1", + "reply_to": "evt_1", + "platform": "vtuber", + "channel": {"id": "ch_test"}, + "content": {"type": "text", "text": "🤔"}, + "command": "add_reaction" + }); + oab_ws + .send(tokio_tungstenite::tungstenite::Message::Text( + reply.to_string(), + )) + .await + .unwrap(); + + // Receive event on vtuber WS + let msg = tokio::time::timeout(std::time::Duration::from_secs(2), vtb_ws.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + + let event: serde_json::Value = serde_json::from_str(&msg.to_string()).unwrap(); + assert_eq!(event["type"], "agent_state"); + assert_eq!(event["state"], "thinking"); + assert_eq!(event["session_id"], "ch_test"); } - #[test] - fn flatten_labels_roles_and_skips_empty() { - let msgs = vec![ - ChatMessage { - role: "system".into(), - content: "be 小光".into(), - }, - ChatMessage { - role: "user".into(), - content: " ".into(), - }, - ChatMessage { - role: "user".into(), - content: "hi".into(), - }, + // ----------------------------------------------------------------------- + // Integration: OAB sends edit_message with [emotion] → vtuber WS gets it + // ----------------------------------------------------------------------- + #[tokio::test] + async fn ws_e2e_emotion_from_edit_message() { + let (_addr, oab_url, vtb_url) = start_gateway().await; + + let (mut vtb_ws, _) = tokio_tungstenite::connect_async(&vtb_url).await.unwrap(); + let (mut oab_ws, _) = tokio_tungstenite::connect_async(&oab_url).await.unwrap(); + + let reply = serde_json::json!({ + "schema": "openab.gateway.reply.v1", + "reply_to": "evt_1", + "platform": "vtuber", + "channel": {"id": "ch_test"}, + "content": {"type": "text", "text": "[happy] Hello world!"}, + "command": "edit_message", + "request_id": "req_1" + }); + oab_ws + .send(tokio_tungstenite::tungstenite::Message::Text( + reply.to_string(), + )) + .await + .unwrap(); + + let msg = tokio::time::timeout(std::time::Duration::from_secs(2), vtb_ws.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + + let event: serde_json::Value = serde_json::from_str(&msg.to_string()).unwrap(); + assert_eq!(event["type"], "emotion"); + assert_eq!(event["tag"], "happy"); + assert_eq!(event["intensity"], 1.0); + } + + // ----------------------------------------------------------------------- + // Integration: full agent lifecycle (queued → thinking → working → done → idle) + // ----------------------------------------------------------------------- + #[tokio::test] + async fn ws_e2e_full_lifecycle() { + let (_addr, oab_url, vtb_url) = start_gateway().await; + + let (mut vtb_ws, _) = tokio_tungstenite::connect_async(&vtb_url).await.unwrap(); + let (mut oab_ws, _) = tokio_tungstenite::connect_async(&oab_url).await.unwrap(); + + // Lifecycle: 👀 → 🤔 → 👨‍💻 → 🆗 → send_message + let sequence = [ + ("👀", "add_reaction", "thinking"), + ("🤔", "add_reaction", "thinking"), + ("👨\u{200d}💻", "add_reaction", "working"), + ("🆗", "add_reaction", "attention"), ]; - assert_eq!(flatten_messages(&msgs), "System: be 小光\n\nUser: hi"); + + for (emoji, cmd, expected_state) in &sequence { + let reply = serde_json::json!({ + "schema": "openab.gateway.reply.v1", + "reply_to": "evt_1", + "platform": "vtuber", + "channel": {"id": "ch_lifecycle"}, + "content": {"type": "text", "text": emoji}, + "command": cmd + }); + oab_ws + .send(tokio_tungstenite::tungstenite::Message::Text( + reply.to_string(), + )) + .await + .unwrap(); + + let msg = tokio::time::timeout(std::time::Duration::from_secs(2), vtb_ws.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + let event: serde_json::Value = serde_json::from_str(&msg.to_string()).unwrap(); + assert_eq!(event["type"], "agent_state", "for emoji {emoji}"); + assert_eq!(event["state"], *expected_state, "for emoji {emoji}"); + } + + // Final send_message → idle + let final_reply = serde_json::json!({ + "schema": "openab.gateway.reply.v1", + "reply_to": "evt_1", + "platform": "vtuber", + "channel": {"id": "ch_lifecycle"}, + "content": {"type": "text", "text": "All done!"} + }); + oab_ws + .send(tokio_tungstenite::tungstenite::Message::Text( + final_reply.to_string(), + )) + .await + .unwrap(); + + let msg = tokio::time::timeout(std::time::Duration::from_secs(2), vtb_ws.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + let event: serde_json::Value = serde_json::from_str(&msg.to_string()).unwrap(); + assert_eq!(event["type"], "agent_state"); + assert_eq!(event["state"], "idle"); } - #[test] - fn delta_suffix_emits_only_new_text() { - let (d1, n1) = delta_suffix("Hello", 0); - assert_eq!((d1.as_str(), n1), ("Hello", 5)); - let (d2, n2) = delta_suffix("Hello world", n1); - assert_eq!((d2.as_str(), n2), (" world", 11)); - let (d3, _) = delta_suffix("Hello world", n2); - assert_eq!(d3, ""); + // ----------------------------------------------------------------------- + // Integration: subscribe filtering + // ----------------------------------------------------------------------- + #[tokio::test] + async fn ws_e2e_subscribe_filters_events() { + let (_addr, oab_url, vtb_url) = start_gateway().await; + + let (mut vtb_ws, _) = tokio_tungstenite::connect_async(&vtb_url).await.unwrap(); + let (mut oab_ws, _) = tokio_tungstenite::connect_async(&oab_url).await.unwrap(); + + // Subscribe only to emotion events + vtb_ws + .send(tokio_tungstenite::tungstenite::Message::Text( + r#"{"type":"subscribe","events":["emotion"]}"#.into(), + )) + .await + .unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Send add_reaction (agent_state) — should be filtered out + let reaction = serde_json::json!({ + "schema": "openab.gateway.reply.v1", + "reply_to": "evt_1", "platform": "vtuber", + "channel": {"id": "ch_test"}, + "content": {"type": "text", "text": "🤔"}, + "command": "add_reaction" + }); + oab_ws + .send(tokio_tungstenite::tungstenite::Message::Text( + reaction.to_string(), + )) + .await + .unwrap(); + + // Send edit_message with [emotion] — should pass through + let edit = serde_json::json!({ + "schema": "openab.gateway.reply.v1", + "reply_to": "evt_1", "platform": "vtuber", + "channel": {"id": "ch_test"}, + "content": {"type": "text", "text": "[joy] yay"}, + "command": "edit_message", "request_id": "req_1" + }); + oab_ws + .send(tokio_tungstenite::tungstenite::Message::Text( + edit.to_string(), + )) + .await + .unwrap(); + + // Should receive emotion, NOT agent_state + let msg = tokio::time::timeout(std::time::Duration::from_secs(2), vtb_ws.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + let event: serde_json::Value = serde_json::from_str(&msg.to_string()).unwrap(); + assert_eq!(event["type"], "emotion"); + assert_eq!(event["tag"], "joy"); } - #[test] - fn delta_suffix_handles_multibyte() { - let (d1, n1) = delta_suffix("小光", 0); - assert_eq!(d1, "小光"); - let (d2, _) = delta_suffix("小光你好", n1); - assert_eq!(d2, "你好"); + // ----------------------------------------------------------------------- + // Integration: ping → pong + // ----------------------------------------------------------------------- + #[tokio::test] + async fn ws_e2e_ping_pong() { + let (_addr, _oab_url, vtb_url) = start_gateway().await; + + let (mut vtb_ws, _) = tokio_tungstenite::connect_async(&vtb_url).await.unwrap(); + vtb_ws + .send(tokio_tungstenite::tungstenite::Message::Text( + r#"{"type":"ping"}"#.into(), + )) + .await + .unwrap(); + + let msg = tokio::time::timeout(std::time::Duration::from_secs(2), vtb_ws.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + let event: serde_json::Value = serde_json::from_str(&msg.to_string()).unwrap(); + assert_eq!(event["type"], "pong"); + assert!(event["ts"].is_number()); } - // --- Tier-2 unit tests --- + // ----------------------------------------------------------------------- + // Integration: bad auth → 401 + // ----------------------------------------------------------------------- + #[tokio::test] + async fn ws_e2e_bad_auth_rejected() { + let (addr, _, _) = start_gateway().await; + let bad_url = format!("ws://{}/v1/vtuber/ws?token=wrong", addr); + let result = tokio_tungstenite::connect_async(&bad_url).await; + assert!( + result.is_err() || { + let (_, resp) = result.unwrap(); + resp.status() == reqwest::StatusCode::UNAUTHORIZED + } + ); + } + + // ----------------------------------------------------------------------- + // Unit tests + // ----------------------------------------------------------------------- #[test] fn reaction_mapping_covers_all_oab_emojis() { @@ -726,14 +1131,17 @@ mod tests { assert_eq!(reaction_to_state("🤔").unwrap().state, "thinking"); assert_eq!(reaction_to_state("🔥").unwrap().state, "working"); assert_eq!(reaction_to_state("👨\u{200d}💻").unwrap().state, "working"); - assert_eq!(reaction_to_state("👨\u{200d}💻").unwrap().tool_class, Some("coding")); + assert_eq!( + reaction_to_state("👨\u{200d}💻").unwrap().tool_class, + Some("coding") + ); assert_eq!(reaction_to_state("⚡").unwrap().state, "working"); assert_eq!(reaction_to_state("⚡").unwrap().tool_class, Some("web")); assert_eq!(reaction_to_state("🆗").unwrap().state, "attention"); assert_eq!(reaction_to_state("😱").unwrap().state, "error"); assert_eq!(reaction_to_state("🥱").unwrap().state, "error"); assert_eq!(reaction_to_state("😨").unwrap().state, "error"); - assert!(reaction_to_state("😊").is_none()); + assert!(reaction_to_state("😊").is_none()); // mood face, not a state } #[test] @@ -750,13 +1158,19 @@ mod tests { #[test] fn derive_events_add_reaction() { - use crate::schema::{Content, ReplyChannel}; let reply = GatewayReply { schema: "openab.gateway.reply.v1".into(), reply_to: "evt_1".into(), platform: "vtuber".into(), - channel: ReplyChannel { id: "ch_1".into(), thread_id: None }, - content: Content { content_type: "text".into(), text: "🤔".into(), attachments: vec![] }, + channel: ReplyChannel { + id: "ch_1".into(), + thread_id: None, + }, + content: Content { + content_type: "text".into(), + text: "🤔".into(), + attachments: vec![], + }, command: Some("add_reaction".into()), request_id: None, quote_message_id: None, @@ -764,7 +1178,9 @@ mod tests { let events = derive_events(&reply); assert_eq!(events.len(), 1); match &events[0] { - WsEvent::AgentState { state, session_id, .. } => { + WsEvent::AgentState { + state, session_id, .. + } => { assert_eq!(*state, "thinking"); assert_eq!(session_id, "ch_1"); } @@ -774,13 +1190,19 @@ mod tests { #[test] fn derive_events_edit_message_with_emotion() { - use crate::schema::{Content, ReplyChannel}; let reply = GatewayReply { schema: "openab.gateway.reply.v1".into(), reply_to: "evt_1".into(), platform: "vtuber".into(), - channel: ReplyChannel { id: "ch_1".into(), thread_id: None }, - content: Content { content_type: "text".into(), text: "[excited] Hello!".into(), attachments: vec![] }, + channel: ReplyChannel { + id: "ch_1".into(), + thread_id: None, + }, + content: Content { + content_type: "text".into(), + text: "[excited] Hello!".into(), + attachments: vec![], + }, command: Some("edit_message".into()), request_id: Some("req_1".into()), quote_message_id: None, @@ -798,13 +1220,19 @@ mod tests { #[test] fn derive_events_send_message_idle() { - use crate::schema::{Content, ReplyChannel}; let reply = GatewayReply { schema: "openab.gateway.reply.v1".into(), reply_to: "evt_1".into(), platform: "vtuber".into(), - channel: ReplyChannel { id: "ch_1".into(), thread_id: None }, - content: Content { content_type: "text".into(), text: "Done!".into(), attachments: vec![] }, + channel: ReplyChannel { + id: "ch_1".into(), + thread_id: None, + }, + content: Content { + content_type: "text".into(), + text: "Done!".into(), + attachments: vec![], + }, command: None, request_id: None, quote_message_id: None, @@ -816,4 +1244,45 @@ mod tests { _ => panic!("expected AgentState idle"), } } + + #[tokio::test] + async fn reply_stream_finishes_after_snapshot_idle() { + std::env::set_var("VTUBER_REPLY_TAIL_IDLE_MS", "10"); + + let (tx, rx) = mpsc::unbounded_channel::(); + let registry: ReplyRegistry = Arc::new(Mutex::new(HashMap::new())); + registry.lock().await.insert("ch_idle".into(), tx.clone()); + + let mut stream = Box::pin(reply_stream( + rx, + "openab".into(), + "ch_idle".into(), + registry.clone(), + )); + + assert!( + stream.next().await.is_some(), + "role chunk should be emitted first" + ); + tx.send(ReplyChunk::Snapshot("hello".into())).unwrap(); + assert!( + stream.next().await.is_some(), + "content chunk should be emitted" + ); + + let finish = tokio::time::timeout(Duration::from_secs(1), stream.next()).await; + assert!(finish.is_ok(), "finish chunk should arrive after tail idle"); + assert!(finish.unwrap().is_some()); + + let done = tokio::time::timeout(Duration::from_secs(1), stream.next()).await; + assert!(done.is_ok(), "[DONE] should arrive after finish chunk"); + assert!(done.unwrap().is_some()); + + assert!( + !registry.lock().await.contains_key("ch_idle"), + "stream completion should remove pending registry entry" + ); + + std::env::remove_var("VTUBER_REPLY_TAIL_IDLE_MS"); + } } diff --git a/crates/openab-gateway/src/lib.rs b/crates/openab-gateway/src/lib.rs index 79f04d3ca..15e027799 100644 --- a/crates/openab-gateway/src/lib.rs +++ b/crates/openab-gateway/src/lib.rs @@ -414,6 +414,13 @@ pub async fn serve(config: ServeConfig) -> anyhow::Result<()> { client, }); + #[cfg(feature = "vtuber")] + if state.vtuber.is_some() { + if let Some(ref clients) = state.vtuber_ws_clients { + adapters::vtuber::spawn_ambient_task(clients.clone()); + } + } + // Background: sweep expired reply tokens { let cache_state = state.clone(); From e16a85b88d6fc4cdca8f8187f3d6bf560c13f26a Mon Sep 17 00:00:00 2001 From: Can Yu Date: Mon, 29 Jun 2026 23:20:09 +0800 Subject: [PATCH 07/10] feat(vtuber): wire adapter into unified binary --- Cargo.toml | 9 +- docs/config-reference.md | 1 + docs/vtuber.md | 172 ++++++++++++++++++++++++--------------- src/main.rs | 111 ++++++++++++++++++++----- src/unified_adapter.rs | 35 ++++++-- 5 files changed, 230 insertions(+), 98 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 042ca6223..23c965fc8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,11 +23,11 @@ async-trait = "0.1" serenity = { version = "0.12", default-features = false, features = ["client", "gateway", "model", "rustls_backend", "cache"], optional = true } [features] -# Default: core only (Discord + Slack). Gateway ships as separate binary. +# Default: core only (Discord + Slack). Platform adapters are opt-in. default = ["discord", "slack", "secrets-aws", "agentcore", "config-s3", "pre-seed"] -# Opt-in: compile all gateway adapters into a single unified binary -unified = ["telegram", "line", "feishu", "googlechat", "wecom", "teams"] +# Opt-in: compile all platform adapters into a single unified binary +unified = ["telegram", "line", "feishu", "googlechat", "wecom", "teams", "vtuber"] # Core adapters discord = ["dep:serenity", "openab-core/discord"] @@ -39,13 +39,14 @@ agentcore = ["openab-core/agentcore"] config-s3 = ["openab-core/config-s3"] pre-seed = ["openab-core/pre-seed"] -# Gateway adapters (each pulls in the gateway crate + axum for embedded server) +# Platform adapters (each pulls in the gateway crate + axum for embedded server) telegram = ["dep:openab-gateway", "dep:axum", "openab-gateway/telegram"] line = ["dep:openab-gateway", "dep:axum", "openab-gateway/line"] feishu = ["dep:openab-gateway", "dep:axum", "openab-gateway/feishu"] googlechat = ["dep:openab-gateway", "dep:axum", "openab-gateway/googlechat"] wecom = ["dep:openab-gateway", "dep:axum", "openab-gateway/wecom"] teams = ["dep:openab-gateway", "dep:axum", "openab-gateway/teams"] +vtuber = ["dep:openab-gateway", "dep:axum", "openab-gateway/vtuber"] [dev-dependencies] tempfile = "3.27.0" diff --git a/docs/config-reference.md b/docs/config-reference.md index 2f5509a14..5eb289e6f 100644 --- a/docs/config-reference.md +++ b/docs/config-reference.md @@ -692,6 +692,7 @@ Each platform is auto-enabled when its env vars are present: | Google Chat | `GOOGLE_CHAT_ENABLED=true` | `GOOGLE_CHAT_SA_KEY_JSON`, `GOOGLE_CHAT_SA_KEY_FILE`, `GOOGLE_CHAT_ACCESS_TOKEN`, `GOOGLE_CHAT_AUDIENCE`, `GOOGLE_CHAT_WEBHOOK_PATH` | | WeCom | `WECOM_CORP_ID` | _(see wecom config)_ | | Teams | `TEAMS_APP_ID` | `TEAMS_WEBHOOK_PATH` | +| VTuber | `VTUBER_ENABLED=true` | `VTUBER_AUTH_KEY`, `VTUBER_DEFAULT_MODEL`, `VTUBER_PATH`, `VTUBER_REPLY_TAIL_IDLE_MS`, `VTUBER_AMBIENT_*` | > ⚠️ **Production checklist**: Set `GATEWAY_ALLOW_ALL_CHANNELS=false` and `GATEWAY_ALLOW_ALL_USERS=false` with explicit allowlists. The defaults are permissive for development convenience. > diff --git a/docs/vtuber.md b/docs/vtuber.md index 466a9d5a9..d5c9ed743 100644 --- a/docs/vtuber.md +++ b/docs/vtuber.md @@ -1,133 +1,171 @@ # VTuber (OpenAI-compatible) Setup -Expose an **OpenAI-compatible `/v1/chat/completions` (SSE)** endpoint backed by your -OAB agent, so any character "skin" that already speaks OpenAI chat completions -(AniCompanion, Open-LLM-VTuber, …) gets a real agent — tool use, code, MCP, memory — -with **zero client changes**. +Expose an **OpenAI-compatible `/v1/chat/completions` (SSE)** endpoint from the +unified OpenAB binary, so any character "skin" that already speaks OpenAI chat +completions (AniCompanion, Open-LLM-VTuber, ChatVRM, ...) gets a real agent: +tool use, code, MCP, memory, and the same configured ACP backend. ``` -Skin ──POST /v1/chat/completions (SSE)──▶ Gateway (:8080) ◀──WebSocket── OAB Pod - choices[].delta.content (incl. inline [emotion] tags) (OAB connects out) +Skin ──POST /v1/chat/completions (SSE)──▶ OpenAB unified binary ──▶ ACP agent +Skin ◀─GET /v1/vtuber/ws (optional WS)──┘ ``` -Unlike the chat-platform adapters (LINE, Telegram, …) this is **not a webhook**: the -skin holds an HTTP request open and the reply streams back on the same connection. +Unlike chat-platform adapters (LINE, Telegram, ...), this is **not a webhook**: +the skin opens an HTTP request and the reply streams back on that same +connection. The optional Tier-2 WebSocket is only for side-channel UI events +such as agent state, emotions, tool status, and ambient notifications. ## Prerequisites -- A running OAB instance (with kiro-cli or any ACP agent authenticated), **connected - to the gateway** — the gateway has no embedded model; without a connected agent the - endpoint returns nothing. -- The Custom Gateway deployed ([gateway/README.md](../gateway/README.md)). -- A skin that supports an OpenAI-compatible backend (e.g. AniCompanion → Settings → - Agent backend → OpenAI-compatible). +- An OpenAB image/binary built with the `unified` feature. +- An ACP agent configured in the same OpenAB process. +- A public URL, tunnel, or localhost endpoint reachable by the skin. +- A skin that supports an OpenAI-compatible backend, for example + AniCompanion -> Settings -> Agent backend -> OpenAI-compatible. -## 1. Configure the Gateway +## 1. Enable the VTuber Adapter + +Set these environment variables on the OpenAB process: ```bash -# Docker -docker run -d --name openab-gateway \ - -e VTUBER_ENABLED="true" \ - -e VTUBER_AUTH_KEY="$(openssl rand -hex 32)" \ - -e VTUBER_DEFAULT_MODEL="openab" \ - -p 8080:8080 \ - ghcr.io/openabdev/openab-gateway:latest +VTUBER_ENABLED=true +VTUBER_AUTH_KEY="$(openssl rand -hex 32)" +VTUBER_DEFAULT_MODEL=openab +GATEWAY_LISTEN=0.0.0.0:8080 +``` -# Kubernetes -kubectl set env deployment/openab-gateway \ - VTUBER_ENABLED=true \ - VTUBER_AUTH_KEY= \ - VTUBER_DEFAULT_MODEL=openab +Example Docker run with a Kiro-backed OpenAB image: + +```bash +docker run -d --name openab-vtuber \ + -e VTUBER_ENABLED=true \ + -e VTUBER_AUTH_KEY="$VTUBER_AUTH_KEY" \ + -e VTUBER_DEFAULT_MODEL=openab \ + -e KIRO_API_KEY="$KIRO_API_KEY" \ + -p 8080:8080 \ + ghcr.io/openabdev/openab:beta-kiro ``` -## 2. Configure OAB +For Kubernetes or Zeabur, put the same environment variables on the OpenAB +service. No companion container or adapter config block is needed in unified +mode. -Point an OAB gateway connection at the `vtuber` platform. Streaming must use the -**draft** path (no thinking-placeholder) so the gateway can tell a partial edit from -the final message: +## 2. Configure the Agent -```toml -[gateway] -url = "ws://openab-gateway:8080/ws" -platform = "vtuber" -streaming = true -streaming_placeholder = false # required: avoids the "…" placeholder ambiguity +Use the normal OpenAB agent configuration. The VTuber adapter submits incoming +skin messages directly to OpenAB's in-process dispatcher. +```toml [agent] command = "kiro-cli" args = ["acp", "--trust-all-tools"] working_dir = "/home/agent" ``` -`streaming = false` also works — the whole reply arrives as one chunk + `[DONE]`. +Streaming is handled by the VTuber adapter's SSE endpoint; it does not require +separate streaming settings. -## 3. Point the Skin at the Gateway +## 3. Point the Skin at OpenAB In the skin's OpenAI-compatible backend settings: -- **Endpoint / Base URL**: `https://gw.yourdomain.com` (the adapter serves +- **Endpoint / Base URL**: `https://your-openab-host` (the adapter serves `/v1/chat/completions`) - **API Key**: the `VTUBER_AUTH_KEY` value (sent as `Authorization: Bearer `) -- **Model**: anything — the gateway routes to the connected agent regardless and echoes - the name back +- **Model**: anything; OpenAB routes to the configured ACP agent and echoes the + model name back in OpenAI-compatible chunks + +If the skin supports the Tier-2 side channel, connect it to the same base URL +at `/v1/vtuber/ws` with the same bearer key. ## 4. Test ```bash -curl -N https://gw.yourdomain.com/v1/chat/completions \ +curl -N https://your-openab-host/v1/chat/completions \ -H "Authorization: Bearer $VTUBER_AUTH_KEY" \ -H "Content-Type: application/json" \ -d '{"model":"openab","stream":true, "messages":[{"role":"user","content":"hi 小光"}]}' ``` -You should see `data: {...chat.completion.chunk...}` lines ending with `data: [DONE]`. +You should see `data: {...chat.completion.chunk...}` lines ending with +`data: [DONE]`. + +## Tier-2 WebSocket + +`GET /v1/vtuber/ws` is optional. It pushes structured UI events: + +- `agent_state` +- `emotion` +- `tool_status` +- `notification` + +Clients can send `{"type":"subscribe","events":[...]}` to filter event types +and `{"type":"ping"}` for keepalive. + +Ambient notifications are opt-in: + +```bash +VTUBER_AMBIENT_ENABLED=true +VTUBER_AMBIENT_INTERVAL_SECS=1800 +VTUBER_AMBIENT_URGENCY=normal +VTUBER_AMBIENT_PROMPT="Ambient check-in prompt..." +``` -## Emotion tags +## Emotion Tags -Inline `[emotion]` tags (e.g. AniCompanion's 16: `[happy] [sad] [curious] …`) are the -skin's own convention — the skin's persona/system prompt instructs the agent to emit -them, and the skin parses + strips them before TTS. **The adapter passes them through -the stream verbatim** and does no emotion handling itself. +Inline `[emotion]` tags (for example AniCompanion's `[happy]`, `[sad]`, +`[curious]`, ...) are the skin's own convention. The skin's persona/system +prompt instructs the agent to emit them, and the skin parses and strips them +before TTS. Tier-1 passes them through verbatim; Tier-2 also extracts recognized +tags as `emotion` events for clients that subscribe to the side channel. ## Environment Variables | Variable | Required | Description | |---|---|---| -| `VTUBER_ENABLED` | Yes | `true`/`1` to enable the adapter | -| `VTUBER_AUTH_KEY` | Recommended | Bearer key required on requests. If unset, the endpoint is **unauthenticated** (logged as insecure) | +| `VTUBER_ENABLED` | Yes | `true`/`1` to enable the adapter in the unified binary | +| `VTUBER_AUTH_KEY` | Recommended | Bearer key required on Tier-1 and Tier-2 requests. If unset, the endpoint is unauthenticated and logs a warning | | `VTUBER_DEFAULT_MODEL` | No | Model name echoed back when the request omits one (default `openab`) | -| `VTUBER_PATH` | No | Route path (default `/v1/chat/completions`) | +| `VTUBER_PATH` | No | Tier-1 route path (default `/v1/chat/completions`) | +| `VTUBER_REPLY_TAIL_IDLE_MS` | No | Tail-idle close delay after the first content snapshot (default `1500`) | +| `VTUBER_AMBIENT_ENABLED` | No | `true`/`1` to enable Tier-2 ambient notifications | +| `VTUBER_AMBIENT_INTERVAL_SECS` | No | Ambient notification interval, minimum 60 seconds (default `1800`) | +| `VTUBER_AMBIENT_URGENCY` | No | `low`, `normal`, or `high` (default `normal`) | +| `VTUBER_AMBIENT_PROMPT` | No | Prompt text sent to Tier-2 clients as a `notification` event | +| `GATEWAY_LISTEN` | No | Bind address for the unified HTTP listener (default `0.0.0.0:8080`) | ## Notes & Limitations - **One session per request.** Each call mints a fresh agent session; the full - conversation history must be carried in `messages[]` (which OpenAI clients already do). -- **No agent connected ⇒ no output.** The request closes after a 180s idle timeout if no - reply arrives. Check that an OAB agent is connected to `/ws`. -- **Pull-only.** OpenAI chat completions cannot push agent-state animation cues or - proactive/ambient messages. Those are a planned Tier-2 WebSocket side-channel - (see the RFC), not part of this adapter. -- **Tags are not motion.** Mapping `[emotion]` → VRM expression / Live2D / VTube Studio - is the skin's job; the adapter stays motion-system-agnostic. + conversation history must be carried in `messages[]`, which OpenAI clients + already do. +- **No agent output => no chat output.** If the configured ACP agent cannot + answer, the SSE request eventually closes after the reply timeout. +- **Tier-2 is optional.** Skins that only support OpenAI chat completions still + get full Tier-1 chat. Tier-2 adds richer UI state when the skin supports it. +- **Tags are not motion.** Mapping `[emotion]` or Tier-2 `emotion` events to VRM + expressions, Live2D parameters, or VTube Studio actions is the skin's job. ## Troubleshooting **No response / stream hangs then closes:** -- Confirm an OAB agent is connected: check gateway logs for the `/ws` connection. -- Confirm `[gateway] platform = "vtuber"` in OAB config. +- Confirm `VTUBER_ENABLED=true` is set on the OpenAB process. +- Confirm the ACP agent is configured and authenticated. +- Check OpenAB logs for `unified: vtuber adapter enabled`. **`401 invalid api key`:** - The `Authorization: Bearer ` value must match `VTUBER_AUTH_KEY`. **Reply arrives all at once instead of streaming:** -- Set `streaming = true` and `streaming_placeholder = false` in OAB's `[gateway]` block. +- Confirm the skin is calling `/v1/chat/completions` with `stream: true`. +- Confirm no proxy in front of OpenAB buffers SSE responses. -**Duplicated/garbled streaming text:** -- Ensure only one OAB agent is connected for the `vtuber` platform; multiple agents - reply on the same request id. +**Tier-2 events do not arrive:** +- Confirm the client connects to `/v1/vtuber/ws` on the same OpenAB host. +- Confirm it uses the same bearer key as Tier-1. +- Check OpenAB logs for `vtuber WS client connected`. ## References -- [ADR: Custom Gateway](adr/custom-gateway.md) - [RFC: VTuber adapter](https://github.com/openabdev/openab/issues/1233) diff --git a/src/main.rs b/src/main.rs index c3ad9f8d1..d2c3a3f84 100644 --- a/src/main.rs +++ b/src/main.rs @@ -123,7 +123,14 @@ fn has_unified_platform_env() -> bool { || (cfg!(feature = "feishu") && std::env::var("FEISHU_APP_ID").is_ok()) || (cfg!(feature = "wecom") && std::env::var("WECOM_CORP_ID").is_ok()) || (cfg!(feature = "teams") && std::env::var("TEAMS_APP_ID").is_ok()) - || (cfg!(feature = "googlechat") && std::env::var("GOOGLE_CHAT_ENABLED").map(|v| v == "true" || v == "1").unwrap_or(false)) + || (cfg!(feature = "googlechat") + && std::env::var("GOOGLE_CHAT_ENABLED") + .map(|v| v == "true" || v == "1") + .unwrap_or(false)) + || (cfg!(feature = "vtuber") + && std::env::var("VTUBER_ENABLED") + .map(|v| v == "true" || v == "1") + .unwrap_or(false)) } #[tokio::main] @@ -145,7 +152,11 @@ async fn main() -> anyhow::Result<()> { return Ok(()); } #[cfg(feature = "agentcore")] - Commands::AgentcoreBridge { runtime_arn, region, command } => { + Commands::AgentcoreBridge { + runtime_arn, + region, + command, + } => { return acp::agentcore::run_bridge(&runtime_arn, ®ion, &command).await; } Commands::Set { key, value, thread } => { @@ -199,11 +210,13 @@ async fn main() -> anyhow::Result<()> { "config loaded" ); - if cfg.discord.is_none() && cfg.slack.is_none() && cfg.gateway.is_none() + if cfg.discord.is_none() + && cfg.slack.is_none() + && cfg.gateway.is_none() && !has_unified_platform_env() { anyhow::bail!( - "no adapter configured — add [discord], [slack], or [gateway] to config, or set platform env vars (TELEGRAM_BOT_TOKEN, etc.)" + "no adapter configured — add [discord], [slack], a legacy [gateway] config, or set unified platform env vars (TELEGRAM_BOT_TOKEN, VTUBER_ENABLED, etc.)" ); } @@ -492,11 +505,11 @@ async fn main() -> anyhow::Result<()> { feature = "teams", ))] let _unified_handle = { - use openab_core::gateway::{GatewayEventContext, process_gateway_event}; + use openab_core::gateway::{process_gateway_event, GatewayEventContext}; if has_unified_platform_env() { - let listen_addr = std::env::var("GATEWAY_LISTEN") - .unwrap_or_else(|_| "0.0.0.0:8080".into()); + let listen_addr = + std::env::var("GATEWAY_LISTEN").unwrap_or_else(|_| "0.0.0.0:8080".into()); // Create a dedicated dispatcher for unified gateway events let unified_dispatcher = Arc::new(dispatch::Dispatcher::with_idle_timeout( @@ -517,21 +530,27 @@ async fn main() -> anyhow::Result<()> { let gw_state = Arc::new(openab_gateway::AppState::from_env(event_tx.clone(), None)); // Build axum router with platform webhook routes - let mut app = axum::Router::new() - .route("/health", axum::routing::get(|| async { "ok" })); + let mut app = + axum::Router::new().route("/health", axum::routing::get(|| async { "ok" })); #[cfg(feature = "telegram")] if gw_state.telegram_bot_token.is_some() { let path = std::env::var("TELEGRAM_WEBHOOK_PATH") .unwrap_or_else(|_| "/webhook/telegram".into()); info!(path = %path, "unified: telegram adapter enabled"); - app = app.route(&path, axum::routing::post(openab_gateway::adapters::telegram::webhook)); + app = app.route( + &path, + axum::routing::post(openab_gateway::adapters::telegram::webhook), + ); } #[cfg(feature = "line")] { info!("unified: line adapter enabled"); - app = app.route("/webhook/line", axum::routing::post(openab_gateway::adapters::line::webhook)); + app = app.route( + "/webhook/line", + axum::routing::post(openab_gateway::adapters::line::webhook), + ); } #[cfg(feature = "feishu")] @@ -539,23 +558,35 @@ async fn main() -> anyhow::Result<()> { let path = std::env::var("FEISHU_WEBHOOK_PATH") .unwrap_or_else(|_| "/webhook/feishu".into()); info!(path = %path, "unified: feishu adapter enabled"); - app = app.route(&path, axum::routing::post(openab_gateway::adapters::feishu::webhook)); + app = app.route( + &path, + axum::routing::post(openab_gateway::adapters::feishu::webhook), + ); } #[cfg(feature = "wecom")] if let Some(ref w) = gw_state.wecom { info!(path = %w.config.webhook_path, "unified: wecom adapter enabled"); app = app - .route(&w.config.webhook_path, axum::routing::get(openab_gateway::adapters::wecom::verify)) - .route(&w.config.webhook_path, axum::routing::post(openab_gateway::adapters::wecom::webhook)); + .route( + &w.config.webhook_path, + axum::routing::get(openab_gateway::adapters::wecom::verify), + ) + .route( + &w.config.webhook_path, + axum::routing::post(openab_gateway::adapters::wecom::webhook), + ); } #[cfg(feature = "teams")] if gw_state.teams.is_some() { - let path = std::env::var("TEAMS_WEBHOOK_PATH") - .unwrap_or_else(|_| "/webhook/teams".into()); + let path = + std::env::var("TEAMS_WEBHOOK_PATH").unwrap_or_else(|_| "/webhook/teams".into()); info!(path = %path, "unified: teams adapter enabled"); - app = app.route(&path, axum::routing::post(openab_gateway::adapters::teams::webhook)); + app = app.route( + &path, + axum::routing::post(openab_gateway::adapters::teams::webhook), + ); } #[cfg(feature = "googlechat")] @@ -563,17 +594,39 @@ async fn main() -> anyhow::Result<()> { let path = std::env::var("GOOGLE_CHAT_WEBHOOK_PATH") .unwrap_or_else(|_| "/webhook/googlechat".into()); info!(path = %path, "unified: googlechat adapter enabled"); - app = app.route(&path, axum::routing::post(openab_gateway::adapters::googlechat::webhook)); + app = app.route( + &path, + axum::routing::post(openab_gateway::adapters::googlechat::webhook), + ); + } + + #[cfg(feature = "vtuber")] + if gw_state.vtuber.is_some() { + let path = + std::env::var("VTUBER_PATH").unwrap_or_else(|_| "/v1/chat/completions".into()); + info!(path = %path, "unified: vtuber adapter enabled"); + app = app + .route( + &path, + axum::routing::post(openab_gateway::adapters::vtuber::chat_completions), + ) + .route( + "/v1/vtuber/ws", + axum::routing::get(openab_gateway::adapters::vtuber::ws_upgrade), + ); + if let Some(ref clients) = gw_state.vtuber_ws_clients { + openab_gateway::adapters::vtuber::spawn_ambient_task(clients.clone()); + } } let app = app.with_state(gw_state.clone()); // Bridge task: receive events from adapters via event_tx, dispatch to core let unified_adapter: Arc = Arc::new( - unified_adapter::UnifiedGatewayAdapter::new(gw_state.clone()) + unified_adapter::UnifiedGatewayAdapter::new(gw_state.clone()), ); - // Read security gating from env (mirrors [gateway] config section) + // Read security gating from env for unified platform adapters. let gw_allow_all_channels = std::env::var("GATEWAY_ALLOW_ALL_CHANNELS") .map(|v| v != "0" && !v.eq_ignore_ascii_case("false")) .unwrap_or(true); @@ -771,14 +824,17 @@ async fn main() -> anyhow::Result<()> { let reminder_store = remind::ReminderStore::load(reminder_path); // Construct ambient dispatcher if enabled and channels configured. - let ambient_dispatcher = if cfg.ambient.enabled && !cfg.ambient.discord.channels.is_empty() { + let ambient_dispatcher = if cfg.ambient.enabled && !cfg.ambient.discord.channels.is_empty() + { info!( channels = ?cfg.ambient.discord.channels, flush_interval = cfg.ambient.flush_interval_seconds, flush_max_messages = cfg.ambient.flush_max_messages, "ambient mode enabled" ); - Some(Arc::new(openab_core::ambient::AmbientDispatcher::new(cfg.ambient.clone()))) + Some(Arc::new(openab_core::ambient::AmbientDispatcher::new( + cfg.ambient.clone(), + ))) } else { None }; @@ -978,6 +1034,7 @@ mod tests { std::env::remove_var("WECOM_CORP_ID"); std::env::remove_var("TEAMS_APP_ID"); std::env::remove_var("GOOGLE_CHAT_ENABLED"); + std::env::remove_var("VTUBER_ENABLED"); } // Case 1: no env vars → false @@ -999,6 +1056,16 @@ mod tests { std::env::set_var("TELEGRAM_BOT_TOKEN", "test-token"); assert_eq!(has_unified_platform_env(), cfg!(feature = "telegram")); + // Case 5: VTUBER_ENABLED=true → true only if feature compiled + clear_all(); + std::env::set_var("VTUBER_ENABLED", "true"); + assert_eq!(has_unified_platform_env(), cfg!(feature = "vtuber")); + + // Case 6: VTUBER_ENABLED=yes (invalid) → false + clear_all(); + std::env::set_var("VTUBER_ENABLED", "yes"); + assert!(!has_unified_platform_env()); + // Cleanup clear_all(); } diff --git a/src/unified_adapter.rs b/src/unified_adapter.rs index e51f43b25..0aef6ef8e 100644 --- a/src/unified_adapter.rs +++ b/src/unified_adapter.rs @@ -89,8 +89,23 @@ impl UnifiedGatewayAdapter { .await; } } + #[cfg(feature = "vtuber")] + "vtuber" => { + openab_gateway::adapters::vtuber::handle_reply( + reply, + &self.gw_state.vtuber_pending, + ) + .await; + if let Some(ref clients) = self.gw_state.vtuber_ws_clients { + let events = openab_gateway::adapters::vtuber::derive_events(reply); + openab_gateway::adapters::vtuber::broadcast(clients, &events).await; + } + } other => { - tracing::warn!(platform = other, "unified adapter: unknown platform, cannot route reply"); + tracing::warn!( + platform = other, + "unified adapter: unknown platform, cannot route reply" + ); } } } @@ -138,8 +153,13 @@ impl ChatAdapter for UnifiedGatewayAdapter { self.dispatch_reply(&reply).await; Ok(MessageRef { channel: channel.clone(), - message_id: format!("unified_{:x}", std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_nanos()), + message_id: format!( + "unified_{:x}", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() + ), }) } @@ -189,8 +209,13 @@ impl ChatAdapter for UnifiedGatewayAdapter { self.dispatch_reply(&reply).await; Ok(MessageRef { channel: channel.clone(), - message_id: format!("unified_{:x}", std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_nanos()), + message_id: format!( + "unified_{:x}", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() + ), }) } From 507b5d636d56967872c09196ee4a89d5817cf956 Mon Sep 17 00:00:00 2001 From: Can Yu Date: Mon, 29 Jun 2026 23:29:30 +0800 Subject: [PATCH 08/10] docs(vtuber): clarify unified setup --- docs/vtuber.md | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/docs/vtuber.md b/docs/vtuber.md index d5c9ed743..e8f75cc23 100644 --- a/docs/vtuber.md +++ b/docs/vtuber.md @@ -5,10 +5,12 @@ unified OpenAB binary, so any character "skin" that already speaks OpenAI chat completions (AniCompanion, Open-LLM-VTuber, ChatVRM, ...) gets a real agent: tool use, code, MCP, memory, and the same configured ACP backend. -``` -Skin ──POST /v1/chat/completions (SSE)──▶ OpenAB unified binary ──▶ ACP agent -Skin ◀─GET /v1/vtuber/ws (optional WS)──┘ -``` +The skin connects directly to the unified OpenAB HTTP listener in the same +OpenAB process: + +- Tier-1 chat: `POST /v1/chat/completions` streams SSE responses. +- Tier-2 UI events: `GET /v1/vtuber/ws` is optional and uses the same process. +- Agent work: both routes dispatch to the configured ACP agent in OpenAB. Unlike chat-platform adapters (LINE, Telegram, ...), this is **not a webhook**: the skin opens an HTTP request and the reply streams back on that same @@ -59,7 +61,6 @@ skin messages directly to OpenAB's in-process dispatcher. [agent] command = "kiro-cli" args = ["acp", "--trust-all-tools"] -working_dir = "/home/agent" ``` Streaming is handled by the VTuber adapter's SSE endpoint; it does not require From a8c329aae1e77090e9a20af39435d88ae42a07e6 Mon Sep 17 00:00:00 2001 From: Can Yu Date: Tue, 30 Jun 2026 11:31:35 +0800 Subject: [PATCH 09/10] fix(gateway): stabilize vtuber websocket clients --- crates/openab-gateway/src/adapters/vtuber.rs | 112 +++++++++++++++---- 1 file changed, 90 insertions(+), 22 deletions(-) diff --git a/crates/openab-gateway/src/adapters/vtuber.rs b/crates/openab-gateway/src/adapters/vtuber.rs index f6b0fd409..d88c12f1c 100644 --- a/crates/openab-gateway/src/adapters/vtuber.rs +++ b/crates/openab-gateway/src/adapters/vtuber.rs @@ -18,6 +18,7 @@ use serde::{Deserialize, Serialize}; use serde_json::json; use std::collections::{HashMap, HashSet}; use std::convert::Infallible; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; use tokio::sync::{mpsc, Mutex}; @@ -93,10 +94,12 @@ pub struct WsClient { pub subscribed: Option>, } -pub type WsClients = Arc>>; +static NEXT_CLIENT_ID: AtomicU64 = AtomicU64::new(0); + +pub type WsClients = Arc>>; pub fn new_ws_clients() -> WsClients { - Arc::new(Mutex::new(Vec::new())) + Arc::new(Mutex::new(HashMap::new())) } // --------------------------------------------------------------------------- @@ -369,7 +372,7 @@ pub async fn broadcast(clients: &WsClients, events: &[WsEvent]) { } let mut dead = Vec::new(); let mut guard = clients.lock().await; - for (i, client) in guard.iter().enumerate() { + for (&client_id, client) in guard.iter() { for event in events { let event_type = match event { WsEvent::AgentState { .. } => "agent_state", @@ -385,14 +388,14 @@ pub async fn broadcast(clients: &WsClients, events: &[WsEvent]) { } if let Ok(json) = serde_json::to_string(event) { if client.tx.send(json).is_err() { - dead.push(i); + dead.push(client_id); break; } } } } - for i in dead.into_iter().rev() { - guard.swap_remove(i); + for client_id in dead { + guard.remove(&client_id); } } @@ -427,22 +430,22 @@ pub async fn ws_upgrade( async fn handle_ws(state: Arc, socket: WebSocket) { let (mut ws_tx, mut ws_rx) = socket.split(); let (tx, mut rx) = mpsc::unbounded_channel::(); + let client_id = NEXT_CLIENT_ID.fetch_add(1, Ordering::Relaxed); - info!("vtuber WS client connected"); + info!(client_id, "vtuber WS client connected"); // Register client let clients = match &state.vtuber_ws_clients { Some(c) => c.clone(), None => return, }; - let client_idx = { - let mut guard = clients.lock().await; - guard.push(WsClient { + clients.lock().await.insert( + client_id, + WsClient { tx, subscribed: None, - }); - guard.len() - 1 - }; + }, + ); // Forward events → client let send_task = tokio::spawn(async move { @@ -461,8 +464,7 @@ async fn handle_ws(state: Arc, socket: WebSocket) { match serde_json::from_str::(&text) { Ok(WsCommand::Subscribe { events }) => { let mut guard = clients_for_recv.lock().await; - let idx = client_idx.min(guard.len().saturating_sub(1)); - if let Some(client) = guard.get_mut(idx) { + if let Some(client) = guard.get_mut(&client_id) { client.subscribed = Some(events.into_iter().collect()); info!(events = ?client.subscribed, "vtuber WS subscribe updated"); } @@ -473,8 +475,7 @@ async fn handle_ws(state: Arc, socket: WebSocket) { }; if let Ok(json) = serde_json::to_string(&pong) { let guard = clients_for_recv.lock().await; - let idx = client_idx.min(guard.len().saturating_sub(1)); - if let Some(client) = guard.get(idx) { + if let Some(client) = guard.get(&client_id) { let _ = client.tx.send(json); } } @@ -493,11 +494,8 @@ async fn handle_ws(state: Arc, socket: WebSocket) { } // Cleanup - let mut guard = clients.lock().await; - if client_idx < guard.len() { - guard.swap_remove(client_idx); - } - info!("vtuber WS client disconnected"); + clients.lock().await.remove(&client_id); + info!(client_id, "vtuber WS client disconnected"); } // --------------------------------------------------------------------------- @@ -846,6 +844,7 @@ mod tests { feishu: None, google_chat: None, wecom: None, + telegram_trusted_source_only: false, vtuber: Some(VtuberConfig { auth_key: Some("test-key".into()), default_model: "openab".into(), @@ -1080,6 +1079,75 @@ mod tests { assert_eq!(event["tag"], "joy"); } + // ----------------------------------------------------------------------- + // Regression: subscription updates stay attached to the same client after + // another client disconnects. + // ----------------------------------------------------------------------- + #[tokio::test] + async fn ws_e2e_subscribe_after_disconnect_uses_same_client() { + let (_addr, oab_url, vtb_url) = start_gateway().await; + + let (mut first_ws, _) = tokio_tungstenite::connect_async(&vtb_url).await.unwrap(); + let (_middle_ws, _) = tokio_tungstenite::connect_async(&vtb_url).await.unwrap(); + let (mut target_ws, _) = tokio_tungstenite::connect_async(&vtb_url).await.unwrap(); + let (mut oab_ws, _) = tokio_tungstenite::connect_async(&oab_url).await.unwrap(); + + first_ws.close(None).await.unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + target_ws + .send(tokio_tungstenite::tungstenite::Message::Text( + r#"{"type":"subscribe","events":["emotion"]}"#.into(), + )) + .await + .unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let reaction = serde_json::json!({ + "schema": "openab.gateway.reply.v1", + "reply_to": "evt_1", "platform": "vtuber", + "channel": {"id": "ch_test"}, + "content": {"type": "text", "text": "🤔"}, + "command": "add_reaction" + }); + oab_ws + .send(tokio_tungstenite::tungstenite::Message::Text( + reaction.to_string(), + )) + .await + .unwrap(); + + let filtered = + tokio::time::timeout(std::time::Duration::from_millis(300), target_ws.next()).await; + assert!( + filtered.is_err(), + "target client received agent_state despite emotion-only subscription" + ); + + let edit = serde_json::json!({ + "schema": "openab.gateway.reply.v1", + "reply_to": "evt_1", "platform": "vtuber", + "channel": {"id": "ch_test"}, + "content": {"type": "text", "text": "[joy] yay"}, + "command": "edit_message", "request_id": "req_1" + }); + oab_ws + .send(tokio_tungstenite::tungstenite::Message::Text( + edit.to_string(), + )) + .await + .unwrap(); + + let msg = tokio::time::timeout(std::time::Duration::from_secs(2), target_ws.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + let event: serde_json::Value = serde_json::from_str(&msg.to_string()).unwrap(); + assert_eq!(event["type"], "emotion"); + assert_eq!(event["tag"], "joy"); + } + // ----------------------------------------------------------------------- // Integration: ping → pong // ----------------------------------------------------------------------- From 8096e033f3e630d6cf9c2710bf0768a379e5dd05 Mon Sep 17 00:00:00 2001 From: Can Yu Date: Tue, 30 Jun 2026 11:39:52 +0800 Subject: [PATCH 10/10] fix(gateway): require vtuber streaming requests --- crates/openab-gateway/src/adapters/vtuber.rs | 64 ++++++++++++++++++-- 1 file changed, 58 insertions(+), 6 deletions(-) diff --git a/crates/openab-gateway/src/adapters/vtuber.rs b/crates/openab-gateway/src/adapters/vtuber.rs index d88c12f1c..82c1a4fcf 100644 --- a/crates/openab-gateway/src/adapters/vtuber.rs +++ b/crates/openab-gateway/src/adapters/vtuber.rs @@ -617,6 +617,14 @@ pub async fn chat_completions( } } + if req.stream != Some(true) { + return ( + StatusCode::BAD_REQUEST, + "only streaming mode is supported; set stream: true", + ) + .into_response(); + } + let prompt = flatten_messages(&req.messages); if prompt.trim().is_empty() { return ( @@ -667,7 +675,13 @@ pub async fn chat_completions( } info!(channel = %channel_id, "vtuber: chat request dispatched"); - let stream = reply_stream(rx, model, channel_id, state.vtuber_pending.clone()); + let stream = reply_stream( + rx, + model, + channel_id, + state.vtuber_pending.clone(), + reply_tail_idle_timeout(), + ); Sse::new(stream) .keep_alive(KeepAlive::default()) .into_response() @@ -697,6 +711,7 @@ struct StreamState { channel_id: String, registry: ReplyRegistry, seen_snapshot: bool, + tail_idle: Duration, } fn reply_stream( @@ -704,6 +719,7 @@ fn reply_stream( model: String, channel_id: String, registry: ReplyRegistry, + tail_idle: Duration, ) -> impl Stream> { let init = StreamState { rx, @@ -715,6 +731,7 @@ fn reply_stream( channel_id, registry, seen_snapshot: false, + tail_idle, }; futures_util::stream::unfold(init, |mut s| async move { loop { @@ -732,7 +749,7 @@ fn reply_stream( } 1 => match tokio::time::timeout( if s.seen_snapshot { - reply_tail_idle_timeout() + s.tail_idle } else { REPLY_FIRST_TIMEOUT }, @@ -861,6 +878,10 @@ mod tests { let app = axum::Router::new() .route("/ws", axum::routing::get(crate::ws_handler)) .route("/v1/vtuber/ws", axum::routing::get(ws_upgrade)) + .route( + "/v1/chat/completions", + axum::routing::post(chat_completions), + ) .with_state(state); let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); @@ -1189,6 +1210,40 @@ mod tests { ); } + // ----------------------------------------------------------------------- + // Integration: Tier-1 explicitly requires streaming mode. + // ----------------------------------------------------------------------- + #[tokio::test] + async fn chat_completions_rejects_non_streaming_requests() { + let (addr, _, _) = start_gateway().await; + let url = format!("http://{}/v1/chat/completions", addr); + let client = reqwest::Client::new(); + let bodies = [ + serde_json::json!({ + "model": "openab", + "messages": [{"role": "user", "content": "hello"}] + }), + serde_json::json!({ + "model": "openab", + "stream": false, + "messages": [{"role": "user", "content": "hello"}] + }), + ]; + + for body in bodies { + let resp = client + .post(&url) + .bearer_auth("test-key") + .json(&body) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + let text = resp.text().await.unwrap(); + assert!(text.contains("only streaming mode is supported")); + } + } + // ----------------------------------------------------------------------- // Unit tests // ----------------------------------------------------------------------- @@ -1315,8 +1370,6 @@ mod tests { #[tokio::test] async fn reply_stream_finishes_after_snapshot_idle() { - std::env::set_var("VTUBER_REPLY_TAIL_IDLE_MS", "10"); - let (tx, rx) = mpsc::unbounded_channel::(); let registry: ReplyRegistry = Arc::new(Mutex::new(HashMap::new())); registry.lock().await.insert("ch_idle".into(), tx.clone()); @@ -1326,6 +1379,7 @@ mod tests { "openab".into(), "ch_idle".into(), registry.clone(), + Duration::from_millis(10), )); assert!( @@ -1350,7 +1404,5 @@ mod tests { !registry.lock().await.contains_key("ch_idle"), "stream completion should remove pending registry entry" ); - - std::env::remove_var("VTUBER_REPLY_TAIL_IDLE_MS"); } }