Skip to content

FD-338: deep re-extraction (multi-sample consensus) backend#125

Open
patrickkidd wants to merge 1 commit into
masterfrom
deep-reextract
Open

FD-338: deep re-extraction (multi-sample consensus) backend#125
patrickkidd wants to merge 1 commit into
masterfrom
deep-reextract

Conversation

@patrickkidd
Copy link
Copy Markdown
Owner

FD-338 — backend

Async deep re-extraction: rebuild a fragmented Personal-app diagram into a more complete, better-connected tree via multi-sample consensus, delivered as a PDP delta the user reviews/accepts (improve-in-place — there is no diagram version history, so never a replace).

What it does

  • deep_reextract(discussion_or_diagram, k): K independent from-empty windowed extract_full accumulations → merged via f1_metrics.match_people (raw-name dedup forbidden per FD-324) → reconciled against the current committed diagram into a delta PDP (negative-ID adds for missing people, new pair-bonds bridging already-committed fragments, positive-ID parent-link edits).
  • Celery task (btcopilot/personal/tasks.py) mirroring the synthetic-generation pattern; registered in personal/__init__.py:init_celery.
  • Routes: POST /personal/discussions/<id>/deep-reextract (enqueue) + GET …/deep-reextract-status/<task_id> (state + progress + result). The status route gains an owner authorization check (previously unauthenticated).
  • Accumulate loop factored out and shared with connectivity_check (DRY).

Decisions

  • Default K=8. K=6 + match_people ceilings at the ≥90% LCC line; K=8 clears it (independent samples 90.5% / 90.7%, committed baseline 56%). K=4 is the cheaper opt-down. ~$0.61/rebuild — this knowingly supersedes the ticket's ~$0.46 cost figure. The match_people merge ceilings ~91% on this diagram; recovering the forbidden name-dedup's 94.9% is deferred R&D (the only durable way to get real margin).
  • Per-run resilience: transient extraction failures (timeouts, momentary invalid PDP) are retried and partial pools tolerated — one bad run no longer fails the whole rebuild.

Verification

  • Merge unit tests (mocked LLM): 6/6.
  • Full task round-trip on the real diagram: progress events fired, delta staged, version-checked write-back, prod restored byte-identical.
  • A live Celery worker registers deep_reextract.

Known gaps

  • F1 regression AC: unaffected by construction (extraction code untouched); local GT harness had no approved GT to run against.
  • App-interactive E2E (button → modal → worker → review sheet) not run here (no built app / Redis in this env) — to be tested in a built environment. Frontend in patrickkidd/familydiagram (companion PR).

🤖 Generated with Claude Code

Async deep_reextract: K independent from-empty windowed extractions merged
via f1_metrics.match_people into a PDP delta reconciled against the committed
diagram (improve-in-place, never replace — there is no diagram version history).
Celery task + enqueue/status routes; status route gains an owner authz check.

Default K=8 ('max fidelity'): K=6 ceilings at the >=90% LCC line, K=8 reliably
clears it (independent samples 90.5/90.7%); K=4 is the cheaper opt-down.
Per-run retry tolerates transient extraction failures so one bad run no longer
sinks a rebuild. Accumulate loop shared with connectivity_check (DRY).

Verified: merge unit tests (mocked LLM) 6/6; full task round-trip on the real
diagram (progress events, delta staged, version-checked write-back, prod
restored); a live worker registers the task.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces deep re-extraction functionality, allowing K independent accumulations to run in-memory and merge via consensus into a PDP delta. The changes include a new Celery task, API endpoints for triggering and checking task status, unit tests, and decision log updates. The review feedback highlights a few issues: defensive checks silently skipping committed items with missing IDs instead of failing early, mixing Person and PairBond ID mappings in a single dictionary, a potential TypeError when checking event IDs without a None check, and a potential TypeError when modifying projected people that might be objects instead of dictionaries.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +77 to +88
def _people_from_diagram(diagram_data: DiagramData) -> list[Person]:
return [
from_dict(Person, p) for p in diagram_data.people if p.get("id") is not None
]


def _bonds_from_diagram(diagram_data: DiagramData) -> list[PairBond]:
return [
from_dict(PairBond, pb)
for pb in diagram_data.pair_bonds
if pb.get("id") is not None
]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The general rules specify that we must not add defensive checks to silently skip committed items that are missing an ID, and instead adhere to a fail-early policy where missing IDs on committed items are allowed to raise an error immediately. The current implementation of _people_from_diagram and _bonds_from_diagram silently filters out items with missing IDs using if p.get("id") is not None and if pb.get("id") is not None. We should raise a ValueError immediately if a committed item is missing an ID.

def _people_from_diagram(diagram_data: DiagramData) -> list[Person]:
    people = []
    for p in diagram_data.people:
        person = from_dict(Person, p)
        if person.id is None:
            raise ValueError("Committed person is missing an ID")
        people.append(person)
    return people


def _bonds_from_diagram(diagram_data: DiagramData) -> list[PairBond]:
    bonds = []
    for pb in diagram_data.pair_bonds:
        bond = from_dict(PairBond, pb)
        if bond.id is None:
            raise ValueError("Committed pair bond is missing an ID")
        bonds.append(bond)
    return bonds
References
  1. Do not add defensive checks to silently skip committed items that are missing an ID. Adhere to a fail-early policy where missing IDs on committed items are allowed to raise an error immediately.

Comment on lines +137 to +217
for run_dd in runs:
run_people = _people_from_diagram(run_dd)
run_bonds = _bonds_from_diagram(run_dd)

# Exclude default acc people from matching to prevent User/Assistant from
# absorbing real family members via the "User matches anything" rule.
matchable_acc = [p for p in acc_people if p.id not in default_acc_ids]

# match run people → accumulator people (non-default only)
_, id_map = match_people(run_people, matchable_acc, run_bonds, acc_bonds)

# Append unmatched run people as new negative-id entries
matched_run_ids = set(id_map.keys())
for rp in run_people:
if rp.id in matched_run_ids:
continue
neg = _next_neg(all_ids)
all_ids.add(neg)
new_p = copy.copy(rp)
new_p.id = neg
new_p.parents = None # will fix below after bond merging
acc_people.append(new_p)
id_map[rp.id] = neg

# Now handle pair bonds
# Build accumulator dyad → bond id index
acc_dyads: dict[tuple[int, int], int] = {}
for pb in acc_bonds:
dyad = _bond_dyad(pb)
if dyad is not None:
acc_dyads[dyad] = pb.id

# remap run bond endpoints via id_map
for rb in run_bonds:
mapped_a = (
id_map.get(rb.person_a, rb.person_a)
if rb.person_a is not None
else None
)
mapped_b = (
id_map.get(rb.person_b, rb.person_b)
if rb.person_b is not None
else None
)
if mapped_a is None or mapped_b is None:
continue
dyad = tuple(sorted([mapped_a, mapped_b]))
if dyad in acc_dyads:
# Bond already exists — record the bond id for use in id_map of bonds
id_map[rb.id] = acc_dyads[dyad]
continue
# New bond
neg = _next_neg(all_ids)
all_ids.add(neg)
new_pb = PairBond(id=neg, person_a=mapped_a, person_b=mapped_b)
acc_bonds.append(new_pb)
acc_dyads[dyad] = neg
id_map[rb.id] = neg

# Set parents on new accumulator people based on run's parent links
for rp in run_people:
acc_id = id_map.get(rp.id)
if acc_id is None or rp.parents is None:
continue
# Never set parents on default people (User/Assistant)
if acc_id in default_acc_ids:
continue
# Find the accumulator person
acc_p = next((p for p in acc_people if p.id == acc_id), None)
if acc_p is None:
continue
# Map run bond id → accumulator bond id
mapped_bond = id_map.get(rp.parents)
if mapped_bond is None:
continue
if acc_p.parents is None:
acc_p.parents = mapped_bond
# If the person was originally committed and now gets a parent link, record edit
if acc_id in original_committed_ids:
parent_edits[acc_id] = mapped_bond

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The general rules state: "When remapping entity IDs, ensure the remapping dictionary corresponds to the correct entity type. For instance, do not remap a PairBond ID using a Person ID map." Currently, id_map (which is returned by match_people as a person ID map) is being used to store both Person ID mappings and PairBond ID mappings, and is then used to remap rp.parents (a PairBond ID). We should separate these into person_id_map and bond_id_map to adhere to the architectural rules.

    for run_dd in runs:
        run_people = _people_from_diagram(run_dd)
        run_bonds = _bonds_from_diagram(run_dd)
        bond_id_map: dict[int, int] = {}

        # Exclude default acc people from matching to prevent User/Assistant from
        # absorbing real family members via the "User matches anything" rule.
        matchable_acc = [p for p in acc_people if p.id not in default_acc_ids]

        # match run people -> accumulator people (non-default only)
        _, person_id_map = match_people(run_people, matchable_acc, run_bonds, acc_bonds)

        # Append unmatched run people as new negative-id entries
        matched_run_ids = set(person_id_map.keys())
        for rp in run_people:
            if rp.id in matched_run_ids:
                continue
            neg = _next_neg(all_ids)
            all_ids.add(neg)
            new_p = copy.copy(rp)
            new_p.id = neg
            new_p.parents = None  # will fix below after bond merging
            acc_people.append(new_p)
            person_id_map[rp.id] = neg

        # Now handle pair bonds
        # Build accumulator dyad -> bond id index
        acc_dyads: dict[tuple[int, int], int] = {}
        for pb in acc_bonds:
            dyad = _bond_dyad(pb)
            if dyad is not None:
                acc_dyads[dyad] = pb.id

        # remap run bond endpoints via person_id_map
        for rb in run_bonds:
            mapped_a = (
                person_id_map.get(rb.person_a, rb.person_a)
                if rb.person_a is not None
                else None
            )
            mapped_b = (
                person_id_map.get(rb.person_b, rb.person_b)
                if rb.person_b is not None
                else None
            )
            if mapped_a is None or mapped_b is None:
                continue
            dyad = tuple(sorted([mapped_a, mapped_b]))
            if dyad in acc_dyads:
                # Bond already exists - record the bond id for use in bond_id_map
                bond_id_map[rb.id] = acc_dyads[dyad]
                continue
            # New bond
            neg = _next_neg(all_ids)
            all_ids.add(neg)
            new_pb = PairBond(id=neg, person_a=mapped_a, person_b=mapped_b)
            acc_bonds.append(new_pb)
            acc_dyads[dyad] = neg
            bond_id_map[rb.id] = neg

        # Set parents on new accumulator people based on run's parent links
        for rp in run_people:
            acc_id = person_id_map.get(rp.id)
            if acc_id is None or rp.parents is None:
                continue
            # Never set parents on default people (User/Assistant)
            if acc_id in default_acc_ids:
                continue
            # Find the accumulator person
            acc_p = next((p for p in acc_people if p.id == acc_id), None)
            if acc_p is None:
                continue
            # Map run bond id -> accumulator bond id
            mapped_bond = bond_id_map.get(rp.parents)
            if mapped_bond is None:
                continue
            if acc_p.parents is None:
                acc_p.parents = mapped_bond
                # If the person was originally committed and now gets a parent link, record edit
                if acc_id in original_committed_ids:
                    parent_edits[acc_id] = mapped_bond
References
  1. When remapping entity IDs, ensure the remapping dictionary corresponds to the correct entity type. For instance, do not remap a PairBond ID using a Person ID map.

Comment on lines +63 to +67
neg_ids = [p.id for p in ai_pdp.people if p.id is not None and p.id < 0]
neg_ids += [e.id for e in ai_pdp.events if e.id < 0]
neg_ids += [
pb.id for pb in ai_pdp.pair_bonds if pb.id is not None and pb.id < 0
]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In accumulate_discussions, neg_ids is populated by checking p.id is not None and p.id < 0 for people, and pb.id is not None and pb.id < 0 for pair bonds. However, for events, it checks e.id < 0 without a prior None check. If an event's ID is None, this will raise a TypeError during comparison. We should add a None check for event IDs as well.

Suggested change
neg_ids = [p.id for p in ai_pdp.people if p.id is not None and p.id < 0]
neg_ids += [e.id for e in ai_pdp.events if e.id < 0]
neg_ids += [
pb.id for pb in ai_pdp.pair_bonds if pb.id is not None and pb.id < 0
]
neg_ids = [p.id for p in ai_pdp.people if p.id is not None and p.id < 0]
neg_ids += [e.id for e in ai_pdp.events if e.id is not None and e.id < 0]
neg_ids += [
pb.id for pb in ai_pdp.pair_bonds if pb.id is not None and pb.id < 0
]

Comment on lines +63 to +66
for ep in [p for p in delta_pdp.people if p.id is not None and p.id > 0]:
for p in projected.people:
if _person_id(p) == ep.id:
p["parents"] = ep.parents
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

When setting parents on projected.people, the code assumes that each person p is a dictionary (p["parents"] = ep.parents). However, depending on the state of DiagramData, projected.people might contain Person objects instead of dictionaries. To prevent a potential TypeError, we should handle both dictionary and object types defensively.

        for ep in [p for p in delta_pdp.people if p.id is not None and p.id > 0]:
            for p in projected.people:
                if _person_id(p) == ep.id:
                    if isinstance(p, dict):
                        p["parents"] = ep.parents
                    else:
                        p.parents = ep.parents

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant