Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 169 additions & 0 deletions backend/database/firestore_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
"""Conservative Redis read-through cache for Firestore projections.

The cache is intentionally projection-oriented. Do not use it to cache whole
``users/{uid}`` documents: user docs mix low-risk preferences with entitlement,
BYOK, privacy consent, and data-protection fields that require stricter
correctness guarantees.
"""

import base64
import json
import logging
import os
import random
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Optional, cast

from database.redis_db import r
from database.firestore_cache_metrics import observe_fetch, observe_payload, record_request

logger = logging.getLogger(__name__)

# Global key-version segment embedded in every key built by make_cache_key().
# The environment variable is read once, when this module is imported; changing
# it afterwards does not affect this constant for the lifetime of the process.
_GLOBAL_VERSION = os.getenv('FIRESTORE_CACHE_GLOBAL_VERSION', '1')

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Global version captured at import time

_GLOBAL_VERSION is read once when the module is imported. The architecture docs advertise FIRESTORE_CACHE_GLOBAL_VERSION=2 as a "runtime emergency rollback", but in practice it takes effect only after a full process restart (pod redeploy). By contrast, is_enabled() reads its env vars on every call, so the FIRESTORE_CACHE_ENABLED kill-switch works without a restart. This asymmetry is not documented in the rollback section, and an operator following the playbook during an incident could reasonably believe changing the env var is sufficient without a redeploy.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!



@dataclass(frozen=True)
class CachePolicy:
    """Policy for one typed Firestore cache namespace.

    Instances are frozen (immutable), so a policy can be declared once as a
    module-level constant and shared by readers and invalidators.
    """

    # Projection name. Used in the Redis key, in metric/log labels, and to
    # derive the per-namespace flag FIRESTORE_CACHE_<NAMESPACE>_ENABLED.
    namespace: str
    # Schema version: part of the Redis key and stored in the envelope as 'v';
    # a cached envelope whose 'v' differs is treated as stale.
    version: int = 1
    # Base freshness window in seconds, before jitter is applied (minimum 1).
    ttl_seconds: int = 60
    # Fraction of the TTL used as a +/- random spread; <= 0 disables jitter.
    jitter_ratio: float = 0.10
    # Environment variable consulted when no per-namespace flag is set.
    enabled_env_var: str = 'FIRESTORE_CACHE_ENABLED'
    # Serialized envelopes larger than this many bytes are not written to Redis.
    max_payload_bytes: int = 256_000


def is_enabled(policy: CachePolicy) -> bool:
    """Report whether the cache may be read or written under ``policy``.

    Resolution order:

    1. ``FIRESTORE_CACHE_<NAMESPACE>_ENABLED`` (namespace upper-cased). When
       this variable is set at all, it alone decides the outcome, so it can
       both enable and disable a single namespace.
    2. Otherwise the variable named by ``policy.enabled_env_var``; when that
       is unset the cache is disabled.

    A value counts as "enabled" when, compared case-insensitively, it is one
    of ``1``, ``true``, ``yes`` or ``on``. The environment is consulted on
    every call.
    """
    truthy = {'1', 'true', 'yes', 'on'}

    override = os.getenv(f"FIRESTORE_CACHE_{policy.namespace.upper()}_ENABLED")
    # The global flag is only looked up when no per-namespace override exists.
    setting = override if override is not None else os.getenv(policy.enabled_env_var, '')
    return setting.lower() in truthy


def make_cache_key(policy: CachePolicy, entity_id: str) -> str:
    """Return the deterministic, versioned Redis key for one entity's projection.

    Key layout: ``fs:v<global>:<namespace>:v<policy version>:b64:<encoded id>``.

    The entity ID is base64url-encoded (padding stripped) rather than
    sanitized by character replacement, so distinct IDs such as ``a:b`` and
    ``a_b`` can never collapse onto the same key. That matters because the
    cache may hold per-user projections.
    """
    id_bytes = str(entity_id).encode('utf-8')
    token = base64.urlsafe_b64encode(id_bytes).decode('ascii').rstrip('=')
    prefix = f'fs:v{_GLOBAL_VERSION}:{policy.namespace}:v{policy.version}'
    return f'{prefix}:b64:{token}'


def invalidate(policy: CachePolicy, entity_id: str) -> None:
    """Delete the cached projection for ``entity_id`` on a best-effort basis.

    A failure while deleting is logged as a warning and counted as
    ``invalidate_error`` instead of being raised, so a write path that calls
    this after updating Firestore is not failed by Redis.

    NOTE(review): this contacts Redis even when ``is_enabled(policy)`` is
    false. Gating on the flag would avoid Redis latency while the cache is
    rolled back, but could leave stale entries (until their TTL expires) for
    any process that still has the cache enabled -- confirm the intended
    trade-off before changing it.
    """
    cache_key = make_cache_key(policy, entity_id)
    namespace = policy.namespace
    try:
        r.delete(cache_key)
        record_request(namespace, 'invalidate')
    except Exception as exc:
        logger.warning('Firestore cache invalidate failed namespace=%s error=%s', namespace, exc)
        record_request(namespace, 'invalidate_error')


def get_or_fetch(policy: CachePolicy, entity_id: str, fetch_fn: Callable[[], Any]) -> Any:
    """Return the cached projection, or call ``fetch_fn`` and populate Redis.

    Firestore remains the source of truth. ``fetch_fn`` is called, and its
    result returned, whenever the cache is disabled, the Redis read fails, the
    key is absent, or the cached envelope is undecodable, of a different
    ``policy.version``, or past its ``fresh_until`` time.

    Args:
        policy: Cache policy for the projection's namespace.
        entity_id: Identifier of the entity whose projection is requested.
        fetch_fn: Zero-argument callable that reads the projection from the
            source of truth. Exceptions it raises propagate to the caller.

    Returns:
        The cached payload on a hit, otherwise the value returned by
        ``fetch_fn``. A hit has been through a JSON round trip.

    Each outcome is counted under one of: ``disabled``, ``redis_error``,
    ``hit``, ``stale``, ``decode_error``, ``miss``.
    """

    if not is_enabled(policy):
        record_request(policy.namespace, 'disabled')
        return _fetch(policy, fetch_fn)

    key = make_cache_key(policy, entity_id)
    now = time.time()

    try:
        raw = r.get(key)
    except Exception as e:
        # Redis is unavailable: serve from the source and skip the write-back.
        logger.warning('Firestore cache read failed namespace=%s error=%s', policy.namespace, e)
        record_request(policy.namespace, 'redis_error')
        return _fetch(policy, fetch_fn)

    if raw:
        try:
            raw_str = raw.decode('utf-8') if isinstance(raw, bytes) else cast(str, raw)
            envelope = json.loads(raw_str, object_hook=_json_object_hook)
            if envelope.get('v') == policy.version and envelope.get('fresh_until', 0) >= now:
                record_request(policy.namespace, 'hit')
                return envelope.get('payload')
            record_request(policy.namespace, 'stale')
        except Exception as e:
            # A malformed envelope is treated like a miss and overwritten below.
            logger.warning('Firestore cache decode failed namespace=%s error=%s', policy.namespace, e)
            record_request(policy.namespace, 'decode_error')
    else:
        record_request(policy.namespace, 'miss')

    payload = _fetch(policy, fetch_fn)
    # Stamp the envelope with the time of the write, not the time the lookup
    # started. The Redis expiry passed by _set() is also counted from the
    # write, so both freshness clocks start together and a slow fetch can no
    # longer produce an entry that is already (partly) stale when stored.
    _set(policy, key, payload, time.time())
    return payload


def _fetch(policy: CachePolicy, fetch_fn: Callable[[], Any]) -> Any:
    """Call ``fetch_fn`` and record how long it took for ``policy.namespace``.

    The duration is measured with the monotonic clock and is observed whether
    ``fetch_fn`` returns or raises; an exception from ``fetch_fn`` propagates.
    """
    started_at = time.monotonic()
    try:
        result = fetch_fn()
    finally:
        elapsed = time.monotonic() - started_at
        observe_fetch(policy.namespace, elapsed)
    return result


def _ttl_with_jitter(policy: CachePolicy) -> int:
    """Return the policy TTL in seconds with a random +/- spread applied.

    The base TTL is clamped to at least 1. A non-positive ``jitter_ratio``
    returns the base TTL unchanged; otherwise an integer offset is drawn
    uniformly from ``[-spread, spread]`` where ``spread`` is
    ``int(ttl * jitter_ratio)`` but never less than 1. The result is clamped
    to at least 1 again so the value is always a usable expiry.
    """
    base_ttl = max(1, policy.ttl_seconds)
    ratio = policy.jitter_ratio
    if ratio <= 0:
        return base_ttl

    spread = max(1, int(base_ttl * ratio))
    offset = random.randint(-spread, spread)
    return max(1, base_ttl + offset)


def _set(policy: CachePolicy, key: str, payload: Any, now: Optional[float] = None) -> None:
    """Wrap ``payload`` in a versioned envelope and store it in Redis.

    Args:
        policy: Supplies the envelope version, TTL/jitter, the payload size
            cap, and the namespace used for metrics and logs.
        key: Redis key to write.
        payload: Value to cache. It must be JSON serializable; ``datetime``
            values are handled by ``_json_default``.
        now: Epoch seconds used for ``created_at`` and as the base of
            ``fresh_until``. ``None`` means "use the current time".

    The serialized size is always observed. An envelope larger than
    ``policy.max_payload_bytes`` is counted as ``payload_too_large`` and not
    written. Serialization or Redis errors are logged and counted as
    ``set_error`` rather than raised.
    """
    # Compare against None explicitly: ``now or time.time()`` would also
    # discard a legitimate timestamp of 0.0 (e.g. from a fake clock in tests).
    if now is None:
        now = time.time()

    ttl = _ttl_with_jitter(policy)
    envelope = {
        'v': policy.version,
        'kind': 'value',
        'created_at': now,
        'fresh_until': now + ttl,
        'payload': payload,
    }

    try:
        encoded = json.dumps(envelope, default=_json_default)
        payload_bytes = len(encoded.encode('utf-8'))
        observe_payload(policy.namespace, payload_bytes)
        if payload_bytes > policy.max_payload_bytes:
            record_request(policy.namespace, 'payload_too_large')
            return
        # The Redis expiry uses the same jittered TTL as ``fresh_until``.
        r.set(key, encoded, ex=ttl)
        record_request(policy.namespace, 'set')
    except Exception as e:
        logger.warning('Firestore cache set failed namespace=%s error=%s', policy.namespace, e)
        record_request(policy.namespace, 'set_error')


def _json_default(value: Any) -> Any:
    """``json.dumps`` fallback that encodes ``datetime`` as a tagged dict.

    A ``datetime`` becomes ``{'__firestore_cache_type__': 'datetime', 'iso': ...}``
    so ``_json_object_hook`` can restore it on read. Any other type raises
    ``TypeError``, which is how ``json`` expects unsupported values to be
    reported.
    """
    if not isinstance(value, datetime):
        raise TypeError(f'Object of type {type(value).__name__} is not JSON serializable')
    return {'__firestore_cache_type__': 'datetime', 'iso': value.isoformat()}


def _json_object_hook(value: dict) -> Any:
    """``json.loads`` object hook that restores tagged ``datetime`` values.

    A dict whose ``__firestore_cache_type__`` is ``'datetime'`` is replaced by
    ``datetime.fromisoformat(value['iso'])``; every other dict is returned
    unchanged.
    """
    is_tagged_datetime = value.get('__firestore_cache_type__') == 'datetime'
    return datetime.fromisoformat(value['iso']) if is_tagged_datetime else value
40 changes: 40 additions & 0 deletions backend/database/firestore_cache_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""Low-cardinality metrics for Firestore read-through caches.

This module intentionally lives under ``database/`` so database modules can
record metrics without importing upward from ``utils``. ``prometheus_client``
uses a global registry, so these metrics are exported automatically by the
existing /metrics endpoint.
"""

from prometheus_client import Counter, Histogram

# Request counter, labelled by cache namespace and by the outcome string
# passed to record_request().
FIRESTORE_CACHE_REQUESTS = Counter(
    'firestore_cache_requests_total',
    'Firestore cache requests by namespace and result',
    ['namespace', 'result'],
)

# Duration of source-of-truth fetches, labelled by namespace. No explicit
# buckets are given, so the prometheus_client defaults apply.
FIRESTORE_CACHE_FETCH_SECONDS = Histogram(
    'firestore_cache_fetch_seconds',
    'Time spent fetching Firestore cache misses from the source of truth',
    ['namespace'],
)

# Serialized payload size in bytes, labelled by namespace, with explicit
# buckets from 128 B up to 1 MiB.
FIRESTORE_CACHE_PAYLOAD_BYTES = Histogram(
    'firestore_cache_payload_bytes',
    'Serialized Firestore cache payload size in bytes',
    ['namespace'],
    buckets=(128, 512, 1024, 4096, 16384, 65536, 262144, 1048576),
)


def record_request(namespace: str, result: str) -> None:
    """Increment the request counter for one ``(namespace, result)`` pair."""
    counter = FIRESTORE_CACHE_REQUESTS.labels(namespace=namespace, result=result)
    counter.inc()


def observe_fetch(namespace: str, seconds: float) -> None:
    """Record one fetch duration, in seconds, for ``namespace``."""
    histogram = FIRESTORE_CACHE_FETCH_SECONDS.labels(namespace=namespace)
    histogram.observe(seconds)


def observe_payload(namespace: str, payload_bytes: int) -> None:
    """Record one serialized payload size, in bytes, for ``namespace``."""
    histogram = FIRESTORE_CACHE_PAYLOAD_BYTES.labels(namespace=namespace)
    histogram.observe(payload_bytes)
96 changes: 69 additions & 27 deletions backend/database/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,20 @@
from google.cloud.firestore_v1 import FieldFilter, transactional

from ._client import db, document_id_from_seed
from database.firestore_cache import CachePolicy, get_or_fetch, invalidate
from database.redis_db import try_acquire_user_platform_write_lock
from models.users import Subscription, PlanLimits, PlanType, SubscriptionStatus
from utils.subscription import get_default_basic_subscription
import logging

logger = logging.getLogger(__name__)

# Conservative low-risk user projections. Do NOT use these policies for
# entitlement, BYOK, data-protection, privacy-consent, or full user-doc caching.
_USER_LANGUAGE_CACHE = CachePolicy(namespace='user_language', version=1, ttl_seconds=300)
_USER_TRANSCRIPTION_PREFS_CACHE = CachePolicy(namespace='user_transcription_prefs', version=1, ttl_seconds=120)
_USER_AI_PROFILE_CACHE = CachePolicy(namespace='user_ai_profile', version=1, ttl_seconds=300)


# Industry-standard two-field pattern (Mixpanel / Amplitude / PostHog):
# signup_platform — set once at account creation, immutable
Expand Down Expand Up @@ -869,14 +876,28 @@ def get_user_language_preference(uid: str) -> str:
Returns:
Language code (e.g., 'en', 'vi') or empty string if not set
"""
user_ref = db.collection('users').document(uid)
user_doc = user_ref.get()

if user_doc.exists:
user_data = user_doc.to_dict()
return user_data.get('language', '')
def fetch_language():
user_ref = db.collection('users').document(uid)
user_doc = user_ref.get(['language'])

return '' # Return empty string if not set
if user_doc.exists:
user_data = user_doc.to_dict()
return user_data.get('language', '')

return '' # Return empty string if not set

# DESIGN DECISION: cache this typed user projection, not the full users/{uid} doc.
#
# Rationale:
# - Language preference is a low-risk, frequently-read setting used during
# listen startup.
# - Full user-doc caching is intentionally avoided because users/{uid} also
# contains entitlement, BYOK, privacy, and data-protection fields.
#
# Safety: cache is disabled by default, Redis failures fall back to Firestore,
# and set_user_language_preference() invalidates this namespace.
return get_or_fetch(_USER_LANGUAGE_CACHE, uid, fetch_language)


def set_user_language_preference(uid: str, language: str) -> None:
Expand All @@ -889,6 +910,8 @@ def set_user_language_preference(uid: str, language: str) -> None:
"""
user_ref = db.collection('users').document(uid)
user_ref.set({'language': language}, merge=True)
invalidate(_USER_LANGUAGE_CACHE, uid)
invalidate(_USER_TRANSCRIPTION_PREFS_CACHE, uid)


def get_user_onboarding_state(uid: str) -> dict:
Expand Down Expand Up @@ -1189,27 +1212,34 @@ def get_user_transcription_preferences(uid: str) -> dict:
Returns:
dict with 'single_language_mode' (bool), 'vocabulary' (List[str]), and 'language' (str)
"""
user_ref = db.collection('users').document(uid)
user_doc = user_ref.get()

if user_doc.exists:
user_data = user_doc.to_dict()
prefs = user_data.get('transcription_preferences', {})
def fetch_preferences():
user_ref = db.collection('users').document(uid)
user_doc = user_ref.get(['transcription_preferences', 'language'])

if user_doc.exists:
user_data = user_doc.to_dict()
prefs = user_data.get('transcription_preferences', {})
return {
'single_language_mode': prefs.get('single_language_mode', False),
'vocabulary': prefs.get('vocabulary', []),
'language': user_data.get('language', ''),
'uses_custom_stt': prefs.get('uses_custom_stt', False),
'custom_stt_since': prefs.get('custom_stt_since'),
}

return {
'single_language_mode': prefs.get('single_language_mode', False),
'vocabulary': prefs.get('vocabulary', []),
'language': user_data.get('language', ''),
'uses_custom_stt': prefs.get('uses_custom_stt', False),
'custom_stt_since': prefs.get('custom_stt_since'),
'single_language_mode': False,
'vocabulary': [],
'language': '',
'uses_custom_stt': False,
'custom_stt_since': None,
}

return {
'single_language_mode': False,
'vocabulary': [],
'language': '',
'uses_custom_stt': False,
'custom_stt_since': None,
}
# DESIGN DECISION: cache this typed user projection, not the full users/{uid} doc.
# It includes only transcription startup preferences and language. It does not
# include entitlement, BYOK, data-protection, or privacy-consent fields.
return get_or_fetch(_USER_TRANSCRIPTION_PREFS_CACHE, uid, fetch_preferences)


def get_agent_vm(uid: str) -> Optional[dict]:
Expand Down Expand Up @@ -1249,6 +1279,7 @@ def set_user_transcription_preferences(uid: str, single_language_mode: bool = No

if update_data:
user_ref.update(update_data)
invalidate(_USER_TRANSCRIPTION_PREFS_CACHE, uid)


def set_user_custom_stt_usage(uid: str, uses_custom_stt: bool) -> None:
Expand All @@ -1269,6 +1300,7 @@ def set_user_custom_stt_usage(uid: str, uses_custom_stt: bool) -> None:
update_data = {'transcription_preferences.uses_custom_stt': uses_custom_stt}
update_data['transcription_preferences.custom_stt_since'] = datetime.now(timezone.utc) if uses_custom_stt else None
user_ref.update(update_data)
invalidate(_USER_TRANSCRIPTION_PREFS_CACHE, uid)


# ============================================================================
Expand Down Expand Up @@ -1367,20 +1399,29 @@ def update_assistant_settings(uid: str, settings: dict) -> dict:
return existing


def get_ai_user_profile(uid: str) -> Optional[dict]:
def _get_ai_user_profile_from_firestore(uid: str) -> Optional[dict]:
user_ref = db.collection('users').document(uid)
doc = user_ref.get()
doc = user_ref.get(['ai_user_profile'])
if not doc.exists:
return None
return doc.to_dict().get('ai_user_profile')


def get_ai_user_profile(uid: str) -> Optional[dict]:
# DESIGN DECISION: cache only the low-risk ai_user_profile projection.
# Avoid full user-doc caching because high-risk entitlement/BYOK/privacy
# fields live on the same Firestore document.
return get_or_fetch(_USER_AI_PROFILE_CACHE, uid, lambda: _get_ai_user_profile_from_firestore(uid))


def update_ai_user_profile(
uid: str, profile_text: str = None, generated_at=None, data_sources_used: int = None
) -> dict:
"""Update AI user profile. Only writes non-None fields (partial update)."""
# Read existing profile and merge updates
existing = get_ai_user_profile(uid) or {}
# Read existing profile directly from Firestore — never from cache — because
# this is a read-modify-write path. Using a stale cached projection here
# could overwrite newer profile fields.
existing = _get_ai_user_profile_from_firestore(uid) or {}
if profile_text is not None:
existing['profile_text'] = profile_text
if generated_at is not None:
Expand All @@ -1389,4 +1430,5 @@ def update_ai_user_profile(
existing['data_sources_used'] = data_sources_used
user_ref = db.collection('users').document(uid)
user_ref.update({'ai_user_profile': existing})
invalidate(_USER_AI_PROFILE_CACHE, uid)
return existing
Loading
Loading