diff --git a/app/chatgpt/client.py b/app/chatgpt/client.py index cd95d61..a5b4759 100644 --- a/app/chatgpt/client.py +++ b/app/chatgpt/client.py @@ -26,6 +26,7 @@ from .sse import ( SSEEvent, ChatMessage, async_parse_sse_stream, async_extract_chat_messages, async_stream_text_from_response, + strip_citation_markers, CitationStripper, ) @@ -442,7 +443,7 @@ async def chat(self, opts: ChatOptions) -> ChatResult: if msg.finish_reason: result.finish_reason = msg.finish_reason - result.content = "".join(content_parts) + result.content = strip_citation_markers("".join(content_parts)) # Estimate token usage result.prompt_tokens = estimate_messages_tokens(opts.messages) @@ -468,7 +469,14 @@ async def chat_stream(self, opts: ChatOptions) -> AsyncGenerator[ChatMessage, No ) conduit_token = await self.prepare_fchat(chat_token, proof_token, opts) + stripper = CitationStripper() async for msg in self.stream_fchat(chat_token, proof_token, conduit_token, opts): + if msg.content and not msg.is_image: + cleaned = stripper.feed(msg.content) + if not cleaned and not msg.finish_reason: + # Entire delta was citation markup; nothing to emit + continue + msg.content = cleaned yield msg # ---------- Legacy /backend-api/conversation ---------- diff --git a/app/chatgpt/sse.py b/app/chatgpt/sse.py index e743864..bc1dbdf 100644 --- a/app/chatgpt/sse.py +++ b/app/chatgpt/sse.py @@ -7,11 +7,76 @@ from __future__ import annotations import json +import re import time from dataclasses import dataclass, field from typing import Any, AsyncGenerator, Dict, Generator, Optional +# ChatGPT inline citation/navlist markers are wrapped in private-use-area +# control chars: "\ue200cite\ue202turn0finance0\ue201". They surface as stray +# "citeturn0finance0" text in API output, so strip them. We also strip the +# bare textual form in case the PUA delimiters are dropped upstream. +_CITATION_SPAN_RE = re.compile("\ue200.*?\ue201", re.S) +_CITATION_STRAY_RE = re.compile("[\ue200-\ue20f]") +_CITATION_TEXT_RE = re.compile( + r"(?:cite|navlist|video)turn\d+[a-z]+\d+(?:turn\d+[a-z]+\d+)*" +) + + +def strip_citation_markers(text: str) -> str: + """Remove ChatGPT citation/navlist markers (PUA-delimited or bare text).""" + if not text: + return text + if "\ue200" in text: + text = _CITATION_SPAN_RE.sub("", text) + text = _CITATION_STRAY_RE.sub("", text) + text = _CITATION_TEXT_RE.sub("", text) + return text + + +class CitationStripper: + """Stateful stripper for streamed deltas: drops PUA-delimited citation + markers even when a marker is split across chunks, plus the bare text form.""" + + def __init__(self) -> None: + self._in_marker = False + + def feed(self, chunk: str) -> str: + if not chunk: + return chunk + if self._in_marker or "\ue200" in chunk: + out = [] + for ch in chunk: + if self._in_marker: + if ch == "\ue201": + self._in_marker = False + continue + if ch == "\ue200": + self._in_marker = True + continue + if "\ue200" <= ch <= "\ue20f": + continue + out.append(ch) + chunk = "".join(out) + return _CITATION_TEXT_RE.sub("", chunk) + + +def _term_finish(role: str, recipient: str, content_type: str) -> str: + """Return "stop" only when the current message is the user-visible + assistant answer. A bare {"p":"/message/status","v":"finished_successfully"} + patch carries no role, and the web-search/tool message reports + finished_successfully mid-stream too -- treating that as terminal would + cut the stream off before the answer is generated.""" + if role not in ("assistant", ""): + return "" + if recipient not in ("all", ""): + return "" + if content_type not in ("text", "multimodal_text", ""): + return "" + return "stop" + + @dataclass class SSEEvent: event: str = "" @@ -83,6 +148,7 @@ def extract_chat_messages(events: Generator[SSEEvent, None, None]) -> Generator[ with full-message updates and JSON Patch incremental updates. """ accumulators: Dict[str, Dict[str, Any]] = {} + cur_role = cur_recipient = cur_ct = "" for event in events: if event.error: @@ -123,6 +189,35 @@ def extract_chat_messages(events: Generator[SSEEvent, None, None]) -> Generator[ ) continue + # --- Batch JSON Patch with no top-level path --- + # Two forms, both used by the web-search / finance answer flow: + # {"o": "patch", "v": [{"p": "/message/content/parts/0", "o": "append", "v": "..."}]} + # {"v": [{"p": "/message/content/parts/0", "o": "append", "v": "..."}]} (continuation) + # Non-content list items (e.g. search_result_group) are filtered by the path check below. + if "p" not in data and isinstance(data.get("v"), list): + conv_id = data.get("conversation_id", "") + msg_id = data.get("message_id", "") + key = f"{conv_id}:{msg_id}" + if key not in accumulators: + accumulators[key] = {"text": "", "message_id": msg_id, "conversation_id": conv_id} + for sub_patch in data["v"]: + if not isinstance(sub_patch, dict): + continue + sub_val = sub_patch.get("v") + sub_path = sub_patch.get("p", "") + if isinstance(sub_val, str) and "/content/parts/" in sub_path: + accumulators[key]["text"] += sub_val + yield ChatMessage( + message_id=msg_id, conversation_id=conv_id, + role="assistant", content=sub_val, + ) + elif sub_path == "/message/status" and sub_val == "finished_successfully": + yield ChatMessage( + message_id=msg_id, conversation_id=conv_id, + role="assistant", content="", finish_reason=_term_finish(cur_role, cur_recipient, cur_ct), + ) + continue + # --- JSON Patch incremental delta --- # e.g. {"p": "/message/content/parts/0", "o": "append", "v": "Hello"} # Also handles {"p": "", "o": "add", "v": {"message": ...}} from delta_encoding @@ -206,7 +301,7 @@ def extract_chat_messages(events: Generator[SSEEvent, None, None]) -> Generator[ conversation_id=conv_id, role="assistant", content="", - finish_reason="stop", + finish_reason=_term_finish(cur_role, cur_recipient, cur_ct), ) elif isinstance(patch_value, list) and data.get("o") == "patch": # Batch patch: v is a list of patch operations @@ -228,7 +323,7 @@ def extract_chat_messages(events: Generator[SSEEvent, None, None]) -> Generator[ conversation_id=conv_id, role="assistant", content="", - finish_reason="stop", + finish_reason=_term_finish(cur_role, cur_recipient, cur_ct), ) continue @@ -266,6 +361,11 @@ def extract_chat_messages(events: Generator[SSEEvent, None, None]) -> Generator[ message_id = msg.get("id") or data.get("message_id", "") conversation_id = data.get("conversation_id", "") role = (msg.get("author") or {}).get("role", "") + # Track current message so a later bare /message/status patch (no role) + # can tell the visible answer from a search/tool/thinking message. + cur_role = role + cur_recipient = msg.get("recipient", "") + cur_ct = (msg.get("content") or {}).get("content_type", "") # Skip non-assistant messages (user/system echoes in SSE stream) if role not in ("assistant", ""): continue @@ -398,6 +498,7 @@ async def async_parse_sse_stream(response) -> AsyncGenerator[SSEEvent, None]: async def async_extract_chat_messages(events) -> AsyncGenerator[ChatMessage, None]: """Async version of extract_chat_messages — iterates over async SSEEvent generator.""" accumulators: Dict[str, Dict[str, Any]] = {} + cur_role = cur_recipient = cur_ct = "" async for event in events: if event.error: @@ -434,6 +535,32 @@ async def async_extract_chat_messages(events) -> AsyncGenerator[ChatMessage, Non ) continue + # Batch JSON Patch with no top-level path (web-search / finance answer flow). + # Handles both {"o":"patch","v":[...]} and the {"v":[...]} continuation form. + if "p" not in data and isinstance(data.get("v"), list): + conv_id = data.get("conversation_id", "") + msg_id = data.get("message_id", "") + key = f"{conv_id}:{msg_id}" + if key not in accumulators: + accumulators[key] = {"text": "", "message_id": msg_id, "conversation_id": conv_id} + for sub_patch in data["v"]: + if not isinstance(sub_patch, dict): + continue + sub_val = sub_patch.get("v") + sub_path = sub_patch.get("p", "") + if isinstance(sub_val, str) and "/content/parts/" in sub_path: + accumulators[key]["text"] += sub_val + yield ChatMessage( + message_id=msg_id, conversation_id=conv_id, + role="assistant", content=sub_val, + ) + elif sub_path == "/message/status" and sub_val == "finished_successfully": + yield ChatMessage( + message_id=msg_id, conversation_id=conv_id, + role="assistant", content="", finish_reason=_term_finish(cur_role, cur_recipient, cur_ct), + ) + continue + if "p" in data and "o" in data and "v" in data: patch_path = data.get("p", "") patch_op = data.get("o", "") @@ -508,7 +635,7 @@ async def async_extract_chat_messages(events) -> AsyncGenerator[ChatMessage, Non conversation_id=conv_id, role="assistant", content="", - finish_reason="stop", + finish_reason=_term_finish(cur_role, cur_recipient, cur_ct), ) elif isinstance(patch_value, list) and data.get("o") == "patch": for sub_patch in patch_value: @@ -529,7 +656,7 @@ async def async_extract_chat_messages(events) -> AsyncGenerator[ChatMessage, Non conversation_id=conv_id, role="assistant", content="", - finish_reason="stop", + finish_reason=_term_finish(cur_role, cur_recipient, cur_ct), ) continue @@ -563,6 +690,10 @@ async def async_extract_chat_messages(events) -> AsyncGenerator[ChatMessage, Non message_id = msg.get("id") or data.get("message_id", "") conversation_id = data.get("conversation_id", "") role = (msg.get("author") or {}).get("role", "") + # Track current message (see _term_finish) before skipping non-assistant. + cur_role = role + cur_recipient = msg.get("recipient", "") + cur_ct = (msg.get("content") or {}).get("content_type", "") if role not in ("assistant", ""): continue content_parts = (msg.get("content") or {}).get("parts", [])