Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 154 additions & 9 deletions agent_factory/estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,20 @@ def _linear_graphql(api_key: str, query: str, variables: dict | None = None) ->


def fetch_unestimated_issues(api_key: str, team_id: str, limit: int = 10) -> list[dict]:
"""Fetch issues that have no estimate set."""
"""Fetch issues that have no estimate AND have not been scored by us yet.

The exclusion criteria is the `ai-estimated` label which we set immediately
after posting our suggestion. Without it, this query would keep returning
the same issues every poll cycle and we'd burn API credits in a loop.
"""
query = f"""
query {{
issues(
filter: {{
team: {{ id: {{ eq: "{team_id}" }} }}
estimate: {{ null: true }}
state: {{ type: {{ nin: ["completed", "canceled"] }} }}
labels: {{ every: {{ name: {{ neq: "ai-estimated" }} }} }}
}}
orderBy: createdAt
first: {limit}
Expand All @@ -119,6 +125,7 @@ def fetch_unestimated_issues(api_key: str, team_id: str, limit: int = 10) -> lis
description
priority
project {{ name description }}
parent {{ identifier title description }}
labels {{ nodes {{ name }} }}
state {{ name }}
createdAt
Expand All @@ -130,30 +137,107 @@ def fetch_unestimated_issues(api_key: str, team_id: str, limit: int = 10) -> lis
return data.get("issues", {}).get("nodes", [])


def estimate_issue(issue: dict, config: Config, calibration_data: dict | None = None) -> EstimationResult:
"""Use Claude to estimate a single issue."""
def fetch_similar_completed_issues(api_key: str, team_id: str, area_labels: list[str], limit: int = 5) -> list[dict]:
"""Fetch up to N recently completed issues with matching area labels.

Used to ground the estimate in real historical cycle times for similar work.
"""
if not area_labels:
return []
# Linear GraphQL doesn't trivially do OR on labels via filter syntax in a one-shot,
# so fetch by team + state=completed and let the caller filter by overlap.
query = f"""
query {{
issues(
filter: {{
team: {{ id: {{ eq: "{team_id}" }} }}
state: {{ type: {{ eq: "completed" }} }}
estimate: {{ null: false }}
}}
orderBy: updatedAt
first: 30
) {{
nodes {{
identifier title estimate
labels {{ nodes {{ name }} }}
startedAt completedAt
}}
}}
}}
"""
data = _linear_graphql(api_key, query)
nodes = data.get("issues", {}).get("nodes", [])
# Score overlap with the current issue's labels
area_set = set(l.lower() for l in area_labels)
scored = []
for n in nodes:
labels = [l["name"].lower() for l in n.get("labels", {}).get("nodes", [])]
overlap = len(area_set & set(labels))
if overlap > 0:
scored.append((overlap, n))
scored.sort(key=lambda x: -x[0])
return [n for _, n in scored[:limit]]


def estimate_issue(issue: dict, config: Config, calibration_data: dict | None = None, similar_issues: list[dict] | None = None) -> EstimationResult:
"""Use Claude to estimate a single issue.

Context layers (in order of priority):
1. The issue itself: title, description, priority, labels, state
2. Parent epic: name, description (so the estimator knows what business value
this work serves and what the broader scope is)
3. Project / initiative context (name + description)
4. Similar past completed issues with the same area labels (real cycle times,
not just a global average)
5. Global historical baseline (calibration_data)
"""
project_context = ""
if issue.get("project"):
proj = issue["project"]
project_context = f"\nProject (Initiative): {proj.get('name', 'Unknown')}"
project_context = f"\nInitiative: {proj.get('name', 'Unknown')}"
if proj.get("description"):
project_context += f"\nProject description: {proj['description']}"
project_context += f"\nInitiative description: {proj['description'][:400]}"

parent_context = ""
if issue.get("parent"):
p = issue["parent"]
parent_context = f"\nParent epic ({p.get('identifier', '?')}): {p.get('title', '')}"
if p.get("description"):
parent_context += f"\nParent epic description: {p['description'][:500]}"

labels = [l["name"] for l in issue.get("labels", {}).get("nodes", [])]
label_str = ", ".join(labels) if labels else "none"

calibration_hint = ""
if calibration_data:
avg_cycle = calibration_data.get("avg_cycle_time_days", 10)
calibration_hint = f"\nHistorical context: team average cycle time is ~{avg_cycle} days per task."
calibration_hint = f"\nHistorical baseline: team average cycle time is ~{avg_cycle} days per task (across {calibration_data.get('sample_size', 0)} past tickets)."

similar_block = ""
if similar_issues:
lines = []
for s in similar_issues[:5]:
actual_days = None
if s.get("startedAt") and s.get("completedAt"):
from datetime import datetime
try:
start = datetime.fromisoformat(s["startedAt"].replace("Z", "+00:00"))
end = datetime.fromisoformat(s["completedAt"].replace("Z", "+00:00"))
actual_days = round((end - start).total_seconds() / 86400, 1)
except Exception:
pass
est_label = {1: "XS", 2: "S", 3: "M", 5: "L", 8: "XL"}.get(s.get("estimate", 0), str(s.get("estimate", "?")))
actual_str = f"{actual_days}d actual" if actual_days is not None else "no end date"
lines.append(f"- {s.get('identifier','?')}: '{s.get('title','')[:60]}' estimated {est_label}, {actual_str}")
similar_block = "\nSimilar past completed work (same labels):\n" + "\n".join(lines)

user_message = f"""Estimate this Linear issue:

**Title:** {issue.get('title', 'Untitled')}
**Description:** {issue.get('description') or 'No description provided.'}
**Priority:** {issue.get('priority', 'Unknown')}
**Labels:** {label_str}
**Status:** {issue.get('state', {}).get('name', 'Unknown')}{project_context}{calibration_hint}
**Status:** {issue.get('state', {}).get('name', 'Unknown')}{project_context}{parent_context}{calibration_hint}{similar_block}
"""

estimation_model = os.environ.get("ESTIMATION_MODEL", "claude-haiku-4-5-20251001")
Expand Down Expand Up @@ -222,6 +306,52 @@ def post_estimation_comment(api_key: str, issue_id: str, result: EstimationResul
_linear_graphql(api_key, mutation, {"issueId": issue_id, "body": comment})


def mark_issue_as_estimated(api_key: str, issue_id: str, team_id: str) -> None:
"""Attach the `ai-estimated` label so the polling query skips this issue.

Without this the estimator re-scores the same issue on every poll cycle.
"""
# Resolve the label ID once
label_q = f"""
{{ issueLabels(filter: {{ name: {{ eq: "ai-estimated" }}, team: {{ id: {{ eq: "{team_id}" }} }} }}) {{ nodes {{ id }} }} }}
"""
data = _linear_graphql(api_key, label_q)
nodes = data.get("issueLabels", {}).get("nodes", [])
if not nodes:
# Auto-create if missing
create = """
mutation Create($input: IssueLabelCreateInput!) {
issueLabelCreate(input: $input) { success issueLabel { id } }
}
"""
created = _linear_graphql(api_key, create, {
"input": {"name": "ai-estimated", "color": "#22D3EE", "teamId": team_id}
})
label_id = created.get("issueLabelCreate", {}).get("issueLabel", {}).get("id")
else:
label_id = nodes[0]["id"]
if not label_id:
return
mutation = """
mutation AddLabel($issueId: String!, $labelIds: [String!]!) {
issueAddLabel(id: $issueId, labelId: $labelIds) { success }
}
"""
# Linear v2 mutation: issueUpdate with labelIds (additive via labelIds union)
# Simpler: just call issueUpdate with the label added.
issue_q = f'{{ issue(id: "{issue_id}") {{ labels {{ nodes {{ id }} }} }} }}'
issue_data = _linear_graphql(api_key, issue_q)
existing = [n["id"] for n in issue_data.get("issue", {}).get("labels", {}).get("nodes", [])]
if label_id not in existing:
existing.append(label_id)
update_mut = """
mutation Update($id: String!, $input: IssueUpdateInput!) {
issueUpdate(id: $id, input: $input) { success }
}
"""
_linear_graphql(api_key, update_mut, {"id": issue_id, "input": {"labelIds": existing}})


def load_calibration_data() -> dict | None:
"""Load historical calibration data if available."""
candidates = [
Expand Down Expand Up @@ -287,10 +417,25 @@ def run_estimation_loop(once: bool = False, interval: int = 120) -> None:
print(f"[estimator] Scoring {ident}: {title}...")

try:
result = estimate_issue(issue, config, calibration)
# Grab a few similar past completed tickets (same labels) so the
# estimate is grounded in real history, not just a global baseline.
area_labels = [l["name"] for l in issue.get("labels", {}).get("nodes", [])
if l["name"] not in ("ai-estimated", "agent-ready", "ai-proposed", "needs-decomposition", "needs-refinement")]
similar = []
try:
similar = fetch_similar_completed_issues(api_key, team_id, area_labels)
except Exception:
pass

result = estimate_issue(issue, config, calibration, similar_issues=similar)
post_estimation_comment(api_key, issue["id"], result)
# Mark with `ai-estimated` so we don't re-process this every poll.
try:
mark_issue_as_estimated(api_key, issue["id"], team_id)
except Exception as label_err:
print(f"[estimator] warning: could not add ai-estimated label to {ident}: {label_err}")
print(f"[estimator] -> {result.tshirt_suggestion} "
f"(impact={result.impact_score}, conf={result.confidence:.1f})")
f"(impact={result.impact_score}, conf={result.confidence:.1f}, similar={len(similar)})")
except Exception as e:
print(f"[estimator] ERROR on {ident}: {e}")

Expand Down