diff --git a/backend/docs/translation-architecture.md b/backend/docs/translation-architecture.md new file mode 100644 index 0000000000..41ad2219c3 --- /dev/null +++ b/backend/docs/translation-architecture.md @@ -0,0 +1,89 @@ +# ADR-001: Full-Text vs Delta Translation in Streaming Coordinator + +## Status + +**Accepted** (2026-03-31, implicit — original implementation) +**Superseded By:** DD-008 improvements (PR #28, 2026-06-14) + +## Context + +Real-time translation during speech-to-text (STT) produces evolving text. As Deepgram streams transcript segments, each segment's text grows incrementally: + +``` +"Hola" → "Hola como" → "Hola como estas" → "Hola como estas bien" +``` + +The `TranslationCoordinator` must decide what to send to the Google Translate V3 API on each update cycle. + +## Decision + +Send **full segment text** (not just the delta/new portion) to the batch translator. + +### Rationale + +1. **Translation quality**: Google Translate V3's NMT model produces better output for complete sentences than for fragments. Full context enables: + - Correct gender agreement (`"Las estudiantes son inteligentes"` → `"The students are intelligent"`, not `"intelligent"` losing feminine) + - Idiom recognition (`"Estoy de acuerdo"` → `"I agree"`, not `"acuerdo"` → `"agreement"`) + - Proper word ordering in language pairs with different SVO structures + +2. **Assembly simplicity**: The `assembled_translation` stored in `SegmentState` IS the final persisted result — it must be high quality since it's written to Firestore and displayed to users. No stitching logic needed. + +3. **Stability gates filter most fragments**: Text only reaches the TRANSLATE gate when it has sentence-ending punctuation, a speaker switch, >700ms silence, STT `is_final` signal, or ≥12 tokens open for ≥3 seconds. By the time text reaches Google, it's usually a complete clause. + +## Consequences + +### Positive + +| Aspect | Impact | +|--------|--------| +| Translation quality | High — full NMT context for all persisted results | +| Code simplicity | Single-phase architecture, no delta-stitching logic | +| Debuggability | Each translation is self-contained; easy to inspect | +| Cache correctness | Full-text cache entries are always valid as-is | + +### Negative + +| Aspect | Impact | Dollar Cost | +|--------|--------|-------------| +| Redundant API calls | Evolving text generates unique MD5 cache keys at every step | ~$1,700–2,350/mo avoidable | +| Cache hit rate depression | Identical sentences across different segments don't dedup | Current: ~9% effective hit rate | +| Firestore write amplification | Each intermediate translation overwrites previous one | ~5x writes per stabilized segment | +| Character throughput | 284M chars/mo vs ~120–170M chars/mo potential | $4,282/mo vs ~$1,900–2,500/mo target | + +## Alternatives Considered + +### Alternative A: Delta Text + Stitching +Send only `new_text` (the uncommitted portion) to the API. Stitch onto previous `assembled_translation`. + +**Rejected initially** because: +- Fragment quality risk (see Rationale #1 above) +- Complex stitching logic needed for STT backtracking +- Would need to detect when STT revises earlier words + +**Re-evaluated in DD-008** as Phase 2 (future work) with streaming best-effort + finalization quality guarantee. + +### Alternative B: Two-Phase Architecture (Streaming + Finalization) +- **Streaming phase**: Send deltas for real-time UX (best-effort quality) +- **Finalization phase**: On stability signal, send full text split into sentences with per-sentence caching + +**Selected as future direction** (DD-008 Phase 2). Not implemented yet because it requires significant architectural changes and STT backtracking handling. + +### Alternative C: Sentence-Level Dedup Only (CHOSEN — This PR) +Keep sending full text, but split into sentences before cache lookup. Preserves full-sentence translation quality while eliminating redundant translations of identical sentences across segments/users. + +**Implemented in PR #28** alongside merge-aware Redis lookup. + +## This PR's Changes (DD-008 Phase 1 + 3) + +| Change | File | What | Savings | +|--------|------|------|---------| +| Sentence-level dedup | `translation.py:translate_units_batch()` | Split texts → per-sentence cache check → reassemble | $1,070–$1,500/mo | +| Merge-aware Redis lookup | `translation_coordinator.py:199-214` | On prefix reset, check Redis before re-translating | $430–640/mo | +| Design-decision comments | `translation_coordinator.py` | Document why full text is sent, trade-offs, future path | $0 (documentation) | + +## References + +- DD-008 deep dive: `deep-dives/DD-008-translate-dedup-gap.md` +- DD-008 design review: `deep-dives/DD-008-design-review.md` +- Original implementation: commit `fd25ede51` (PR #6155/#6178 by Thinh, 2026-03-31) +- Google Cloud Translate V3 API: https://cloud.google.com/translate/docs/basic/translating-text diff --git a/backend/utils/translation.py b/backend/utils/translation.py index 6a209f6cc7..9f21eac2dd 100644 --- a/backend/utils/translation.py +++ b/backend/utils/translation.py @@ -530,60 +530,76 @@ def translate_text_by_sentence(self, dest_language: str, text: str) -> Tuple[str def translate_units_batch(self, dest_language: str, units: List[Tuple[str, str]]) -> List[Tuple[str, str, str]]: """Translate a batch of (unit_id, text) pairs in minimal GCP API calls. - Deduplicates identical texts, checks all cache layers, and batches - only truly uncached texts into a single API call. + Splits each text into sentences, checks all cache layers PER SENTENCE, + batches only truly uncached sentences into a single API call, then + reassembles per-unit results. + + This sentence-level dedup means that if two different units share + a common sentence (e.g., "How are you?"), only the first occurrence + triggers an API call — subsequent units get the cached result. Returns list of (unit_id, translated_text, detected_lang) in input order. """ if not units: return [] - # Build deduplicated mapping: text_hash -> (text, [indices]) - results = [None] * len(units) - hash_to_info = {} # text_hash -> {'text': str, 'indices': [int]} - - for i, (unit_id, text) in enumerate(units): - text_hash = hashlib.md5(text.encode()).hexdigest() - if text_hash not in hash_to_info: - hash_to_info[text_hash] = {'text': text, 'indices': [], 'hash': text_hash} - hash_to_info[text_hash]['indices'].append(i) - - # Phase 1: Check caches for each unique text - uncached_hashes = [] - for text_hash, info in hash_to_info.items(): + # Phase 0: Split each unit's text into sentences + # unit_sentences[i] = list of (sentence_text, sentence_hash) for unit i + unit_sentences = [] + for unit_id, text in units: + sentences = split_into_sentences(text) + hashed = [(s, hashlib.md5(s.encode()).hexdigest()) for s in sentences] + unit_sentences.append((unit_id, text, hashed)) + + # Build global sentence-level dedup map: + # sent_hash -> {'text': str, 'indices': [(unit_idx, sent_idx), ...]} + sent_hash_to_info = {} + for unit_idx, (_, _, sentences) in enumerate(unit_sentences): + for sent_idx, (sent_text, sent_hash) in enumerate(sentences): + if sent_hash not in sent_hash_to_info: + sent_hash_to_info[sent_hash] = { + 'text': sent_text, + 'indices': [], + } + sent_hash_to_info[sent_hash]['indices'].append((unit_idx, sent_idx)) + + # Phase 1: Check caches for each unique sentence + # sent_translation[hash] = (translated_text, detected_lang) or None + sent_translation = {} # hash -> (str, str) + uncached_sent_hashes = [] + + for sent_hash, info in sent_hash_to_info.items(): # Check negative cache first - if get_negative_cache(text_hash, dest_language): - for idx in info['indices']: - results[idx] = (units[idx][0], info['text'], '') # return original text + if get_negative_cache(sent_hash, dest_language): + sent_translation[sent_hash] = (info['text'], '') # return original continue # Check memory cache - cached = self._check_memory_cache(text_hash, dest_language) + cached = self._check_memory_cache(sent_hash, dest_language) if cached: - for idx in info['indices']: - results[idx] = (units[idx][0], cached[0], cached[1]) + sent_translation[sent_hash] = cached continue # Check Redis cache - redis_cached = get_cached_translation(text_hash, dest_language) + redis_cached = get_cached_translation(sent_hash, dest_language) if redis_cached: translated = redis_cached["text"] detected = redis_cached.get("detected_lang", "") - self._set_memory_cache(text_hash, dest_language, translated, detected) - for idx in info['indices']: - results[idx] = (units[idx][0], translated, detected) + self._set_memory_cache(sent_hash, dest_language, translated, detected) + sent_translation[sent_hash] = (translated, detected) continue - uncached_hashes.append(text_hash) + uncached_sent_hashes.append(sent_hash) - # Phase 2: Batch translate uncached texts - if uncached_hashes: - uncached_texts = [hash_to_info[h]['text'] for h in uncached_hashes] + # Phase 2: Batch translate uncached sentences + _failed_sent_hashes: set = set() # track which sentences fell back to original text + if uncached_sent_hashes: + uncached_texts = [sent_hash_to_info[h]['text'] for h in uncached_sent_hashes] for chunk_start in range(0, len(uncached_texts), MAX_BATCH_SIZE): chunk_end = min(chunk_start + MAX_BATCH_SIZE, len(uncached_texts)) chunk = uncached_texts[chunk_start:chunk_end] - chunk_hashes = uncached_hashes[chunk_start:chunk_end] + chunk_hashes = uncached_sent_hashes[chunk_start:chunk_end] try: response = _client.translate_text( @@ -594,29 +610,57 @@ def translate_units_batch(self, dest_language: str, units: List[Tuple[str, str]] ) for j, translation in enumerate(response.translations): - text_hash = chunk_hashes[j] + sent_hash = chunk_hashes[j] translated_text = translation.translated_text detected_lang = translation.detected_language_code or "" - info = hash_to_info[text_hash] - self._set_memory_cache(text_hash, dest_language, translated_text, detected_lang) - cache_translation(text_hash, dest_language, translated_text, detected_lang) - - for idx in info['indices']: - results[idx] = (units[idx][0], translated_text, detected_lang) + self._set_memory_cache(sent_hash, dest_language, translated_text, detected_lang) + cache_translation(sent_hash, dest_language, translated_text, detected_lang) + sent_translation[sent_hash] = (translated_text, detected_lang) except Exception as e: - logger.error(f"Batch translation error: {e}") + logger.error(f"Sentence-level batch translation error: {e}") + # Mark failed sentences as fallbacks so Phase 3 does NOT + # write them to Redis (avoids poisoning cache with raw text). for h in chunk_hashes: - info = hash_to_info[h] - for idx in info['indices']: - if results[idx] is None: - results[idx] = (units[idx][0], info['text'], '') - - # Fill any remaining None results (shouldn't happen, but be safe) - for i in range(len(results)): - if results[i] is None: - results[i] = (units[i][0], units[i][1], '') + if h not in sent_translation: + sent_translation[h] = (sent_hash_to_info[h]['text'], '') + _failed_sent_hashes.add(h) + + # Phase 3: Reassemble per-unit results from sentence translations + results = [] + for unit_idx, (unit_id, original_text, sentences) in enumerate(unit_sentences): + if not sentences: + results.append((unit_id, original_text, '')) + continue + + translated_parts = [] + detected_langs = [] + for sent_text, sent_hash in sentences: + if sent_hash in sent_translation: + trans_text, det_lang = sent_translation[sent_hash] + translated_parts.append(trans_text) + if det_lang: + detected_langs.append(det_lang) + else: + # Fallback: should not happen, but use original text + translated_parts.append(sent_text) + + assembled = ' '.join(translated_parts) + # Dominant detected language from constituent sentences + dominant_lang = '' + if detected_langs: + lang_counts = Counter(detected_langs) + dominant_lang = lang_counts.most_common(1)[0][0] + + text_hash = hashlib.md5(original_text.encode()).hexdigest() + # Only persist to any cache if NO sentence fell back to original text + # (avoids poisoning both in-memory LRU and Redis with untranslated output). + if not any(sh in _failed_sent_hashes for _, sh in sentences): + self._set_memory_cache(text_hash, dest_language, assembled, dominant_lang) + cache_translation(text_hash, dest_language, assembled, dominant_lang) + + results.append((unit_id, assembled, dominant_lang)) return results diff --git a/backend/utils/translation_coordinator.py b/backend/utils/translation_coordinator.py index f4e0f7b7ad..8d389408bd 100644 --- a/backend/utils/translation_coordinator.py +++ b/backend/utils/translation_coordinator.py @@ -21,6 +21,7 @@ from utils.translation import ( TranslationNeed, classify_translation_need, + get_cached_translation, set_negative_cache, TranslationService, ) @@ -97,16 +98,30 @@ def _compute_stability_signals( class TranslationCoordinator: """Orchestrates real-time translation for a single WebSocket session. - Usage in transcribe.py: - coordinator = TranslationCoordinator( - target_language='en', - translation_service=translation_service, - on_translation_ready=callback, - ) - # On each segment update: - await coordinator.observe(updated_segments, removed_ids, conversation_id) - # On session close: - await coordinator.flush() + ## Architecture + + This coordinator implements SINGLE-PHASE translation: every stable text + update is sent in full to the batch translator, which calls Google + Translate V3 with the complete segment text. See DD-008 design doc + (`deep-dives/DD-008-design-review.md`) for the planned TWO-PHASE + architecture (streaming deltas + final full-sentence translation). + + ## Data Flow + + observe() → [stability gates] → batch_buffer → _flush_batch() + → translate_units_batch() → [LRU → Redis → API] + → on_translation_ready() → Firestore persist + WebSocket push + + ## Cost Note + + Because we send full text (not delta), each evolving segment generates + multiple translations of overlapping content. Current cost: ~$4,282/mo + for 284M characters. Target (with DD-008 fixes): ~$1,900–2,500/mo. + + ## Key Trade-off + + Translation quality (full context) vs cost (redundant chars). + Currently optimized for quality. See DD-008 for path to both. """ def __init__( @@ -184,10 +199,44 @@ async def observe( # Prefix-safe check: if prefix changed, reset if state.committed_text and not text.startswith(state.committed_text): - state.committed_text = '' - state.assembled_translation = None - state.detected_lang = None - self.metrics['prefix_resets'] += 1 + # Check if the new merged text was already translated (Redis cache) + text_hash = hashlib.md5(text.encode()).hexdigest() + redis_cached = await run_blocking( + sync_executor, get_cached_translation, text_hash, self.target_language + ) + if redis_cached: + # Found in Redis — adopt as committed, skip re-translation + translated_text = redis_cached['text'] + detected_lang = redis_cached.get('detected_lang', '') + # Guard: skip no-op "translations" that would spam UI badges. + if not should_persist_translation(text, translated_text, detected_lang, self.target_language): + state.committed_text = text + state.assembled_translation = translated_text + state.detected_lang = detected_lang + state.version = self._next_version() + state.latest_text = text + state.last_update_at = now + self._batch_buffer = [entry for entry in self._batch_buffer if entry[0] != segment.id] + self.metrics['prefix_resets'] += 1 + continue # Don't add to batch buffer + state.committed_text = text + state.assembled_translation = translated_text + state.detected_lang = detected_lang + # Bump version so any in-flight batch job is rejected as stale. + state.version = self._next_version() + self.metrics['prefix_resets'] += 1 + # Invalidate any pending batch entries for this segment so + # _flush_batch() does not overwrite with stale buffered text. + self._batch_buffer = [entry for entry in self._batch_buffer if entry[0] != segment.id] + state.latest_text = text + state.last_update_at = now + await self.on_translation_ready(segment.id, translated_text, detected_lang, conversation_id) + continue # Don't add to batch buffer + else: + state.committed_text = '' + state.assembled_translation = None + state.detected_lang = None + self.metrics['prefix_resets'] += 1 # Only translate the new (uncommitted) portion new_text = text[len(state.committed_text) :].strip() if state.committed_text else text @@ -242,6 +291,27 @@ async def observe( self.metrics['classify_translates'] += 1 version = self._next_version() state.version = version + + # DESIGN DECISION: We send `text` (full segment text) instead of + # `new_text` (the uncommitted delta) to the batch translator. + # + # Rationale: + # - Google Translate V3 translates each content string independently; + # full sentence context improves disambiguation (gender agreement, + # idioms like "estoy de acuerdo" → "I agree", not "acuerdo" → "agreement") + # - The assembled_translation IS the final persisted result — it must be + # high quality since it's stored in Firestore and displayed to users + # + # Trade-off: This means evolving text ("Hola" → "Hola como" → "Hola como estas") + # generates unique MD5 cache keys at every step, causing 3–4x redundant + # translations per stabilized segment. See DD-008 for cost analysis and + # proposed two-phase architecture that preserves quality while reducing cost. + # + # If you change this to send new_text (delta), you MUST also: + # 1. Update assembly stitching logic in _flush_batch() + # 2. Ensure stability gates filter out sub-sentence fragments + # 3. Update the cache key strategy + # 4. Measure translation quality regression in production self._batch_buffer.append((segment.id, text, conversation_id, version)) # (Re)start batch aggregation timer