diff --git a/agents/conversation_learner/README.md b/agents/conversation_learner/README.md
index d88cb5d..4cd4b48 100644
--- a/agents/conversation_learner/README.md
+++ b/agents/conversation_learner/README.md
@@ -21,7 +21,7 @@
## Architecture & Integration
- **Trajectory Analysis**: Uses Cloud Logging to fetch recent conversational trajectories. For each conversation, the messages from every inference log entry are merged (deduplicated, in chronological order) into one transcript — earlier entries backfill any later entry whose `gen_ai.*.messages` label exceeded Cloud Logging's 64 KiB limit and was truncated.
- **Per-conversation LLM-as-a-judge**: Each conversation is judged **independently and in parallel** (a direct Vertex `generate_content` call per conversation), extracting detection signals, gaps, and `ContextEnrichmentProposal` records. This bounds each judge's context for more consistent analysis and scales to many conversations, instead of analyzing every conversation in a single pass.
-- **Cross-conversation dedup**: A lightweight aggregation pass deduplicates proposals across conversations (same asset + gap type), keeping the highest-confidence instance, before saving to `proposal.json`.
+- **Cross-conversation aggregation**: A lightweight pass merges proposals that share an identity (same asset + gap type) across conversations into one learning and rolls up the recurrence signal before saving to `proposal.json`: `occurrence_count` (the number of distinct source conversations), `source_conversation_ids`, aggregated `supporting_evidence`, `first_seen`/`last_seen`, and a recurrence-boosted `confidence_grade` (noisy-OR over the merged instances, `1 - Π(1 - cᵢ)`; `max_instance_confidence` preserves the best single-instance grade).
## Running Locally
diff --git a/agents/conversation_learner/agent.py b/agents/conversation_learner/agent.py
index 8a7a891..6f833f9 100644
--- a/agents/conversation_learner/agent.py
+++ b/agents/conversation_learner/agent.py
@@ -527,8 +527,9 @@ def load_instruction() -> str:
# PER-CONVERSATION LLM-AS-JUDGE ORCHESTRATION
# ==============================================================================
# Each conversation is judged by its own direct generate_content call (bypassing
-# ADK), fanned out in parallel, then proposals are deduplicated across
-# conversations. This bounds each judge's context (vs. one giant pass over every
+# ADK), fanned out in parallel, then proposals sharing an identity are merged
+# across conversations (dedup + provenance roll-up + recurrence-boosted
+# confidence). This bounds each judge's context (vs. one giant pass over every
# conversation), scales, and isolates per-conversation failures. Mirrors the
# direct-call + asyncio.gather + retry pattern in agents/enrichment/src/common.py.
@@ -674,19 +675,78 @@ def _judge_conversation_sync(conversation_id: str, transcript: str) -> List[Dict
return []
+def _noisy_or(confidences: List[Optional[float]]) -> float:
+ """Combines independent per-instance confidences via noisy-OR.
+
+ ``P = 1 - Π(1 - cᵢ)``. A learning corroborated by several conversations is
+ more trustworthy than any single instance, so recurrence raises confidence
+ while staying in [0, 1]; a lone instance is returned unchanged. Note this
+ saturates toward 1 with many instances — acceptable because recurring
+ signals are exactly what we want to surface; temper it if low-quality
+ proposals start accumulating.
+ """
+ prob_none = 1.0
+ for c in confidences:
+ prob_none *= 1.0 - min(max(c or 0.0, 0.0), 1.0)
+ return 1.0 - prob_none
+
+
+def _merge_group(members: List[Dict[str, Any]]) -> Dict[str, Any]:
+ """Merges all proposals sharing one identity into a single enriched learning.
+
+ The highest-confidence instance supplies the representative fields (asset,
+ proposed value, instruction, eval candidate). On top of that we roll up the
+ cross-conversation signal: how many conversations surfaced it, which ones,
+ all supporting evidence, the time span, and a recurrence-boosted confidence
+ (``max_instance_confidence`` preserves the best single-instance grade).
+ Provenance (``_provenance``) is attached by the orchestrator; when absent
+ (e.g. direct unit-test calls) the count falls back to the member count.
+ """
+ representative = max(members, key=lambda p: p.get("confidence_grade") or 0.0)
+ merged = {k: v for k, v in representative.items() if k != "_provenance"}
+
+ conv_ids: List[str] = []
+ timestamps: List[str] = []
+ supporting: List[Dict[str, Any]] = []
+ for p in members:
+ prov = p.get("_provenance") or {}
+ cid, ts = prov.get("conversation_id"), prov.get("timestamp")
+ if cid and cid not in conv_ids:
+ conv_ids.append(cid)
+ if ts:
+ timestamps.append(ts)
+ ev = p.get("evidence")
+ if isinstance(ev, dict):
+ supporting.append({**({"conversation_id": cid} if cid else {}), **ev})
+
+ merged["confidence_grade"] = round(
+ _noisy_or([p.get("confidence_grade") for p in members]), 4
+ )
+ merged["max_instance_confidence"] = max(
+ (p.get("confidence_grade") or 0.0) for p in members
+ )
+ merged["occurrence_count"] = len(conv_ids) if conv_ids else len(members)
+ if conv_ids:
+ merged["source_conversation_ids"] = conv_ids
+ if supporting:
+ merged["supporting_evidence"] = supporting
+ if timestamps:
+ merged["first_seen"], merged["last_seen"] = min(timestamps), max(timestamps)
+ return merged
+
+
def _aggregate_proposals(proposals: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
- """Deduplicates proposals across conversations (lightweight aggregation pass).
+ """Merges proposals across conversations into deduplicated learnings.
- Proposals with the same identity (see _proposal_identity) are treated as the
- same learning; the highest-confidence instance is kept.
+ Proposals with the same identity (see _proposal_identity) describe the same
+ gap on the same asset and are merged into one record carrying the recurrence
+ signal — occurrence count, source conversations, aggregated evidence, time
+ span, and a recurrence-boosted confidence (see _merge_group).
"""
- by_key: Dict[Tuple, Dict[str, Any]] = {}
+ groups: Dict[Tuple, List[Dict[str, Any]]] = {}
for p in proposals:
- key = _proposal_identity(p)
- existing = by_key.get(key)
- if existing is None or (p.get("confidence_grade") or 0) > (existing.get("confidence_grade") or 0):
- by_key[key] = p
- return list(by_key.values())
+ groups.setdefault(_proposal_identity(p), []).append(p)
+ return [_merge_group(members) for members in groups.values()]
async def generate_learnings(
@@ -701,8 +761,8 @@ async def generate_learnings(
"""Analyzes agent conversation trajectories and saves enrichment proposals.
Fetches trajectories from Cloud Logging, runs the LLM-as-judge independently
- on EACH conversation in parallel, deduplicates the proposals across
- conversations, saves them to proposal.json, and returns a summary.
+ on EACH conversation in parallel, merges proposals across conversations into
+ recurrence-aware learnings, saves them to proposal.json, and returns a summary.
Args:
conversation_id: A single conversation id to analyze.
@@ -738,12 +798,19 @@ async def generate_learnings(
"No conversations with trajectories were found for the given parameters."
)
- # Render each conversation into its own transcript.
+    # Render each conversation into its own transcript and record its latest
+    # entry timestamp (ISO 8601, or None) for provenance roll-up in aggregation.
transcripts: Dict[str, str] = {}
+ conv_ts: Dict[str, Optional[str]] = {}
for c_id, c_entries in grouped.items():
lines: List[str] = []
_render_conversation(c_entries, lines)
transcripts[c_id] = "\n".join(lines)
+ latest = max(
+ (e.timestamp for e in c_entries if getattr(e, "timestamp", None)),
+ default=None,
+ )
+ conv_ts[c_id] = latest.isoformat() if latest else None
# Fan out one judge call per conversation, concurrency-capped and retried
# independently so a single failure yields [] rather than aborting the batch.
@@ -752,13 +819,18 @@ async def generate_learnings(
async def _judge(c_id: str, transcript: str) -> List[Dict[str, Any]]:
async with sem:
try:
- return await _with_retry(
+ proposals = await _with_retry(
lambda: asyncio.to_thread(_judge_conversation_sync, c_id, transcript),
what=f"judge[{c_id}]",
)
except Exception as e: # pylint: disable=broad-except
print(f"[judge] {c_id}: failed after retries: {e}", flush=True)
return []
+            # Tag each proposal with its source conversation and timestamp so
+            # aggregation can roll up recurrence; _merge_group drops this key.
+ for p in proposals:
+ p["_provenance"] = {"conversation_id": c_id, "timestamp": conv_ts.get(c_id)}
+ return proposals
results = await asyncio.gather(*[_judge(c, t) for c, t in transcripts.items()])
raw_proposals = [p for sub in results for p in sub]
@@ -768,11 +840,13 @@ async def _judge(c_id: str, transcript: str) -> List[Dict[str, Any]]:
save_trajectory_analysis_result(json.dumps({"proposals": deduped}))
+ recurring = sum(1 for p in deduped if (p.get("occurrence_count") or 1) > 1)
return (
f"Total log entries retrieved: {len(entries)}. Unique conversations: {len(grouped)}.\n"
f"Conversation IDs: {', '.join(grouped.keys())}\n"
f"Analyzed each conversation independently; saved {len(deduped)} "
- f"deduplicated proposal(s) (from {len(raw_proposals)} raw) to proposal.json."
+ f"deduplicated proposal(s) (from {len(raw_proposals)} raw; {recurring} "
+ f"recurring across multiple conversations) to proposal.json."
)
diff --git a/agents/conversation_learner/review_app.py b/agents/conversation_learner/review_app.py
index aed024e..588ed28 100644
--- a/agents/conversation_learner/review_app.py
+++ b/agents/conversation_learner/review_app.py
@@ -72,6 +72,10 @@ def _conf(p):
return p.get("confidence_grade") or 0.0
+def _occ(p):
+ return p.get("occurrence_count") or 1
+
+
# --- load ---
proposals_path = rs.DEFAULT_PROPOSALS_PATH
if not os.path.exists(proposals_path):
@@ -102,7 +106,10 @@ def _conf(p):
f_gap = sb.multiselect("Gap type", gap_opts, default=gap_opts)
f_sig = sb.multiselect("Detection signal", sig_opts, default=sig_opts)
f_conf = sb.slider("Min confidence", 0.0, 1.0, 0.0, 0.05)
+max_occ = max((_occ(p) for p in merged), default=1)
+f_min_occ = sb.slider("Min occurrences", 1, max_occ, 1) if max_occ > 1 else 1
f_query = sb.text_input("🔎 Asset contains")
+sort_by = sb.selectbox("Sort by", ["Occurrences", "Confidence"], index=0)
sb.divider()
sb.subheader("Bulk action")
@@ -123,10 +130,17 @@ def _visible(p):
and _gap(p) in f_gap
and _signal(p) in f_sig
and _conf(p) >= f_conf
+ and _occ(p) >= f_min_occ
and f_query.lower() in _asset_name(p).lower())
-filtered = [p for p in merged if _visible(p)]
+# Sort most-recurring (then highest-confidence) first by default, so high-impact
+# learnings aren't hidden behind the MAX_CARDS cap.
+_SORT_KEYS = {
+ "Occurrences": lambda p: (_occ(p), _conf(p)),
+ "Confidence": lambda p: (_conf(p), _occ(p)),
+}
+filtered = sorted((p for p in merged if _visible(p)), key=_SORT_KEYS[sort_by], reverse=True)
tab_review, tab_analytics = st.tabs(["Review", "Analytics"])
@@ -139,9 +153,11 @@ def _visible(p):
with st.container(border=True):
head = st.columns([4, 1])
with head[0]:
+ occ = p.get("occurrence_count") or 1
st.markdown(
_badge(gap, GAP_COLORS.get(gap, "#555")) + " "
+ _badge(_signal(p), "#374151")
+ + (f" {_badge(f'recurring ×{occ}', '#7c3aed')}" if occ > 1 else "")
+ f" {(p.get('target_asset') or {}).get('type', '?')} "
+ f"· {_asset_name(p)}",
unsafe_allow_html=True,
@@ -152,10 +168,19 @@ def _visible(p):
with head[1]:
st.metric("confidence", f"{_conf(p):.2f}")
st.progress(min(max(_conf(p), 0.0), 1.0))
- with st.expander("Evidence"):
- st.write(ev.get("reasoning", ""))
- if ev.get("trajectory_quote"):
- st.code(ev["trajectory_quote"])
+ with st.expander(f"Evidence ({occ} conversation{'s' if occ != 1 else ''})"):
+ supporting = p.get("supporting_evidence") or []
+ if supporting:
+ for s in supporting:
+ cid = s.get("conversation_id")
+ if s.get("reasoning"):
+ st.write((f"**{cid}** — " if cid else "") + s["reasoning"])
+ if s.get("trajectory_quote"):
+ st.code(s["trajectory_quote"])
+ else:
+ st.write(ev.get("reasoning", ""))
+ if ev.get("trajectory_quote"):
+ st.code(ev["trajectory_quote"])
with st.expander("Enrichment instruction"):
st.write(p.get("enrichment_agent_instruction", ""))
golden_sql = (p.get("eval_candidate") or {}).get("golden_sql")
@@ -171,8 +196,9 @@ def _visible(p):
act[2].markdown(f"status: **{STATUS_BADGE.get(p['status'], p['status'])}**"
+ (f" — _{p['review_note']}_" if p.get("review_note") else ""))
if len(filtered) > MAX_CARDS:
- st.info(f"Showing the first {MAX_CARDS} of {len(filtered)}. "
- f"Refine filters or bulk-approve to narrow the queue.")
+ st.info(f"Showing the first {MAX_CARDS} of {len(filtered)} "
+ f"(sorted by {sort_by.lower()}). Refine filters (incl. min "
+ f"occurrences) or bulk-approve to narrow the queue.")
with tab_analytics:
st.subheader("By status")
diff --git a/agents/conversation_learner/tests/test_agent.py b/agents/conversation_learner/tests/test_agent.py
index a453475..180fc2f 100644
--- a/agents/conversation_learner/tests/test_agent.py
+++ b/agents/conversation_learner/tests/test_agent.py
@@ -35,6 +35,7 @@
_conversation_filter,
_format_message,
_group_by_conversation,
+ _noisy_or,
_parse_generic_payload,
_proposal_id,
_reasoning_engine_filter,
@@ -642,19 +643,37 @@ def test_groups_entries_and_skips_unlabeled(self):
class TestAggregateProposals(unittest.TestCase):
- def _p(self, name, gap="BUSINESS_LOGIC_GAP", atype="COLUMN", conf=0.5, instr="do X"):
- return {
+ def _p(self, name, gap="BUSINESS_LOGIC_GAP", atype="COLUMN", conf=0.5,
+ instr="do X", conv=None, evidence=None):
+ p = {
"classification": {"detection_signal": "DIRECT_USER_CORRECTION", "gap_type": gap},
"target_asset": {"type": atype, "name": name},
"proposed_enrichment": {"action": "UPDATE_OVERVIEW_ASPECT", "value": "v"},
"confidence_grade": conf,
"enrichment_agent_instruction": instr,
}
-
- def test_same_asset_and_gap_collapsed_keeping_higher_confidence(self):
+ if evidence is not None:
+ p["evidence"] = evidence
+ if conv is not None:
+ p["_provenance"] = {"conversation_id": conv,
+ "timestamp": f"2026-01-0{conv[-1]}T00:00:00+00:00"}
+ return p
+
+ def test_same_asset_and_gap_merged_into_one(self):
out = _aggregate_proposals([self._p("ds.t.col", conf=0.6), self._p("DS.T.COL", conf=0.9)])
self.assertEqual(len(out), 1)
- self.assertEqual(out[0]["confidence_grade"], 0.9)
+
+ def test_merge_preserves_best_instance_and_boosts_confidence(self):
+ out = _aggregate_proposals([self._p("ds.t.col", conf=0.6), self._p("ds.t.col", conf=0.9)])
+ # Best single-instance grade preserved; aggregate boosted above it.
+ self.assertEqual(out[0]["max_instance_confidence"], 0.9)
+ # noisy-OR(0.6, 0.9) = 1 - (0.4 * 0.1) = 0.96
+ self.assertAlmostEqual(out[0]["confidence_grade"], 0.96, places=4)
+
+ def test_single_instance_confidence_unchanged(self):
+ out = _aggregate_proposals([self._p("ds.t.col", conf=0.9)])
+ self.assertAlmostEqual(out[0]["confidence_grade"], 0.9, places=4)
+ self.assertEqual(out[0]["occurrence_count"], 1)
def test_different_gap_type_not_merged(self):
out = _aggregate_proposals([
@@ -667,6 +686,27 @@ def test_different_asset_not_merged(self):
out = _aggregate_proposals([self._p("ds.t.a"), self._p("ds.t.b")])
self.assertEqual(len(out), 2)
+ def test_provenance_rolled_up_across_conversations(self):
+ out = _aggregate_proposals([
+ self._p("ds.t.col", conf=0.6, conv="c1",
+ evidence={"reasoning": "r1", "trajectory_quote": "q1"}),
+ self._p("ds.t.col", conf=0.8, conv="c2",
+ evidence={"reasoning": "r2", "trajectory_quote": "q2"}),
+ ])
+ self.assertEqual(len(out), 1)
+ merged = out[0]
+ self.assertEqual(merged["occurrence_count"], 2)
+ self.assertEqual(merged["source_conversation_ids"], ["c1", "c2"])
+ self.assertEqual(len(merged["supporting_evidence"]), 2)
+ self.assertEqual(merged["first_seen"], "2026-01-01T00:00:00+00:00")
+ self.assertEqual(merged["last_seen"], "2026-01-02T00:00:00+00:00")
+ self.assertNotIn("_provenance", merged) # transient key stripped before saving
+
+ def test_occurrence_counts_distinct_conversations(self):
+ # Two proposals from the SAME conversation count as one occurrence.
+ out = _aggregate_proposals([self._p("ds.t.col", conv="c1"), self._p("ds.t.col", conv="c1")])
+ self.assertEqual(out[0]["occurrence_count"], 1)
+
def test_blank_name_distinct_instructions_not_collapsed(self):
out = _aggregate_proposals([
self._p("", atype="UNCATALOGED_ASSET", gap="UNCATALOGED_ASSET_DISCOVERY", instr="catalog table A"),
@@ -674,13 +714,29 @@ def test_blank_name_distinct_instructions_not_collapsed(self):
])
self.assertEqual(len(out), 2)
- def test_blank_name_same_instruction_collapsed(self):
+ def test_blank_name_same_instruction_merged(self):
out = _aggregate_proposals([
self._p("", atype="UNCATALOGED_ASSET", gap="UNCATALOGED_ASSET_DISCOVERY", instr="catalog A", conf=0.4),
self._p("", atype="UNCATALOGED_ASSET", gap="UNCATALOGED_ASSET_DISCOVERY", instr="catalog A", conf=0.8),
])
self.assertEqual(len(out), 1)
- self.assertEqual(out[0]["confidence_grade"], 0.8)
+ self.assertEqual(out[0]["max_instance_confidence"], 0.8)
+
+
+class TestNoisyOr(unittest.TestCase):
+
+ def test_single_value_unchanged(self):
+ self.assertAlmostEqual(_noisy_or([0.7]), 0.7, places=4)
+
+ def test_recurrence_increases_confidence(self):
+ self.assertAlmostEqual(_noisy_or([0.6, 0.9]), 0.96, places=4)
+ self.assertGreater(_noisy_or([0.5, 0.5, 0.5]), 0.5)
+
+ def test_empty_is_zero(self):
+ self.assertEqual(_noisy_or([]), 0.0)
+
+ def test_none_treated_as_zero(self):
+ self.assertAlmostEqual(_noisy_or([None, 0.5]), 0.5, places=4)
# ---------------------------------------------------------------------------