Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 95 additions & 9 deletions askcc/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,94 @@ def fetch_github_issue(github_issue_url: str) -> str:
return "\n\n".join(sections)


def _find_linked_pr_number(gh: str, owner: str, repo: str, issue_number: int) -> int | None:
"""Find the most recent open PR linked to the given issue number.
CLOSE_KEYWORD_RE = re.compile(
r"\b(?:closes|closed|close|fixes|fixed|fix|resolves|resolved|resolve)\s+"
r"(?:(?P<qualified>[\w.-]+/[\w.-]+))?#(?P<num>\d+)(?!\d)",
re.IGNORECASE,
)


def _resolve_transferred_predecessors(gh: str, owner: str, repo: str, issue_number: int) -> list[tuple[str, str, int]]:
"""Return prior ``(owner, repo, number)`` coordinates of a transferred issue.

Queries the GitHub Timeline API for ``transferred`` events and extracts
each predecessor from ``source.issue``. GitHub currently exposes only the
immediately prior hop; the list shape is kept for forward compatibility.

All subprocess/JSON errors are logged and yield an empty list — never raised.
"""
repo_nwo = f"{owner}/{repo}"
try:
result = subprocess.run( # noqa: S603
[gh, "api", "--paginate", f"repos/{repo_nwo}/issues/{issue_number}/timeline"],
capture_output=True,
text=True,
check=True,
)
events = json.loads(result.stdout)
except (subprocess.CalledProcessError, json.JSONDecodeError) as exc:
logger.warning("Failed to fetch timeline for %s#%d: %s", repo_nwo, issue_number, exc)
return []

predecessors: list[tuple[str, str, int]] = []
for event in events:
if not isinstance(event, dict) or event.get("event") != "transferred":
continue
source_issue = (event.get("source") or {}).get("issue") or {}
prior_repo = source_issue.get("repository") or {}
prior_owner = (prior_repo.get("owner") or {}).get("login")
prior_repo_name = prior_repo.get("name")
prior_number = source_issue.get("number")
if prior_owner and prior_repo_name and prior_number is not None:
predecessors.append((prior_owner, prior_repo_name, int(prior_number)))
return predecessors


def _body_references_target(
body: str,
targets: set[tuple[str, str, int]],
current_owner: str,
current_repo: str,
) -> bool:
"""Return True if ``body`` positively references one of ``targets``.

Accepts close-keyword references (``Closes #N`` / ``Fixes owner/repo#N`` /
``Resolves …``) and word-bounded ``/issues/N`` URL references. Repo
qualifiers are compared case-insensitively.
"""
targets_lower = {(o.lower(), r.lower(), n) for o, r, n in targets}
target_numbers = {n for *_, n in targets}
current_key = (current_owner.lower(), current_repo.lower())

for match in CLOSE_KEYWORD_RE.finditer(body):
num = int(match.group("num"))
qualified = match.group("qualified")
if qualified is None:
if (*current_key, num) in targets_lower:
return True
else:
qowner, _, qrepo = qualified.partition("/")
if (qowner.lower(), qrepo.lower(), num) in targets_lower:
return True

return any(re.search(rf"/issues/{num}(?!\d)", body) for num in target_numbers)

Searches by branch naming convention (e.g. feature/N-*, add/N-*) first,
then falls back to matching issue references in PR body text.

def _find_linked_pr_number(gh: str, owner: str, repo: str, issue_number: int) -> int | None:
"""Find the most recent open PR positively linked to ``issue_number``.

Matching rules (in priority order):
1. PR branch name matches ``^[^/]+/{N}-`` for any ``N`` in
``{issue_number} ∪ predecessor_numbers``.
2. PR body contains a close keyword (Closes/Fixes/Resolves and
inflections) referencing the issue or a predecessor — bare ``#N``
(current repo) or qualified ``owner/repo#N``.
3. PR body contains ``/issues/N`` with a digit boundary for any target.

Predecessor coordinates come from ``_resolve_transferred_predecessors`` so
PRs created against a pre-transfer issue can still be paired after the
transfer. Returns ``None`` if no PR positively references the issue or any
predecessor — callers (``fetch_pr_content``) treat this as "no link".
"""
repo_nwo = f"{owner}/{repo}"
try:
Expand All @@ -113,16 +196,19 @@ def _find_linked_pr_number(gh: str, owner: str, repo: str, issue_number: int) ->
logger.warning("Failed to list PRs for %s: %s", repo_nwo, exc)
return None

# Match branch names like "type/N-description" (askcc convention)
branch_pattern = re.compile(rf"^[^/]+/{issue_number}-")
predecessors = _resolve_transferred_predecessors(gh, owner, repo, issue_number)
targets: set[tuple[str, str, int]] = {(owner, repo, issue_number), *predecessors}
target_numbers = {n for *_, n in targets}

branch_patterns = [re.compile(rf"^[^/]+/{n}-") for n in target_numbers]
for pr in prs:
if branch_pattern.match(pr.get("headRefName", "")):
head = pr.get("headRefName", "") or ""
if any(p.match(head) for p in branch_patterns):
return pr["number"]

# Fallback: check PR body for issue reference
for pr in prs:
body = pr.get("body", "") or ""
if f"#{issue_number}" in body or f"/issues/{issue_number}" in body:
if _body_references_target(body, targets, owner, repo):
return pr["number"]

return None
Expand Down
18 changes: 16 additions & 2 deletions askcc/skills/handle-github-issue/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -352,13 +352,27 @@ Update test plan in PR description:

Before adopting the persona, fetch the linked PR:

1. List PRs and find the one whose `headRefName` matches `^[a-z]+/<issue-number>-`, else any PR whose body references `#<N>` or `/issues/<N>`:
1. Compute predecessor coordinates for transferred issues. Query the Timeline API for `transferred` events and collect each `source.issue.repository.owner.login`, `source.issue.repository.name`, and `source.issue.number`:

```bash
gh api --paginate repos/<owner>/<repo>/issues/<N>/timeline
```

If the call fails, treat the predecessor list as empty and continue. Let `target_numbers = {<N>, prior_number_1, prior_number_2, …}` and `target_refs = {<owner>/<repo>#<N>, <prior_owner>/<prior_repo>#<prior_number>, …}`.

2. List open PRs and find one with a positive match (in this priority order). Word-boundary matching is required — a bare `#5` must not match `#52`.

```bash
gh pr list -R <owner>/<repo> --json number,headRefName,body --limit 50
```

2. Fetch metadata, diff, and review comments:
- **Branch convention**: `headRefName` matches `^[^/]+/<num>-` for any `num` in `target_numbers`.
- **Close keyword**: PR body contains `Closes`, `Closed`, `Close`, `Fixes`, `Fixed`, `Fix`, `Resolves`, `Resolved`, or `Resolve` (case-insensitive) followed by either bare `#<num>` (current repo) or qualified `<qowner>/<qrepo>#<num>`, where the `(owner, repo, num)` triple is in `target_refs`. Digit boundary on `num` is mandatory.
- **Issue URL fallback**: PR body contains `/issues/<num>` (no trailing digit) for any `num` in `target_numbers`.

If no PR satisfies one of these positive matches, **abort the action without posting** — do not run `pr-review` against a PR that does not explicitly reference this issue or one of its predecessors. This prevents the substring-match mispair (e.g. `#5` accidentally matching a PR whose body references `#52`).

3. Fetch metadata, diff, and review comments:

```bash
gh api repos/<owner>/<repo>/pulls/<pr-number>
Expand Down
121 changes: 120 additions & 1 deletion tests/test_askcc.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
_has_acceptance_criteria,
_has_dependencies_section,
_parse_issue_url,
_resolve_transferred_predecessors,
_run_project_verification,
_swap_issue_labels,
_transition_project_fields,
Expand Down Expand Up @@ -1693,6 +1694,121 @@ def test_null_body_handled(self):
with patch("askcc.functions.subprocess.run", return_value=result):
assert _find_linked_pr_number("/usr/bin/gh", "owner", "repo", 42) == 3

def test_substring_collision_returns_none(self):
"""Regression: body 'Closes weyucou/task-management#52' must not pair with issue_number=5."""
prs = json.dumps(
[{"number": 6, "headRefName": "chore/52-gitignore", "body": "Closes weyucou/task-management#52"}]
)
result = subprocess.CompletedProcess(args=[], returncode=0, stdout=prs, stderr="")
with patch("askcc.functions.subprocess.run", return_value=result):
assert _find_linked_pr_number("/usr/bin/gh", "weyucou", "issue-tracker", 5) is None

def test_word_boundary_rejects_substring_in_bare_reference(self):
"""Bare '#51' / '#500' must not be paired with issue_number=5."""
prs = json.dumps([{"number": 9, "headRefName": "main", "body": "See #51, #500"}])
result = subprocess.CompletedProcess(args=[], returncode=0, stdout=prs, stderr="")
with patch("askcc.functions.subprocess.run", return_value=result):
assert _find_linked_pr_number("/usr/bin/gh", "owner", "repo", 5) is None

def test_word_boundary_positive_bare_reference(self):
"""Bare '#5' (with close keyword) pairs cleanly with issue_number=5."""
prs = json.dumps([{"number": 4, "headRefName": "main", "body": "Fixes #5"}])
result = subprocess.CompletedProcess(args=[], returncode=0, stdout=prs, stderr="")
with patch("askcc.functions.subprocess.run", return_value=result):
assert _find_linked_pr_number("/usr/bin/gh", "owner", "repo", 5) == 4

def test_transfer_aware_branch_match(self):
"""Branch matching the predecessor number wins after issue transfer."""
pr_list = json.dumps([{"number": 2, "headRefName": "chore/25-yaml-event-schema-v1", "body": "unrelated"}])
timeline = json.dumps(
[
{
"event": "transferred",
"source": {
"issue": {
"number": 25,
"repository": {"name": "task-management", "owner": {"login": "weyucou"}},
}
},
}
]
)
pr_result = subprocess.CompletedProcess(args=[], returncode=0, stdout=pr_list, stderr="")
timeline_result = subprocess.CompletedProcess(args=[], returncode=0, stdout=timeline, stderr="")
with patch("askcc.functions.subprocess.run", side_effect=[pr_result, timeline_result]):
assert _find_linked_pr_number("/usr/bin/gh", "weyucou", "issue-tracker", 5) == 2

def test_transfer_aware_body_match(self):
"""Body 'Closes weyucou/task-management#25' pairs with transferred issue#5."""
pr_list = json.dumps(
[{"number": 2, "headRefName": "no-conventional-branch", "body": "Closes weyucou/task-management#25"}]
)
timeline = json.dumps(
[
{
"event": "transferred",
"source": {
"issue": {
"number": 25,
"repository": {"name": "task-management", "owner": {"login": "weyucou"}},
}
},
}
]
)
pr_result = subprocess.CompletedProcess(args=[], returncode=0, stdout=pr_list, stderr="")
timeline_result = subprocess.CompletedProcess(args=[], returncode=0, stdout=timeline, stderr="")
with patch("askcc.functions.subprocess.run", side_effect=[pr_result, timeline_result]):
assert _find_linked_pr_number("/usr/bin/gh", "weyucou", "issue-tracker", 5) == 2


class TestResolveTransferredPredecessors:
def test_parses_transferred_event(self):
timeline = json.dumps(
[
{"event": "labeled"},
{
"event": "transferred",
"source": {
"issue": {
"number": 25,
"repository": {"name": "task-management", "owner": {"login": "weyucou"}},
}
},
},
]
)
result = subprocess.CompletedProcess(args=[], returncode=0, stdout=timeline, stderr="")
with patch("askcc.functions.subprocess.run", return_value=result):
predecessors = _resolve_transferred_predecessors("/usr/bin/gh", "weyucou", "issue-tracker", 5)
assert predecessors == [("weyucou", "task-management", 25)]

def test_clean_timeline_returns_empty(self):
timeline = json.dumps([{"event": "labeled"}, {"event": "commented"}])
result = subprocess.CompletedProcess(args=[], returncode=0, stdout=timeline, stderr="")
with patch("askcc.functions.subprocess.run", return_value=result):
assert _resolve_transferred_predecessors("/usr/bin/gh", "weyucou", "issue-tracker", 5) == []

def test_subprocess_error_returns_empty(self, caplog: pytest.LogCaptureFixture):
with (
caplog.at_level("WARNING", logger="askcc.functions"),
patch(
"askcc.functions.subprocess.run",
side_effect=subprocess.CalledProcessError(1, "gh", stderr="api error"),
),
):
assert _resolve_transferred_predecessors("/usr/bin/gh", "weyucou", "issue-tracker", 5) == []
assert "Failed to fetch timeline" in caplog.text

def test_bad_json_returns_empty(self, caplog: pytest.LogCaptureFixture):
result = subprocess.CompletedProcess(args=[], returncode=0, stdout="not-json", stderr="")
with (
caplog.at_level("WARNING", logger="askcc.functions"),
patch("askcc.functions.subprocess.run", return_value=result),
):
assert _resolve_transferred_predecessors("/usr/bin/gh", "weyucou", "issue-tracker", 5) == []
assert "Failed to fetch timeline" in caplog.text


class TestFetchPrContent:
ISSUE_URL = "https://github.com/monkut/askcc-cli/issues/42"
Expand All @@ -1711,6 +1827,9 @@ def _make_pr_responses(
prs = json.dumps([{"number": pr_number, "headRefName": "feature/42-add", "body": pr_body}])
list_result = subprocess.CompletedProcess(args=[], returncode=0, stdout=prs, stderr="")

# Timeline lookup (for _resolve_transferred_predecessors) — empty events
timeline_result = subprocess.CompletedProcess(args=[], returncode=0, stdout="[]", stderr="")

# PR metadata
pr_data = json.dumps({"title": pr_title, "body": pr_body, "html_url": pr_url})
meta_result = subprocess.CompletedProcess(args=[], returncode=0, stdout=pr_data, stderr="")
Expand All @@ -1722,7 +1841,7 @@ def _make_pr_responses(
comments = json.dumps(review_comments or [])
comments_result = subprocess.CompletedProcess(args=[], returncode=0, stdout=comments, stderr="")

return [list_result, meta_result, diff_result, comments_result]
return [list_result, timeline_result, meta_result, diff_result, comments_result]

def test_fetches_pr_content(self):
responses = self._make_pr_responses()
Expand Down
Loading