From 59536b7750c847481a774064584866dea0b6fa65 Mon Sep 17 00:00:00 2001 From: weiyilong-1 Date: Sat, 20 Jun 2026 22:48:12 +0000 Subject: [PATCH 1/2] ConversationLearner: cross-conversation recurrence aggregation Merge per-conversation proposals that share an identity (asset type + canonical name + gap type) into one learning instead of keeping only the highest-confidence instance and discarding the rest: - Roll up the recurrence signal: occurrence_count, source_conversation_ids, aggregated supporting_evidence, and first_seen/last_seen, from each contributing conversation. Provenance is tagged at the map stage and stripped before save. - confidence_grade becomes a recurrence-boosted noisy-OR over instances; max_instance_confidence preserves the best single-instance grade. The review UI gains a 'recurring xN' badge and per-conversation evidence. Tests updated for the merge semantics + a noisy-OR suite; README updated. --- agents/conversation_learner/README.md | 2 +- agents/conversation_learner/agent.py | 106 +++++++++++++++--- agents/conversation_learner/review_app.py | 19 +++- .../conversation_learner/tests/test_agent.py | 70 ++++++++++-- 4 files changed, 169 insertions(+), 28 deletions(-) diff --git a/agents/conversation_learner/README.md b/agents/conversation_learner/README.md index d88cb5d..4cd4b48 100644 --- a/agents/conversation_learner/README.md +++ b/agents/conversation_learner/README.md @@ -21,7 +21,7 @@ ## Architecture & Integration - **Trajectory Analysis**: Uses Cloud Logging to fetch recent conversational trajectories. For each conversation, the messages from every inference log entry are merged (deduplicated, in chronological order) into one transcript — earlier entries backfill any later entry whose `gen_ai.*.messages` label exceeded Cloud Logging's 64 KiB limit and was truncated. - **Per-conversation LLM-as-a-judge**: Each conversation is judged **independently and in parallel** (a direct Vertex `generate_content` call per conversation), extracting detection signals, gaps, and `ContextEnrichmentProposal` records. This bounds each judge's context for more consistent analysis and scales to many conversations, instead of analyzing every conversation in a single pass. -- **Cross-conversation dedup**: A lightweight aggregation pass deduplicates proposals across conversations (same asset + gap type), keeping the highest-confidence instance, before saving to `proposal.json`. +- **Cross-conversation aggregation**: A lightweight pass merges proposals across conversations by identity (same asset + gap type) into one learning, rolling up the recurrence signal — `occurrence_count`, `source_conversation_ids`, aggregated `supporting_evidence`, `first_seen`/`last_seen`, and a recurrence-boosted `confidence_grade` (noisy-OR over instances; `max_instance_confidence` preserves the best single instance) — before saving to `proposal.json`. ## Running Locally diff --git a/agents/conversation_learner/agent.py b/agents/conversation_learner/agent.py index 8a7a891..6f833f9 100644 --- a/agents/conversation_learner/agent.py +++ b/agents/conversation_learner/agent.py @@ -527,8 +527,9 @@ def load_instruction() -> str: # PER-CONVERSATION LLM-AS-JUDGE ORCHESTRATION # ============================================================================== # Each conversation is judged by its own direct generate_content call (bypassing -# ADK), fanned out in parallel, then proposals are deduplicated across -# conversations. This bounds each judge's context (vs. one giant pass over every +# ADK), fanned out in parallel, then proposals sharing an identity are merged +# across conversations (dedup + provenance roll-up + recurrence-boosted +# confidence). This bounds each judge's context (vs. one giant pass over every # conversation), scales, and isolates per-conversation failures. Mirrors the # direct-call + asyncio.gather + retry pattern in agents/enrichment/src/common.py. @@ -674,19 +675,78 @@ def _judge_conversation_sync(conversation_id: str, transcript: str) -> List[Dict return [] +def _noisy_or(confidences: List[Optional[float]]) -> float: + """Combines independent per-instance confidences via noisy-OR. + + ``P = 1 - Π(1 - cᵢ)``. A learning corroborated by several conversations is + more trustworthy than any single instance, so recurrence raises confidence + while staying in [0, 1]; a lone instance is returned unchanged. Note this + saturates toward 1 with many instances — acceptable because recurring + signals are exactly what we want to surface; temper it if low-quality + proposals start accumulating. + """ + prob_none = 1.0 + for c in confidences: + prob_none *= 1.0 - min(max(c or 0.0, 0.0), 1.0) + return 1.0 - prob_none + + +def _merge_group(members: List[Dict[str, Any]]) -> Dict[str, Any]: + """Merges all proposals sharing one identity into a single enriched learning. + + The highest-confidence instance supplies the representative fields (asset, + proposed value, instruction, eval candidate). On top of that we roll up the + cross-conversation signal: how many conversations surfaced it, which ones, + all supporting evidence, the time span, and a recurrence-boosted confidence + (``max_instance_confidence`` preserves the best single-instance grade). + Provenance (``_provenance``) is attached by the orchestrator; when absent + (e.g. direct unit-test calls) the count falls back to the member count. + """ + representative = max(members, key=lambda p: p.get("confidence_grade") or 0.0) + merged = {k: v for k, v in representative.items() if k != "_provenance"} + + conv_ids: List[str] = [] + timestamps: List[str] = [] + supporting: List[Dict[str, Any]] = [] + for p in members: + prov = p.get("_provenance") or {} + cid, ts = prov.get("conversation_id"), prov.get("timestamp") + if cid and cid not in conv_ids: + conv_ids.append(cid) + if ts: + timestamps.append(ts) + ev = p.get("evidence") + if isinstance(ev, dict): + supporting.append({**({"conversation_id": cid} if cid else {}), **ev}) + + merged["confidence_grade"] = round( + _noisy_or([p.get("confidence_grade") for p in members]), 4 + ) + merged["max_instance_confidence"] = max( + (p.get("confidence_grade") or 0.0) for p in members + ) + merged["occurrence_count"] = len(conv_ids) if conv_ids else len(members) + if conv_ids: + merged["source_conversation_ids"] = conv_ids + if supporting: + merged["supporting_evidence"] = supporting + if timestamps: + merged["first_seen"], merged["last_seen"] = min(timestamps), max(timestamps) + return merged + + def _aggregate_proposals(proposals: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - """Deduplicates proposals across conversations (lightweight aggregation pass). + """Merges proposals across conversations into deduplicated learnings. - Proposals with the same identity (see _proposal_identity) are treated as the - same learning; the highest-confidence instance is kept. + Proposals with the same identity (see _proposal_identity) describe the same + gap on the same asset and are merged into one record carrying the recurrence + signal — occurrence count, source conversations, aggregated evidence, time + span, and a recurrence-boosted confidence (see _merge_group). """ - by_key: Dict[Tuple, Dict[str, Any]] = {} + groups: Dict[Tuple, List[Dict[str, Any]]] = {} for p in proposals: - key = _proposal_identity(p) - existing = by_key.get(key) - if existing is None or (p.get("confidence_grade") or 0) > (existing.get("confidence_grade") or 0): - by_key[key] = p - return list(by_key.values()) + groups.setdefault(_proposal_identity(p), []).append(p) + return [_merge_group(members) for members in groups.values()] async def generate_learnings( @@ -701,8 +761,8 @@ async def generate_learnings( """Analyzes agent conversation trajectories and saves enrichment proposals. Fetches trajectories from Cloud Logging, runs the LLM-as-judge independently - on EACH conversation in parallel, deduplicates the proposals across - conversations, saves them to proposal.json, and returns a summary. + on EACH conversation in parallel, merges proposals across conversations into + recurrence-aware learnings, saves them to proposal.json, and returns a summary. Args: conversation_id: A single conversation id to analyze. @@ -738,12 +798,19 @@ async def generate_learnings( "No conversations with trajectories were found for the given parameters." ) - # Render each conversation into its own transcript. + # Render each conversation into its own transcript, capturing its latest + # timestamp for provenance roll-up during aggregation. transcripts: Dict[str, str] = {} + conv_ts: Dict[str, Optional[str]] = {} for c_id, c_entries in grouped.items(): lines: List[str] = [] _render_conversation(c_entries, lines) transcripts[c_id] = "\n".join(lines) + latest = max( + (e.timestamp for e in c_entries if getattr(e, "timestamp", None)), + default=None, + ) + conv_ts[c_id] = latest.isoformat() if latest else None # Fan out one judge call per conversation, concurrency-capped and retried # independently so a single failure yields [] rather than aborting the batch. @@ -752,13 +819,18 @@ async def generate_learnings( async def _judge(c_id: str, transcript: str) -> List[Dict[str, Any]]: async with sem: try: - return await _with_retry( + proposals = await _with_retry( lambda: asyncio.to_thread(_judge_conversation_sync, c_id, transcript), what=f"judge[{c_id}]", ) except Exception as e: # pylint: disable=broad-except print(f"[judge] {c_id}: failed after retries: {e}", flush=True) return [] + # Tag each proposal with its source so aggregation can roll up the + # cross-conversation recurrence signal (stripped before saving). + for p in proposals: + p["_provenance"] = {"conversation_id": c_id, "timestamp": conv_ts.get(c_id)} + return proposals results = await asyncio.gather(*[_judge(c, t) for c, t in transcripts.items()]) raw_proposals = [p for sub in results for p in sub] @@ -768,11 +840,13 @@ async def _judge(c_id: str, transcript: str) -> List[Dict[str, Any]]: save_trajectory_analysis_result(json.dumps({"proposals": deduped})) + recurring = sum(1 for p in deduped if (p.get("occurrence_count") or 1) > 1) return ( f"Total log entries retrieved: {len(entries)}. Unique conversations: {len(grouped)}.\n" f"Conversation IDs: {', '.join(grouped.keys())}\n" f"Analyzed each conversation independently; saved {len(deduped)} " - f"deduplicated proposal(s) (from {len(raw_proposals)} raw) to proposal.json." + f"deduplicated proposal(s) (from {len(raw_proposals)} raw; {recurring} " + f"recurring across multiple conversations) to proposal.json." ) diff --git a/agents/conversation_learner/review_app.py b/agents/conversation_learner/review_app.py index aed024e..2b33f5d 100644 --- a/agents/conversation_learner/review_app.py +++ b/agents/conversation_learner/review_app.py @@ -139,9 +139,11 @@ def _visible(p): with st.container(border=True): head = st.columns([4, 1]) with head[0]: + occ = p.get("occurrence_count") or 1 st.markdown( _badge(gap, GAP_COLORS.get(gap, "#555")) + " " + _badge(_signal(p), "#374151") + + (f" {_badge(f'recurring ×{occ}', '#7c3aed')}" if occ > 1 else "") + f"   {(p.get('target_asset') or {}).get('type', '?')} " + f"· {_asset_name(p)}", unsafe_allow_html=True, @@ -152,10 +154,19 @@ def _visible(p): with head[1]: st.metric("confidence", f"{_conf(p):.2f}") st.progress(min(max(_conf(p), 0.0), 1.0)) - with st.expander("Evidence"): - st.write(ev.get("reasoning", "")) - if ev.get("trajectory_quote"): - st.code(ev["trajectory_quote"]) + with st.expander(f"Evidence ({occ} conversation{'s' if occ != 1 else ''})"): + supporting = p.get("supporting_evidence") or [] + if supporting: + for s in supporting: + cid = s.get("conversation_id") + if s.get("reasoning"): + st.write((f"**{cid}** — " if cid else "") + s["reasoning"]) + if s.get("trajectory_quote"): + st.code(s["trajectory_quote"]) + else: + st.write(ev.get("reasoning", "")) + if ev.get("trajectory_quote"): + st.code(ev["trajectory_quote"]) with st.expander("Enrichment instruction"): st.write(p.get("enrichment_agent_instruction", "")) golden_sql = (p.get("eval_candidate") or {}).get("golden_sql") diff --git a/agents/conversation_learner/tests/test_agent.py b/agents/conversation_learner/tests/test_agent.py index a453475..180fc2f 100644 --- a/agents/conversation_learner/tests/test_agent.py +++ b/agents/conversation_learner/tests/test_agent.py @@ -35,6 +35,7 @@ _conversation_filter, _format_message, _group_by_conversation, + _noisy_or, _parse_generic_payload, _proposal_id, _reasoning_engine_filter, @@ -642,19 +643,37 @@ def test_groups_entries_and_skips_unlabeled(self): class TestAggregateProposals(unittest.TestCase): - def _p(self, name, gap="BUSINESS_LOGIC_GAP", atype="COLUMN", conf=0.5, instr="do X"): - return { + def _p(self, name, gap="BUSINESS_LOGIC_GAP", atype="COLUMN", conf=0.5, + instr="do X", conv=None, evidence=None): + p = { "classification": {"detection_signal": "DIRECT_USER_CORRECTION", "gap_type": gap}, "target_asset": {"type": atype, "name": name}, "proposed_enrichment": {"action": "UPDATE_OVERVIEW_ASPECT", "value": "v"}, "confidence_grade": conf, "enrichment_agent_instruction": instr, } - - def test_same_asset_and_gap_collapsed_keeping_higher_confidence(self): + if evidence is not None: + p["evidence"] = evidence + if conv is not None: + p["_provenance"] = {"conversation_id": conv, + "timestamp": f"2026-01-0{conv[-1]}T00:00:00+00:00"} + return p + + def test_same_asset_and_gap_merged_into_one(self): out = _aggregate_proposals([self._p("ds.t.col", conf=0.6), self._p("DS.T.COL", conf=0.9)]) self.assertEqual(len(out), 1) - self.assertEqual(out[0]["confidence_grade"], 0.9) + + def test_merge_preserves_best_instance_and_boosts_confidence(self): + out = _aggregate_proposals([self._p("ds.t.col", conf=0.6), self._p("ds.t.col", conf=0.9)]) + # Best single-instance grade preserved; aggregate boosted above it. + self.assertEqual(out[0]["max_instance_confidence"], 0.9) + # noisy-OR(0.6, 0.9) = 1 - (0.4 * 0.1) = 0.96 + self.assertAlmostEqual(out[0]["confidence_grade"], 0.96, places=4) + + def test_single_instance_confidence_unchanged(self): + out = _aggregate_proposals([self._p("ds.t.col", conf=0.9)]) + self.assertAlmostEqual(out[0]["confidence_grade"], 0.9, places=4) + self.assertEqual(out[0]["occurrence_count"], 1) def test_different_gap_type_not_merged(self): out = _aggregate_proposals([ @@ -667,6 +686,27 @@ def test_different_asset_not_merged(self): out = _aggregate_proposals([self._p("ds.t.a"), self._p("ds.t.b")]) self.assertEqual(len(out), 2) + def test_provenance_rolled_up_across_conversations(self): + out = _aggregate_proposals([ + self._p("ds.t.col", conf=0.6, conv="c1", + evidence={"reasoning": "r1", "trajectory_quote": "q1"}), + self._p("ds.t.col", conf=0.8, conv="c2", + evidence={"reasoning": "r2", "trajectory_quote": "q2"}), + ]) + self.assertEqual(len(out), 1) + merged = out[0] + self.assertEqual(merged["occurrence_count"], 2) + self.assertEqual(merged["source_conversation_ids"], ["c1", "c2"]) + self.assertEqual(len(merged["supporting_evidence"]), 2) + self.assertEqual(merged["first_seen"], "2026-01-01T00:00:00+00:00") + self.assertEqual(merged["last_seen"], "2026-01-02T00:00:00+00:00") + self.assertNotIn("_provenance", merged) # transient key stripped before saving + + def test_occurrence_counts_distinct_conversations(self): + # Two proposals from the SAME conversation count as one occurrence. + out = _aggregate_proposals([self._p("ds.t.col", conv="c1"), self._p("ds.t.col", conv="c1")]) + self.assertEqual(out[0]["occurrence_count"], 1) + def test_blank_name_distinct_instructions_not_collapsed(self): out = _aggregate_proposals([ self._p("", atype="UNCATALOGED_ASSET", gap="UNCATALOGED_ASSET_DISCOVERY", instr="catalog table A"), @@ -674,13 +714,29 @@ def test_blank_name_distinct_instructions_not_collapsed(self): ]) self.assertEqual(len(out), 2) - def test_blank_name_same_instruction_collapsed(self): + def test_blank_name_same_instruction_merged(self): out = _aggregate_proposals([ self._p("", atype="UNCATALOGED_ASSET", gap="UNCATALOGED_ASSET_DISCOVERY", instr="catalog A", conf=0.4), self._p("", atype="UNCATALOGED_ASSET", gap="UNCATALOGED_ASSET_DISCOVERY", instr="catalog A", conf=0.8), ]) self.assertEqual(len(out), 1) - self.assertEqual(out[0]["confidence_grade"], 0.8) + self.assertEqual(out[0]["max_instance_confidence"], 0.8) + + +class TestNoisyOr(unittest.TestCase): + + def test_single_value_unchanged(self): + self.assertAlmostEqual(_noisy_or([0.7]), 0.7, places=4) + + def test_recurrence_increases_confidence(self): + self.assertAlmostEqual(_noisy_or([0.6, 0.9]), 0.96, places=4) + self.assertGreater(_noisy_or([0.5, 0.5, 0.5]), 0.5) + + def test_empty_is_zero(self): + self.assertEqual(_noisy_or([]), 0.0) + + def test_none_treated_as_zero(self): + self.assertAlmostEqual(_noisy_or([None, 0.5]), 0.5, places=4) # --------------------------------------------------------------------------- From f92f621603f01251293196b2cdb5cf59df65fe9e Mon Sep 17 00:00:00 2001 From: weiyilong-1 Date: Sat, 20 Jun 2026 22:56:29 +0000 Subject: [PATCH 2/2] ConversationLearner: sort and filter review queue by occurrence count Add a 'Min occurrences' sidebar slider and a 'Sort by' selector (Occurrences by default, Confidence optional). The review queue now sorts most-recurring (then highest-confidence) first, so high-recurrence learnings surface above the 50-card cap instead of being hidden in proposal.json order, and can be filtered by a minimum occurrence threshold. --- agents/conversation_learner/review_app.py | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/agents/conversation_learner/review_app.py b/agents/conversation_learner/review_app.py index 2b33f5d..588ed28 100644 --- a/agents/conversation_learner/review_app.py +++ b/agents/conversation_learner/review_app.py @@ -72,6 +72,10 @@ def _conf(p): return p.get("confidence_grade") or 0.0 +def _occ(p): + return p.get("occurrence_count") or 1 + + # --- load --- proposals_path = rs.DEFAULT_PROPOSALS_PATH if not os.path.exists(proposals_path): @@ -102,7 +106,10 @@ def _conf(p): f_gap = sb.multiselect("Gap type", gap_opts, default=gap_opts) f_sig = sb.multiselect("Detection signal", sig_opts, default=sig_opts) f_conf = sb.slider("Min confidence", 0.0, 1.0, 0.0, 0.05) +max_occ = max((_occ(p) for p in merged), default=1) +f_min_occ = sb.slider("Min occurrences", 1, max_occ, 1) if max_occ > 1 else 1 f_query = sb.text_input("🔎 Asset contains") +sort_by = sb.selectbox("Sort by", ["Occurrences", "Confidence"], index=0) sb.divider() sb.subheader("Bulk action") @@ -123,10 +130,17 @@ def _visible(p): and _gap(p) in f_gap and _signal(p) in f_sig and _conf(p) >= f_conf + and _occ(p) >= f_min_occ and f_query.lower() in _asset_name(p).lower()) -filtered = [p for p in merged if _visible(p)] +# Sort most-recurring (then highest-confidence) first by default, so high-impact +# learnings aren't hidden behind the MAX_CARDS cap. +_SORT_KEYS = { + "Occurrences": lambda p: (_occ(p), _conf(p)), + "Confidence": lambda p: (_conf(p), _occ(p)), +} +filtered = sorted((p for p in merged if _visible(p)), key=_SORT_KEYS[sort_by], reverse=True) tab_review, tab_analytics = st.tabs(["Review", "Analytics"]) @@ -182,8 +196,9 @@ def _visible(p): act[2].markdown(f"status: **{STATUS_BADGE.get(p['status'], p['status'])}**" + (f" — _{p['review_note']}_" if p.get("review_note") else "")) if len(filtered) > MAX_CARDS: - st.info(f"Showing the first {MAX_CARDS} of {len(filtered)}. " - f"Refine filters or bulk-approve to narrow the queue.") + st.info(f"Showing the first {MAX_CARDS} of {len(filtered)} " + f"(sorted by {sort_by.lower()}). Refine filters (incl. min " + f"occurrences) or bulk-approve to narrow the queue.") with tab_analytics: st.subheader("By status")