Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion app/chatgpt/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from .sse import (
SSEEvent, ChatMessage,
async_parse_sse_stream, async_extract_chat_messages, async_stream_text_from_response,
strip_citation_markers, CitationStripper,
)


Expand Down Expand Up @@ -442,7 +443,7 @@ async def chat(self, opts: ChatOptions) -> ChatResult:
if msg.finish_reason:
result.finish_reason = msg.finish_reason

result.content = "".join(content_parts)
result.content = strip_citation_markers("".join(content_parts))

# Estimate token usage
result.prompt_tokens = estimate_messages_tokens(opts.messages)
Expand All @@ -468,7 +469,14 @@ async def chat_stream(self, opts: ChatOptions) -> AsyncGenerator[ChatMessage, No
)

conduit_token = await self.prepare_fchat(chat_token, proof_token, opts)
stripper = CitationStripper()
async for msg in self.stream_fchat(chat_token, proof_token, conduit_token, opts):
if msg.content and not msg.is_image:
cleaned = stripper.feed(msg.content)
if not cleaned and not msg.finish_reason:
# Entire delta was citation markup; nothing to emit
continue
msg.content = cleaned
yield msg

# ---------- Legacy /backend-api/conversation ----------
Expand Down
139 changes: 135 additions & 4 deletions app/chatgpt/sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,76 @@
from __future__ import annotations

import json
import re
import time
from dataclasses import dataclass, field
from typing import Any, AsyncGenerator, Dict, Generator, Optional


# ChatGPT inline citation/navlist markers are wrapped in private-use-area
# control chars: "\ue200cite\ue202turn0finance0\ue201". They surface as stray
# "citeturn0finance0" text in API output, so strip them. We also strip the
# bare textual form in case the PUA delimiters are dropped upstream.
_CITATION_SPAN_RE = re.compile("\ue200.*?\ue201", re.S)
_CITATION_STRAY_RE = re.compile("[\ue200-\ue20f]")
_CITATION_TEXT_RE = re.compile(
r"(?:cite|navlist|video)turn\d+[a-z]+\d+(?:turn\d+[a-z]+\d+)*"
)


def strip_citation_markers(text: str) -> str:
"""Remove ChatGPT citation/navlist markers (PUA-delimited or bare text)."""
if not text:
return text
if "\ue200" in text:
text = _CITATION_SPAN_RE.sub("", text)
text = _CITATION_STRAY_RE.sub("", text)
text = _CITATION_TEXT_RE.sub("", text)
return text


class CitationStripper:
"""Stateful stripper for streamed deltas: drops PUA-delimited citation
markers even when a marker is split across chunks, plus the bare text form."""

def __init__(self) -> None:
self._in_marker = False

def feed(self, chunk: str) -> str:
if not chunk:
return chunk
if self._in_marker or "\ue200" in chunk:
out = []
for ch in chunk:
if self._in_marker:
if ch == "\ue201":
self._in_marker = False
continue
if ch == "\ue200":
self._in_marker = True
continue
if "\ue200" <= ch <= "\ue20f":
continue
out.append(ch)
chunk = "".join(out)
return _CITATION_TEXT_RE.sub("", chunk)
Comment on lines +38 to +62

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial | 💤 Low value

裸文本引用标记在跨 chunk 分割时可能无法被清理。

CitationStripper.feed() 在字符处理后调用 _CITATION_TEXT_RE.sub("", chunk),但如果裸文本引用标记(如 citeturn0finance0)恰好被分割在两个 chunk 之间(如 chunk1="cite", chunk2="turn0finance0"),则两个 chunk 都不会匹配正则,导致泄漏。

不过根据下游代码(client.py:446),非流式场景会对聚合后的完整内容再次调用 strip_citation_markers(),可以兜底清理。如果裸文本分割确实在生产中出现,可考虑在 CitationStripper 中增加一个小的 buffer 来处理跨 chunk 的文本匹配。

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/chatgpt/sse.py` around lines 38 - 62, CitationStripper.feed currently
only strips PUA markers and then runs _CITATION_TEXT_RE.sub on the incoming
chunk, which misses bare-text citation tokens split across chunks; add a small
rolling buffer on the CitationStripper instance (e.g., self._tail_buf) and in
feed prepend that buffer to the current chunk before processing with
_CITATION_TEXT_RE and the PUA logic, then after processing keep only the final N
characters (N = max possible citation token length, derived from the regex for
_CITATION_TEXT_RE) in self._tail_buf for the next call; ensure
CitationStripper.feed still returns only the cleaned output for the current
chunk (excluding the kept tail) and update/reset the buffer when markers are
closed or fully consumed, and note strip_citation_markers remains the downstream
full-text fallback.



def _term_finish(role: str, recipient: str, content_type: str) -> str:
"""Return "stop" only when the current message is the user-visible
assistant answer. A bare {"p":"/message/status","v":"finished_successfully"}
patch carries no role, and the web-search/tool message reports
finished_successfully mid-stream too -- treating that as terminal would
cut the stream off before the answer is generated."""
if role not in ("assistant", ""):
return ""
if recipient not in ("all", ""):
return ""
if content_type not in ("text", "multimodal_text", ""):
return ""
return "stop"


@dataclass
class SSEEvent:
event: str = ""
Expand Down Expand Up @@ -83,6 +148,7 @@ def extract_chat_messages(events: Generator[SSEEvent, None, None]) -> Generator[
with full-message updates and JSON Patch incremental updates.
"""
accumulators: Dict[str, Dict[str, Any]] = {}
cur_role = cur_recipient = cur_ct = ""

for event in events:
if event.error:
Expand Down Expand Up @@ -123,6 +189,35 @@ def extract_chat_messages(events: Generator[SSEEvent, None, None]) -> Generator[
)
continue

# --- Batch JSON Patch with no top-level path ---
# Two forms, both used by the web-search / finance answer flow:
# {"o": "patch", "v": [{"p": "/message/content/parts/0", "o": "append", "v": "..."}]}
# {"v": [{"p": "/message/content/parts/0", "o": "append", "v": "..."}]} (continuation)
# Non-content list items (e.g. search_result_group) are filtered by the path check below.
if "p" not in data and isinstance(data.get("v"), list):
conv_id = data.get("conversation_id", "")
msg_id = data.get("message_id", "")
key = f"{conv_id}:{msg_id}"
if key not in accumulators:
accumulators[key] = {"text": "", "message_id": msg_id, "conversation_id": conv_id}
for sub_patch in data["v"]:
if not isinstance(sub_patch, dict):
continue
sub_val = sub_patch.get("v")
sub_path = sub_patch.get("p", "")
if isinstance(sub_val, str) and "/content/parts/" in sub_path:
accumulators[key]["text"] += sub_val
yield ChatMessage(
message_id=msg_id, conversation_id=conv_id,
role="assistant", content=sub_val,
)
elif sub_path == "/message/status" and sub_val == "finished_successfully":
yield ChatMessage(
message_id=msg_id, conversation_id=conv_id,
role="assistant", content="", finish_reason=_term_finish(cur_role, cur_recipient, cur_ct),
)
continue
Comment on lines +192 to +219

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial | 💤 Low value

批量 JSON Patch 块中的消息上下文可能与当前消息不匹配。

在批量 JSON Patch 处理块中(lines 192-219),cur_role/cur_recipient/cur_ct 用于计算 _term_finish(),但这些变量是由之前处理的完整消息更新设置的。如果批量 patch 来自不同的消息(例如在 tool 消息之后),可能会导致 _term_finish() 基于过期的上下文做出错误判断。

不过由于 _term_finish() 对空字符串默认返回 "stop",且批量 patch 通常紧随其对应的消息上下文,实际影响可能有限。如果后续发现流式场景有异常终止,可考虑从 batch patch 本身提取或验证消息身份。

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/chatgpt/sse.py` around lines 192 - 219, The batch JSON Patch handling
uses stale context variables cur_role/cur_recipient/cur_ct when calling
_term_finish(), which can misattribute finish_reason for patches belonging to
other messages; update the block that yields a finished message to derive
role/recipient/ct from the patch itself (e.g., from data fields or from the
per-key accumulator entry in accumulators keyed by conv_id:msg_id) and pass
those extracted values into _term_finish() (fall back to empty/defaults only if
the patch/accumulator lacks them); reference the variables/functions
accumulators, key, data, sub_patch, _term_finish, ChatMessage and ensure the
finish_reason is computed using the patch-scoped context rather than
cur_role/cur_recipient/cur_ct.


# --- JSON Patch incremental delta ---
# e.g. {"p": "/message/content/parts/0", "o": "append", "v": "Hello"}
# Also handles {"p": "", "o": "add", "v": {"message": ...}} from delta_encoding
Expand Down Expand Up @@ -206,7 +301,7 @@ def extract_chat_messages(events: Generator[SSEEvent, None, None]) -> Generator[
conversation_id=conv_id,
role="assistant",
content="",
finish_reason="stop",
finish_reason=_term_finish(cur_role, cur_recipient, cur_ct),
)
elif isinstance(patch_value, list) and data.get("o") == "patch":
# Batch patch: v is a list of patch operations
Expand All @@ -228,7 +323,7 @@ def extract_chat_messages(events: Generator[SSEEvent, None, None]) -> Generator[
conversation_id=conv_id,
role="assistant",
content="",
finish_reason="stop",
finish_reason=_term_finish(cur_role, cur_recipient, cur_ct),
)
continue

Expand Down Expand Up @@ -266,6 +361,11 @@ def extract_chat_messages(events: Generator[SSEEvent, None, None]) -> Generator[
message_id = msg.get("id") or data.get("message_id", "")
conversation_id = data.get("conversation_id", "")
role = (msg.get("author") or {}).get("role", "")
# Track current message so a later bare /message/status patch (no role)
# can tell the visible answer from a search/tool/thinking message.
cur_role = role
cur_recipient = msg.get("recipient", "")
cur_ct = (msg.get("content") or {}).get("content_type", "")
# Skip non-assistant messages (user/system echoes in SSE stream)
if role not in ("assistant", ""):
continue
Expand Down Expand Up @@ -398,6 +498,7 @@ async def async_parse_sse_stream(response) -> AsyncGenerator[SSEEvent, None]:
async def async_extract_chat_messages(events) -> AsyncGenerator[ChatMessage, None]:
"""Async version of extract_chat_messages — iterates over async SSEEvent generator."""
accumulators: Dict[str, Dict[str, Any]] = {}
cur_role = cur_recipient = cur_ct = ""

async for event in events:
if event.error:
Expand Down Expand Up @@ -434,6 +535,32 @@ async def async_extract_chat_messages(events) -> AsyncGenerator[ChatMessage, Non
)
continue

# Batch JSON Patch with no top-level path (web-search / finance answer flow).
# Handles both {"o":"patch","v":[...]} and the {"v":[...]} continuation form.
if "p" not in data and isinstance(data.get("v"), list):
conv_id = data.get("conversation_id", "")
msg_id = data.get("message_id", "")
key = f"{conv_id}:{msg_id}"
if key not in accumulators:
accumulators[key] = {"text": "", "message_id": msg_id, "conversation_id": conv_id}
for sub_patch in data["v"]:
if not isinstance(sub_patch, dict):
continue
sub_val = sub_patch.get("v")
sub_path = sub_patch.get("p", "")
if isinstance(sub_val, str) and "/content/parts/" in sub_path:
accumulators[key]["text"] += sub_val
yield ChatMessage(
message_id=msg_id, conversation_id=conv_id,
role="assistant", content=sub_val,
)
elif sub_path == "/message/status" and sub_val == "finished_successfully":
yield ChatMessage(
message_id=msg_id, conversation_id=conv_id,
role="assistant", content="", finish_reason=_term_finish(cur_role, cur_recipient, cur_ct),
)
continue

if "p" in data and "o" in data and "v" in data:
patch_path = data.get("p", "")
patch_op = data.get("o", "")
Expand Down Expand Up @@ -508,7 +635,7 @@ async def async_extract_chat_messages(events) -> AsyncGenerator[ChatMessage, Non
conversation_id=conv_id,
role="assistant",
content="",
finish_reason="stop",
finish_reason=_term_finish(cur_role, cur_recipient, cur_ct),
)
elif isinstance(patch_value, list) and data.get("o") == "patch":
for sub_patch in patch_value:
Expand All @@ -529,7 +656,7 @@ async def async_extract_chat_messages(events) -> AsyncGenerator[ChatMessage, Non
conversation_id=conv_id,
role="assistant",
content="",
finish_reason="stop",
finish_reason=_term_finish(cur_role, cur_recipient, cur_ct),
)
continue

Expand Down Expand Up @@ -563,6 +690,10 @@ async def async_extract_chat_messages(events) -> AsyncGenerator[ChatMessage, Non
message_id = msg.get("id") or data.get("message_id", "")
conversation_id = data.get("conversation_id", "")
role = (msg.get("author") or {}).get("role", "")
# Track current message (see _term_finish) before skipping non-assistant.
cur_role = role
cur_recipient = msg.get("recipient", "")
cur_ct = (msg.get("content") or {}).get("content_type", "")
if role not in ("assistant", ""):
continue
content_parts = (msg.get("content") or {}).get("parts", [])
Expand Down