Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agents/conversation_learner/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
## Architecture & Integration
- **Trajectory Analysis**: Uses Cloud Logging to fetch recent conversational trajectories. For each conversation, the messages from every inference log entry are merged (deduplicated, in chronological order) into one transcript — earlier entries backfill any later entry whose `gen_ai.*.messages` label exceeded Cloud Logging's 64 KiB limit and was truncated.
- **Per-conversation LLM-as-a-judge**: Each conversation is judged **independently and in parallel** (a direct Vertex `generate_content` call per conversation), extracting detection signals, gaps, and `ContextEnrichmentProposal` records. This bounds each judge's context for more consistent analysis and scales to many conversations, instead of analyzing every conversation in a single pass.
- **Cross-conversation dedup**: A lightweight aggregation pass deduplicates proposals across conversations (same asset + gap type), keeping the highest-confidence instance, before saving to `proposal.json`.
- **Cross-conversation aggregation**: A lightweight pass merges proposals across conversations by identity (same asset + gap type) into one learning, rolling up the recurrence signal — `occurrence_count`, `source_conversation_ids`, aggregated `supporting_evidence`, `first_seen`/`last_seen`, and a recurrence-boosted `confidence_grade` (noisy-OR over instances; `max_instance_confidence` preserves the best single instance) — before saving to `proposal.json`.

## Running Locally

Expand Down
106 changes: 90 additions & 16 deletions agents/conversation_learner/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,8 +527,9 @@ def load_instruction() -> str:
# PER-CONVERSATION LLM-AS-JUDGE ORCHESTRATION
# ==============================================================================
# Each conversation is judged by its own direct generate_content call (bypassing
# ADK), fanned out in parallel, then proposals are deduplicated across
# conversations. This bounds each judge's context (vs. one giant pass over every
# ADK), fanned out in parallel, then proposals sharing an identity are merged
# across conversations (dedup + provenance roll-up + recurrence-boosted
# confidence). This bounds each judge's context (vs. one giant pass over every
# conversation), scales, and isolates per-conversation failures. Mirrors the
# direct-call + asyncio.gather + retry pattern in agents/enrichment/src/common.py.

Expand Down Expand Up @@ -674,19 +675,78 @@ def _judge_conversation_sync(conversation_id: str, transcript: str) -> List[Dict
return []


def _noisy_or(confidences: List[Optional[float]]) -> float:
"""Combines independent per-instance confidences via noisy-OR.

``P = 1 - Π(1 - cᵢ)``. A learning corroborated by several conversations is
more trustworthy than any single instance, so recurrence raises confidence
while staying in [0, 1]; a lone instance is returned unchanged. Note this
saturates toward 1 with many instances — acceptable because recurring
signals are exactly what we want to surface; temper it if low-quality
proposals start accumulating.
"""
prob_none = 1.0
for c in confidences:
prob_none *= 1.0 - min(max(c or 0.0, 0.0), 1.0)
return 1.0 - prob_none


def _merge_group(members: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Merges all proposals sharing one identity into a single enriched learning.

    The highest-confidence member supplies the representative fields (every
    key except the transient ``_provenance``). On top of that the
    cross-conversation signal is rolled up:

    * ``confidence_grade``: noisy-OR over ONE grade per distinct source
      conversation (the best grade that conversation produced), rounded to 4
      places. Several proposals emitted by the same conversation are not
      independent corroboration, so they must not boost confidence while
      ``occurrence_count`` still reports a single occurrence. Members without
      a conversation id each count as their own source.
    * ``max_instance_confidence``: the best single-member grade.
    * ``occurrence_count``: number of distinct conversation ids, or the member
      count when no member carries provenance (e.g. direct unit-test calls).
    * ``source_conversation_ids``: distinct conversation ids in first-seen
      order (only set when at least one is known).
    * ``supporting_evidence``: every member's ``evidence`` dict, tagged with
      its conversation id when known (only set when non-empty).
    * ``first_seen`` / ``last_seen``: smallest / largest provenance timestamp
      (only set when at least one timestamp is present).

    Args:
        members: Non-empty list of proposals that share one identity.

    Returns:
        A new dict describing the merged learning; inputs are not mutated.

    Raises:
        ValueError: If ``members`` is empty.
    """
    representative = max(members, key=lambda p: p.get("confidence_grade") or 0.0)
    merged = {k: v for k, v in representative.items() if k != "_provenance"}

    conv_ids: List[str] = []
    timestamps: List[str] = []
    supporting: List[Dict[str, Any]] = []
    best_by_conversation: Dict[str, float] = {}
    unattributed_grades: List[float] = []
    for p in members:
        prov = p.get("_provenance") or {}
        cid, ts = prov.get("conversation_id"), prov.get("timestamp")
        grade = p.get("confidence_grade") or 0.0
        if cid:
            if cid not in best_by_conversation:
                conv_ids.append(cid)
                best_by_conversation[cid] = grade
            else:
                # Same conversation again: keep only its strongest grade.
                best_by_conversation[cid] = max(best_by_conversation[cid], grade)
        else:
            unattributed_grades.append(grade)
        if ts:
            timestamps.append(ts)
        ev = p.get("evidence")
        if isinstance(ev, dict):
            # Keys inside the evidence dict win over the injected conversation id.
            supporting.append({**({"conversation_id": cid} if cid else {}), **ev})

    independent_grades = [best_by_conversation[c] for c in conv_ids] + unattributed_grades
    merged["confidence_grade"] = round(_noisy_or(independent_grades), 4)
    merged["max_instance_confidence"] = max(
        (p.get("confidence_grade") or 0.0) for p in members
    )
    merged["occurrence_count"] = len(conv_ids) if conv_ids else len(members)
    if conv_ids:
        merged["source_conversation_ids"] = conv_ids
    if supporting:
        merged["supporting_evidence"] = supporting
    if timestamps:
        # Timestamps are compared as strings; this presumably relies on every
        # value being ISO-8601 with the same UTC offset -- TODO confirm.
        merged["first_seen"], merged["last_seen"] = min(timestamps), max(timestamps)
    return merged


def _aggregate_proposals(proposals: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Deduplicates proposals across conversations (lightweight aggregation pass).
"""Merges proposals across conversations into deduplicated learnings.

Proposals with the same identity (see _proposal_identity) are treated as the
same learning; the highest-confidence instance is kept.
Proposals with the same identity (see _proposal_identity) describe the same
gap on the same asset and are merged into one record carrying the recurrence
signal — occurrence count, source conversations, aggregated evidence, time
span, and a recurrence-boosted confidence (see _merge_group).
"""
by_key: Dict[Tuple, Dict[str, Any]] = {}
groups: Dict[Tuple, List[Dict[str, Any]]] = {}
for p in proposals:
key = _proposal_identity(p)
existing = by_key.get(key)
if existing is None or (p.get("confidence_grade") or 0) > (existing.get("confidence_grade") or 0):
by_key[key] = p
return list(by_key.values())
groups.setdefault(_proposal_identity(p), []).append(p)
return [_merge_group(members) for members in groups.values()]


async def generate_learnings(
Expand All @@ -701,8 +761,8 @@ async def generate_learnings(
"""Analyzes agent conversation trajectories and saves enrichment proposals.

Fetches trajectories from Cloud Logging, runs the LLM-as-judge independently
on EACH conversation in parallel, deduplicates the proposals across
conversations, saves them to proposal.json, and returns a summary.
on EACH conversation in parallel, merges proposals across conversations into
recurrence-aware learnings, saves them to proposal.json, and returns a summary.

Args:
conversation_id: A single conversation id to analyze.
Expand Down Expand Up @@ -738,12 +798,19 @@ async def generate_learnings(
"No conversations with trajectories were found for the given parameters."
)

# Render each conversation into its own transcript.
# Render each conversation into its own transcript, capturing its latest
# timestamp for provenance roll-up during aggregation.
transcripts: Dict[str, str] = {}
conv_ts: Dict[str, Optional[str]] = {}
for c_id, c_entries in grouped.items():
lines: List[str] = []
_render_conversation(c_entries, lines)
transcripts[c_id] = "\n".join(lines)
latest = max(
(e.timestamp for e in c_entries if getattr(e, "timestamp", None)),
default=None,
)
conv_ts[c_id] = latest.isoformat() if latest else None

# Fan out one judge call per conversation, concurrency-capped and retried
# independently so a single failure yields [] rather than aborting the batch.
Expand All @@ -752,13 +819,18 @@ async def generate_learnings(
async def _judge(c_id: str, transcript: str) -> List[Dict[str, Any]]:
async with sem:
try:
return await _with_retry(
proposals = await _with_retry(
lambda: asyncio.to_thread(_judge_conversation_sync, c_id, transcript),
what=f"judge[{c_id}]",
)
except Exception as e: # pylint: disable=broad-except
print(f"[judge] {c_id}: failed after retries: {e}", flush=True)
return []
# Tag each proposal with its source so aggregation can roll up the
# cross-conversation recurrence signal (stripped before saving).
for p in proposals:
p["_provenance"] = {"conversation_id": c_id, "timestamp": conv_ts.get(c_id)}
return proposals

results = await asyncio.gather(*[_judge(c, t) for c, t in transcripts.items()])
raw_proposals = [p for sub in results for p in sub]
Expand All @@ -768,11 +840,13 @@ async def _judge(c_id: str, transcript: str) -> List[Dict[str, Any]]:

save_trajectory_analysis_result(json.dumps({"proposals": deduped}))

recurring = sum(1 for p in deduped if (p.get("occurrence_count") or 1) > 1)
return (
f"Total log entries retrieved: {len(entries)}. Unique conversations: {len(grouped)}.\n"
f"Conversation IDs: {', '.join(grouped.keys())}\n"
f"Analyzed each conversation independently; saved {len(deduped)} "
f"deduplicated proposal(s) (from {len(raw_proposals)} raw) to proposal.json."
f"deduplicated proposal(s) (from {len(raw_proposals)} raw; {recurring} "
f"recurring across multiple conversations) to proposal.json."
)


Expand Down
40 changes: 33 additions & 7 deletions agents/conversation_learner/review_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ def _conf(p):
return p.get("confidence_grade") or 0.0


def _occ(p):
return p.get("occurrence_count") or 1


# --- load ---
proposals_path = rs.DEFAULT_PROPOSALS_PATH
if not os.path.exists(proposals_path):
Expand Down Expand Up @@ -102,7 +106,10 @@ def _conf(p):
f_gap = sb.multiselect("Gap type", gap_opts, default=gap_opts)
f_sig = sb.multiselect("Detection signal", sig_opts, default=sig_opts)
f_conf = sb.slider("Min confidence", 0.0, 1.0, 0.0, 0.05)
max_occ = max((_occ(p) for p in merged), default=1)
f_min_occ = sb.slider("Min occurrences", 1, max_occ, 1) if max_occ > 1 else 1
f_query = sb.text_input("🔎 Asset contains")
sort_by = sb.selectbox("Sort by", ["Occurrences", "Confidence"], index=0)

sb.divider()
sb.subheader("Bulk action")
Expand All @@ -123,10 +130,17 @@ def _visible(p):
and _gap(p) in f_gap
and _signal(p) in f_sig
and _conf(p) >= f_conf
and _occ(p) >= f_min_occ
and f_query.lower() in _asset_name(p).lower())


filtered = [p for p in merged if _visible(p)]
# Sort most-recurring (then highest-confidence) first by default, so high-impact
# learnings aren't hidden behind the MAX_CARDS cap.
_SORT_KEYS = {
"Occurrences": lambda p: (_occ(p), _conf(p)),
"Confidence": lambda p: (_conf(p), _occ(p)),
}
filtered = sorted((p for p in merged if _visible(p)), key=_SORT_KEYS[sort_by], reverse=True)

tab_review, tab_analytics = st.tabs(["Review", "Analytics"])

Expand All @@ -139,9 +153,11 @@ def _visible(p):
with st.container(border=True):
head = st.columns([4, 1])
with head[0]:
occ = p.get("occurrence_count") or 1
st.markdown(
_badge(gap, GAP_COLORS.get(gap, "#555")) + " "
+ _badge(_signal(p), "#374151")
+ (f" {_badge(f'recurring ×{occ}', '#7c3aed')}" if occ > 1 else "")
+ f" &nbsp; <b>{(p.get('target_asset') or {}).get('type', '?')}</b> "
+ f"· <code>{_asset_name(p)}</code>",
unsafe_allow_html=True,
Expand All @@ -152,10 +168,19 @@ def _visible(p):
with head[1]:
st.metric("confidence", f"{_conf(p):.2f}")
st.progress(min(max(_conf(p), 0.0), 1.0))
with st.expander("Evidence"):
st.write(ev.get("reasoning", ""))
if ev.get("trajectory_quote"):
st.code(ev["trajectory_quote"])
with st.expander(f"Evidence ({occ} conversation{'s' if occ != 1 else ''})"):
supporting = p.get("supporting_evidence") or []
if supporting:
for s in supporting:
cid = s.get("conversation_id")
if s.get("reasoning"):
st.write((f"**{cid}** — " if cid else "") + s["reasoning"])
if s.get("trajectory_quote"):
st.code(s["trajectory_quote"])
else:
st.write(ev.get("reasoning", ""))
if ev.get("trajectory_quote"):
st.code(ev["trajectory_quote"])
with st.expander("Enrichment instruction"):
st.write(p.get("enrichment_agent_instruction", ""))
golden_sql = (p.get("eval_candidate") or {}).get("golden_sql")
Expand All @@ -171,8 +196,9 @@ def _visible(p):
act[2].markdown(f"status: **{STATUS_BADGE.get(p['status'], p['status'])}**"
+ (f" — _{p['review_note']}_" if p.get("review_note") else ""))
if len(filtered) > MAX_CARDS:
st.info(f"Showing the first {MAX_CARDS} of {len(filtered)}. "
f"Refine filters or bulk-approve to narrow the queue.")
st.info(f"Showing the first {MAX_CARDS} of {len(filtered)} "
f"(sorted by {sort_by.lower()}). Refine filters (incl. min "
f"occurrences) or bulk-approve to narrow the queue.")

with tab_analytics:
st.subheader("By status")
Expand Down
70 changes: 63 additions & 7 deletions agents/conversation_learner/tests/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
_conversation_filter,
_format_message,
_group_by_conversation,
_noisy_or,
_parse_generic_payload,
_proposal_id,
_reasoning_engine_filter,
Expand Down Expand Up @@ -642,19 +643,37 @@ def test_groups_entries_and_skips_unlabeled(self):

class TestAggregateProposals(unittest.TestCase):

def _p(self, name, gap="BUSINESS_LOGIC_GAP", atype="COLUMN", conf=0.5, instr="do X"):
return {
def _p(self, name, gap="BUSINESS_LOGIC_GAP", atype="COLUMN", conf=0.5,
instr="do X", conv=None, evidence=None):
p = {
"classification": {"detection_signal": "DIRECT_USER_CORRECTION", "gap_type": gap},
"target_asset": {"type": atype, "name": name},
"proposed_enrichment": {"action": "UPDATE_OVERVIEW_ASPECT", "value": "v"},
"confidence_grade": conf,
"enrichment_agent_instruction": instr,
}

def test_same_asset_and_gap_collapsed_keeping_higher_confidence(self):
if evidence is not None:
p["evidence"] = evidence
if conv is not None:
p["_provenance"] = {"conversation_id": conv,
"timestamp": f"2026-01-0{conv[-1]}T00:00:00+00:00"}
return p

def test_same_asset_and_gap_merged_into_one(self):
out = _aggregate_proposals([self._p("ds.t.col", conf=0.6), self._p("DS.T.COL", conf=0.9)])
self.assertEqual(len(out), 1)
self.assertEqual(out[0]["confidence_grade"], 0.9)

def test_merge_preserves_best_instance_and_boosts_confidence(self):
out = _aggregate_proposals([self._p("ds.t.col", conf=0.6), self._p("ds.t.col", conf=0.9)])
# Best single-instance grade preserved; aggregate boosted above it.
self.assertEqual(out[0]["max_instance_confidence"], 0.9)
# noisy-OR(0.6, 0.9) = 1 - (0.4 * 0.1) = 0.96
self.assertAlmostEqual(out[0]["confidence_grade"], 0.96, places=4)

def test_single_instance_confidence_unchanged(self):
out = _aggregate_proposals([self._p("ds.t.col", conf=0.9)])
self.assertAlmostEqual(out[0]["confidence_grade"], 0.9, places=4)
self.assertEqual(out[0]["occurrence_count"], 1)

def test_different_gap_type_not_merged(self):
out = _aggregate_proposals([
Expand All @@ -667,20 +686,57 @@ def test_different_asset_not_merged(self):
out = _aggregate_proposals([self._p("ds.t.a"), self._p("ds.t.b")])
self.assertEqual(len(out), 2)

def test_provenance_rolled_up_across_conversations(self):
out = _aggregate_proposals([
self._p("ds.t.col", conf=0.6, conv="c1",
evidence={"reasoning": "r1", "trajectory_quote": "q1"}),
self._p("ds.t.col", conf=0.8, conv="c2",
evidence={"reasoning": "r2", "trajectory_quote": "q2"}),
])
self.assertEqual(len(out), 1)
merged = out[0]
self.assertEqual(merged["occurrence_count"], 2)
self.assertEqual(merged["source_conversation_ids"], ["c1", "c2"])
self.assertEqual(len(merged["supporting_evidence"]), 2)
self.assertEqual(merged["first_seen"], "2026-01-01T00:00:00+00:00")
self.assertEqual(merged["last_seen"], "2026-01-02T00:00:00+00:00")
self.assertNotIn("_provenance", merged) # transient key stripped before saving

def test_occurrence_counts_distinct_conversations(self):
# Two proposals from the SAME conversation count as one occurrence.
out = _aggregate_proposals([self._p("ds.t.col", conv="c1"), self._p("ds.t.col", conv="c1")])
self.assertEqual(out[0]["occurrence_count"], 1)

def test_blank_name_distinct_instructions_not_collapsed(self):
out = _aggregate_proposals([
self._p("", atype="UNCATALOGED_ASSET", gap="UNCATALOGED_ASSET_DISCOVERY", instr="catalog table A"),
self._p("", atype="UNCATALOGED_ASSET", gap="UNCATALOGED_ASSET_DISCOVERY", instr="catalog table B"),
])
self.assertEqual(len(out), 2)

def test_blank_name_same_instruction_collapsed(self):
def test_blank_name_same_instruction_merged(self):
out = _aggregate_proposals([
self._p("", atype="UNCATALOGED_ASSET", gap="UNCATALOGED_ASSET_DISCOVERY", instr="catalog A", conf=0.4),
self._p("", atype="UNCATALOGED_ASSET", gap="UNCATALOGED_ASSET_DISCOVERY", instr="catalog A", conf=0.8),
])
self.assertEqual(len(out), 1)
self.assertEqual(out[0]["confidence_grade"], 0.8)
self.assertEqual(out[0]["max_instance_confidence"], 0.8)


class TestNoisyOr(unittest.TestCase):
    """Unit tests for the noisy-OR confidence combiner ``_noisy_or``."""

    def test_single_value_unchanged(self):
        # A lone instance has nothing to corroborate it, so it passes through.
        combined = _noisy_or([0.7])
        self.assertAlmostEqual(combined, 0.7, places=4)

    def test_recurrence_increases_confidence(self):
        # 1 - (1 - 0.6) * (1 - 0.9) = 0.96
        pair = _noisy_or([0.6, 0.9])
        self.assertAlmostEqual(pair, 0.96, places=4)
        # Three middling instances together must beat any single one of them.
        triple = _noisy_or([0.5, 0.5, 0.5])
        self.assertGreater(triple, 0.5)

    def test_empty_is_zero(self):
        # No instances means no evidence at all.
        nothing = _noisy_or([])
        self.assertEqual(nothing, 0.0)

    def test_none_treated_as_zero(self):
        # A missing grade contributes nothing; the 0.5 instance stands alone.
        combined = _noisy_or([None, 0.5])
        self.assertAlmostEqual(combined, 0.5, places=4)


# ---------------------------------------------------------------------------
Expand Down