diff --git a/.claude/skills/audit-spark-commit/SKILL.md b/.claude/skills/audit-spark-commit/SKILL.md
new file mode 100644
index 0000000000..cce891ef15
--- /dev/null
+++ b/.claude/skills/audit-spark-commit/SKILL.md
@@ -0,0 +1,128 @@
+---
+name: audit-spark-commit
+description: Audit a single Apache Spark commit to determine whether it impacts DataFusion Comet. Reads the contributor guide for the rubric, fetches the commit, proposes a verdict, and updates dev/spark-commit-audit.md after the user reviews.
+argument-hint: <spark-commit-hash>
+---
+
+Audit Apache Spark commit `$ARGUMENTS` for impact on DataFusion Comet.
+
+The full process and rubric live in
+`docs/source/contributor-guide/spark_commit_audit.md`. Read that page first
+so the rubric is in context. The steps below are a thin orchestration of
+the per-commit workflow.
+
+## Inputs
+
+A single Spark commit hash, short or full. PR numbers are not accepted; the
+caller must resolve them.
+
+## Steps
+
+### 1. Read the contributor guide
+
+Read `docs/source/contributor-guide/spark_commit_audit.md` start to finish.
+The "Rubric" section is the source of truth for the verdict. The "Comet
+scope reference" table tells you which subsystems Comet currently
+implements.
+
+### 2. Fetch the Spark commit
+
+Ensure `apache/spark` is cloned to a cache dir:
+
+```bash
+SPARK_DIR="/tmp/spark-audit-clone"
+if [ ! -d "$SPARK_DIR" ]; then
+  git clone https://github.com/apache/spark.git "$SPARK_DIR"
+else
+  git -C "$SPARK_DIR" fetch origin master
+fi
+```
+
+Then read the commit message and change summary:
+
+```bash
+git -C "$SPARK_DIR" show --stat $ARGUMENTS
+```
+
+For deeper investigation, read the changed files directly with
+`git -C "$SPARK_DIR" show $ARGUMENTS:<path>` or by checking out the commit
+in the cache dir.
+
+If a SPARK JIRA or GitHub PR is referenced in the commit message, fetch
+that for additional context using `gh` if available.
+
+### 3. Confirm scope
+
+Confirm the commit touches `sql/` and is not entirely under `sql/connect/`
+or `sql/hive-thriftserver/`. If it is out of scope, propose `not-relevant`
+with a one-line note explaining why and proceed to step 5.
+
+### 4. Apply the rubric
+
+Walk the "Relevant" trigger list and the "Not relevant" bucket list from
+the contributor guide. Cross-reference the affected subsystem against the
+"Comet scope reference" table.
+
+Propose one of:
+
+- `relevant`: the commit affects a subsystem Comet emulates.
+- `not-relevant`: the commit does not affect Comet.
+- `unclear`: the rubric cannot determine impact without more research.
+
+Compose a one-sentence prose note that explains the verdict (e.g. "Adds a
+new ANSI overflow check in `Add` expression that Comet currently does not
+match"). Keep notes concise; the line is a single bullet.
+
+### 5. Update the audit log
+
+Locate the existing `[needs-triage]` line for this commit in
+`dev/spark-commit-audit.md`. Match by short hash.
+
+If the line is missing, abort and tell the user:
+
+> The commit `$ARGUMENTS` is not in `dev/spark-commit-audit.md`. Re-run
+> `python dev/regenerate-spark-audit.py` from the release virtualenv to
+> pick up new commits, then invoke this skill again.
+
+Do not append the line; the bootstrap script is the single source of
+truth for membership in the queue.
+
+If the line is present, propose the updated line to the user and wait for
+approval or edits. The format is:
+
+```
+- `<8-char-hash>` <date> [<state>] <subject>[. <prose note>][. comet#<issue>]
+```
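+
+For reference, a filled-in line might look like this (hash, date, and issue
+number are hypothetical):
+
+```
+- `abcd1234` 2026-05-02 [relevant] [SPARK-99999][SQL] Change Add overflow error class under ANSI mode. Comet still raises the legacy error class. comet#1234
+```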
+
+On approval, replace the line in place using the Edit tool. Do not commit
+and do not push.
+
+### 6. Offer to handle the Comet tracking issue
+
+If the verdict is `relevant`, ask the user:
+
+> This commit is `relevant`. How would you like to handle the Comet
+> tracking issue?
+>
+> - **(a)** Draft the issue body to a local markdown file under
+>   `/tmp/comet-audit-issue-<hash>.md` for review.
+> - **(b)** File a Comet GitHub issue immediately via `gh issue create`,
+>   after I show you the title and body.
+> - **(c)** Skip; I will handle it later.
+
+For **(b)**, show the title and body and confirm before running `gh`. If
+the user confirms, run `gh issue create --repo apache/datafusion-comet
+--title "..." --body-file <file>` and report the resulting issue URL,
+then offer to add `comet#NNNN` to the audit log line.
+
+## What this skill does NOT do
+
+- Resolve PR numbers to commits.
+- Audit more than one commit per invocation.
+- Append a new line if the commit is missing from the log (it tells the
+  user to re-run the bootstrap script instead).
+- Commit, push, or open Comet PRs.
+
+## Tone and style
+
+- Keep prose notes to one sentence.
+- Use backticks around code references.
+- Avoid em dashes; use periods or restructure.
diff --git a/dev/regenerate-spark-audit.py b/dev/regenerate-spark-audit.py
new file mode 100644
index 0000000000..06ae35bcf5
--- /dev/null
+++ b/dev/regenerate-spark-audit.py
@@ -0,0 +1,222 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Regenerate dev/spark-commit-audit.md.
+
+Enumerates commits on Apache Spark master since branch-4.2 was cut that
+touch the sql/ subtree (excluding sql/connect and sql/hive-thriftserver),
+and writes them into the marker block of dev/spark-commit-audit.md.
+
+Idempotent: existing verdicts and prose notes are preserved by short hash.
+"""
+
+from __future__ import annotations
+
+import argparse
+import os
+import re
+import sys
+
+
+# HTML comment markers that delimit the generated block in the audit log.
+BEGIN_MARKER = "<!-- SPARK_COMMIT_AUDIT_BEGIN -->"
+END_MARKER = "<!-- SPARK_COMMIT_AUDIT_END -->"
+
+LINE_RE = re.compile(r"^- `([0-9a-f]{8})`")
+
+
+def parse_existing_block(body: str) -> dict[str, str]:
+    """Return a map of short-hash to full line for every entry in the block."""
+    if BEGIN_MARKER not in body or END_MARKER not in body:
+        raise ValueError(
+            f"audit log is missing one or both markers: {BEGIN_MARKER!r}, {END_MARKER!r}"
+        )
+    start = body.index(BEGIN_MARKER) + len(BEGIN_MARKER)
+    end = body.index(END_MARKER)
+    block = body[start:end]
+    result: dict[str, str] = {}
+    for line in block.splitlines():
+        line = line.rstrip()
+        m = LINE_RE.match(line)
+        if m:
+            result[m.group(1)] = line
+    return result
+
+
+MAX_SUBJECT_LEN = 200
+
+
+def format_new_line(*, short_hash: str, date: str, subject: str) -> str:
+    """Format a fresh [needs-triage] entry."""
+    if len(subject) > MAX_SUBJECT_LEN:
+        subject = subject[: MAX_SUBJECT_LEN - 3] + "..."
+    return f"- `{short_hash}` {date} [needs-triage] {subject}"
+
+
+def is_in_scope(file_paths: list[str]) -> bool:
+    """True when the commit touches sql/ outside of connect/thriftserver."""
+    for path in file_paths:
+        if not path.startswith("sql/"):
+            continue
+        if path.startswith("sql/connect/"):
+            continue
+        if path.startswith("sql/hive-thriftserver/"):
+            continue
+        return True
+    return False
+
+
+def merge_lines(commits: list[dict], existing: dict[str, str]) -> list[str]:
+    """Merge a chronological commit list with existing audit lines.
+
+    Existing entries are emitted verbatim. Commits not in the existing map
+    get a fresh [needs-triage] line. Output preserves the order of `commits`.
+    """
+    result: list[str] = []
+    for commit in commits:
+        short = commit["short"]
+        if short in existing:
+            result.append(existing[short])
+        else:
+            result.append(
+                format_new_line(
+                    short_hash=short,
+                    date=commit["date"],
+                    subject=commit["subject"],
+                )
+            )
+    return result
+
+
+def replace_block(body: str, lines: list[str]) -> str:
+    """Return body with the marker block replaced by the given lines."""
+    if BEGIN_MARKER not in body or END_MARKER not in body:
+        raise ValueError("audit log file is missing marker comments")
+    before, _, rest = body.partition(BEGIN_MARKER)
+    _, _, after = rest.partition(END_MARKER)
+    block_body = "\n".join(lines)
+    if block_body:
+        block_body = "\n" + block_body + "\n"
+    else:
+        block_body = "\n"
+    return before + BEGIN_MARKER + block_body + END_MARKER + after
+
+
+def enumerate_spark_commits(token: str, limit: int | None = None) -> list[dict]:
+    """Enumerate in-scope Spark master commits since branch-4.2 was cut.
+
+    Returns a list of {short, date, subject} dicts in chronological order.
+    """
+    from github import Github  # local import keeps the script importable without PyGithub installed
+
+    gh = Github(token)
+    repo = gh.get_repo("apache/spark")
+
+    # Resolve the branch-4.2 cut point as the merge base of master and branch-4.2.
+    print("Resolving branch-4.2 merge base...", file=sys.stderr)
+    cmp = repo.compare("branch-4.2", "master")
+    base_sha = cmp.merge_base_commit.sha
+    print(f"merge base: {base_sha}", file=sys.stderr)
+
+    # Walk master commits that touch sql/, newest first, until we hit the merge base.
+    print("Listing sql/ commits on master...", file=sys.stderr)
+    paginated = repo.get_commits(sha="master", path="sql/")
+
+    candidates: list = []
+    seen = 0
+    for c in paginated:
+        if c.sha == base_sha:
+            break
+        seen += 1
+        if limit is not None and seen > limit:
+            break
+        candidates.append(c)
+
+    print(f"fetched {len(candidates)} candidate commits", file=sys.stderr)
+
+    # Filter by in-scope file paths. This is N extra API calls (one per commit).
+    out: list[dict] = []
+    for i, c in enumerate(candidates):
+        if i % 50 == 0:
+            print(f"filtering {i}/{len(candidates)}...", file=sys.stderr)
+        files = [f.filename for f in c.files]
+        if not is_in_scope(files):
+            continue
+        date_str = c.commit.author.date.strftime("%Y-%m-%d")
+        subject = c.commit.message.split("\n", 1)[0]
+        out.append(
+            {
+                "short": c.sha[:8],
+                "date": date_str,
+                "subject": subject,
+            }
+        )
+
+    # Reverse to chronological order (oldest first).
+    out.reverse()
+    return out
+
+
+def main() -> int:
+    parser = argparse.ArgumentParser(description=__doc__)
+    parser.add_argument(
+        "--dry-run",
+        action="store_true",
+        help="print the merged block to stdout instead of writing the file",
+    )
+    parser.add_argument(
+        "--limit",
+        type=int,
+        default=None,
+        help="only consider the most recent N candidate commits (for testing)",
+    )
+    parser.add_argument(
+        "--audit-log",
+        default=os.path.join(
+            os.path.dirname(os.path.abspath(__file__)), "spark-commit-audit.md"
+        ),
+        help="path to the audit log file (default: dev/spark-commit-audit.md)",
+    )
+    args = parser.parse_args()
+
+    token = os.environ.get("GITHUB_TOKEN")
+    if not token:
+        print("GITHUB_TOKEN environment variable is required", file=sys.stderr)
+        return 2
+
+    with open(args.audit_log, "r", encoding="utf-8") as f:
+        body = f.read()
+    existing = parse_existing_block(body)
+    print(f"existing entries: {len(existing)}", file=sys.stderr)
+
+    commits = enumerate_spark_commits(token, limit=args.limit)
+    print(f"in-scope commits: {len(commits)}", file=sys.stderr)
+
+    merged = merge_lines(commits, existing)
+    new_body = replace_block(body, merged)
+
+    if args.dry_run:
+        sys.stdout.write(new_body)
+    else:
+        with open(args.audit_log, "w", encoding="utf-8") as f:
+            f.write(new_body)
+        print(f"wrote {args.audit_log} ({len(merged)} entries)", file=sys.stderr)
+    return 0
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/dev/spark-commit-audit.md b/dev/spark-commit-audit.md
new file mode 100644
index 0000000000..52302802ee
--- /dev/null
+++ b/dev/spark-commit-audit.md
@@ -0,0 +1,36 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Spark Commit Audit Log
+
+Tracks Apache Spark `master` commits since the `branch-4.2` cut that touch
+the `sql/` subtree, recording whether each one impacts DataFusion Comet.
+See `docs/source/contributor-guide/spark_commit_audit.md` for the rubric
+and process.
+
+This file is regenerated and incrementally updated by
+`dev/regenerate-spark-audit.py`. Existing verdicts and prose notes are
+preserved on re-run.
+
+## Commits
+
+<!-- SPARK_COMMIT_AUDIT_BEGIN -->
+- `84d9c842` 2026-05-02 [needs-triage] [SPARK-56686][FOLLOWUP][SQL] Mark CDC streaming rewrite via attribute metadata
+- `ae5c075a` 2026-05-04 [needs-triage] [SPARK-56711][SQL] Restrict CDC `_commit_version` column to LongType or StringType
+<!-- SPARK_COMMIT_AUDIT_END -->
diff --git a/dev/test_regenerate_spark_audit.py b/dev/test_regenerate_spark_audit.py
new file mode 100644
index 0000000000..1c129ca2e9
--- /dev/null
+++ b/dev/test_regenerate_spark_audit.py
@@ -0,0 +1,180 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+import unittest
+
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+import importlib.util
+
+_spec = importlib.util.spec_from_file_location(
+    "regenerate_spark_audit",
+    os.path.join(os.path.dirname(os.path.abspath(__file__)), "regenerate-spark-audit.py"),
+)
+rsa = importlib.util.module_from_spec(_spec)
+_spec.loader.exec_module(rsa)
+
+
+class ParseExistingTest(unittest.TestCase):
+    def test_parses_marker_block(self):
+        body = (
+            "preamble\n"
+            "<!-- SPARK_COMMIT_AUDIT_BEGIN -->\n"
+            "- `abcd1234` 2026-01-15 [needs-triage] SPARK-1: Foo\n"
+            "- `def56789` 2026-01-16 [relevant] SPARK-2: Bar. comet#5023\n"
+            "<!-- SPARK_COMMIT_AUDIT_END -->\n"
+            "trailer\n"
+        )
+        result = rsa.parse_existing_block(body)
+        self.assertEqual(set(result.keys()), {"abcd1234", "def56789"})
+        self.assertIn("[needs-triage]", result["abcd1234"])
+        self.assertIn("[relevant]", result["def56789"])
+        self.assertIn("comet#5023", result["def56789"])
+
+    def test_handles_empty_block(self):
+        body = (
+            "<!-- SPARK_COMMIT_AUDIT_BEGIN -->\n"
+            "<!-- SPARK_COMMIT_AUDIT_END -->\n"
+        )
+        self.assertEqual(rsa.parse_existing_block(body), {})
+
+    def test_missing_markers_raises(self):
+        with self.assertRaises(ValueError):
+            rsa.parse_existing_block("no markers here")
+
+
+class FormatNewLineTest(unittest.TestCase):
+    def test_basic(self):
+        line = rsa.format_new_line(
+            short_hash="abcd1234",
+            date="2026-01-15",
+            subject="SPARK-1: Add foo",
+        )
+        self.assertEqual(
+            line,
+            "- `abcd1234` 2026-01-15 [needs-triage] SPARK-1: Add foo",
+        )
+
+    def test_truncates_long_subject(self):
+        long_subject = "A" * 300
+        line = rsa.format_new_line(
+            short_hash="abcd1234",
+            date="2026-01-15",
+            subject=long_subject,
+        )
+        self.assertLessEqual(len(line), 250)
+        self.assertTrue(line.endswith("..."))
+
+
+class IsInScopeTest(unittest.TestCase):
+    def test_pure_sql_core(self):
+        self.assertTrue(rsa.is_in_scope(["sql/core/src/main/scala/Foo.scala"]))
+
+    def test_pure_connect(self):
+        self.assertFalse(rsa.is_in_scope(["sql/connect/server/src/Bar.scala"]))
+
+    def test_pure_thriftserver(self):
+        self.assertFalse(rsa.is_in_scope(["sql/hive-thriftserver/src/Baz.scala"]))
+
+    def test_mixed_sql_and_connect(self):
+        self.assertTrue(
+            rsa.is_in_scope(
+                [
+                    "sql/connect/server/src/Bar.scala",
+                    "sql/catalyst/src/main/scala/Qux.scala",
+                ]
+            )
+        )
+
+    def test_no_sql_paths(self):
+        self.assertFalse(rsa.is_in_scope(["core/src/main/scala/Z.scala"]))
+
+    def test_empty(self):
+        self.assertFalse(rsa.is_in_scope([]))
+
+
+class MergeTest(unittest.TestCase):
+    def test_preserves_existing_and_appends_new(self):
+        existing = {
+            "abcd1234": "- `abcd1234` 2026-01-15 [relevant] SPARK-1: Foo. comet#5023",
+        }
+        commits = [
+            {"short": "abcd1234", "date": "2026-01-15", "subject": "SPARK-1: Foo"},
+            {"short": "def56789", "date": "2026-01-16", "subject": "SPARK-2: Bar"},
+        ]
+        result = rsa.merge_lines(commits, existing)
+        self.assertEqual(len(result), 2)
+        self.assertEqual(
+            result[0],
+            "- `abcd1234` 2026-01-15 [relevant] SPARK-1: Foo. comet#5023",
+        )
+        self.assertEqual(
+            result[1],
+            "- `def56789` 2026-01-16 [needs-triage] SPARK-2: Bar",
+        )
+
+    def test_chronological_order(self):
+        existing: dict[str, str] = {}
+        commits = [
+            {"short": "11111111", "date": "2026-01-01", "subject": "X"},
+            {"short": "22222222", "date": "2026-01-02", "subject": "Y"},
+            {"short": "33333333", "date": "2026-01-03", "subject": "Z"},
+        ]
+        result = rsa.merge_lines(commits, existing)
+        self.assertEqual(len(result), 3)
+        self.assertIn("11111111", result[0])
+        self.assertIn("22222222", result[1])
+        self.assertIn("33333333", result[2])
+
+
+class WriteBlockTest(unittest.TestCase):
+    def test_replaces_block_only(self):
+        original = (
+            "preamble line\n"
+            "<!-- SPARK_COMMIT_AUDIT_BEGIN -->\n"
+            "- `abcd1234` 2026-01-15 [needs-triage] OLD\n"
+            "<!-- SPARK_COMMIT_AUDIT_END -->\n"
+            "trailer line\n"
+        )
+        new_lines = [
+            "- `abcd1234` 2026-01-15 [relevant] NEW. comet#9",
+            "- `def56789` 2026-01-16 [needs-triage] FRESH",
+        ]
+        result = rsa.replace_block(original, new_lines)
+        self.assertIn("preamble line", result)
+        self.assertIn("trailer line", result)
+        self.assertIn("[relevant] NEW. comet#9", result)
+        self.assertIn("[needs-triage] FRESH", result)
+        self.assertNotIn("OLD", result)
+        self.assertIn(rsa.BEGIN_MARKER, result)
+        self.assertIn(rsa.END_MARKER, result)
+
+    def test_empty_lines_yields_empty_block(self):
+        original = (
+            "<!-- SPARK_COMMIT_AUDIT_BEGIN -->\n"
+            "- `abcd1234` 2026-01-15 [needs-triage] OLD\n"
+            "<!-- SPARK_COMMIT_AUDIT_END -->\n"
+        )
+        result = rsa.replace_block(original, [])
+        self.assertNotIn("OLD", result)
+        self.assertIn(rsa.BEGIN_MARKER, result)
+        self.assertIn(rsa.END_MARKER, result)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/docs/source/contributor-guide/index.md b/docs/source/contributor-guide/index.md
index 20e73c7428..927cbb80b6 100644
--- a/docs/source/contributor-guide/index.md
+++ b/docs/source/contributor-guide/index.md
@@ -40,6 +40,7 @@
 Comet SQL Tests
 Spark SQL Tests
 Iceberg Spark Tests
 Bug Triage
+Spark Commit Audit
 Roadmap
 Release Process
 Github and Issue Tracker
diff --git a/docs/source/contributor-guide/spark_commit_audit.md b/docs/source/contributor-guide/spark_commit_audit.md
new file mode 100644
index 0000000000..ac36dbfae9
--- /dev/null
+++ b/docs/source/contributor-guide/spark_commit_audit.md
@@ -0,0 +1,152 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Spark Commit Audit
+
+This page describes how the Comet community audits Apache Spark `master`
+commits since `branch-4.2` was cut, so that the project stays aware of
+upstream changes and does not silently diverge from Spark behavior.
+
+## Why we audit
+
+Comet emulates Spark behavior across many subsystems: expressions, the
+optimizer, Parquet read and write, shuffle, joins, aggregates, and more.
+When Spark changes behavior upstream, Comet may need to follow. The audit
+is the mechanism that makes "did anyone notice this commit?" answerable.
+
+## Scope
+
+In scope: commits on Apache Spark `master` since the `branch-4.2` cut that
+touch the `sql/` subtree.
+
+Out of scope:
+
+- `sql/connect/` (Spark Connect)
+- `sql/hive-thriftserver/`
+- Commits backported only to release branches. If a change is relevant it
+  lands on `master` first.
+- Commits before the `branch-4.2` cut.
+
+## Where the log lives
+
+The audit log is `dev/spark-commit-audit.md`. Each line corresponds to one
+Spark commit. The format is:
+
+`` - `<8-char-hash>` <date> [<state>] <subject>[. <prose note>][. comet#<issue>] ``
+
+Lines are kept oldest-first. New commits are appended by the bootstrap
+script.
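+
+For example, a fully triaged entry might look like this (hash, date, and
+issue number are illustrative only):
+
+```
+- `abcd1234` 2026-05-02 [relevant] [SPARK-99999][SQL] Change Add overflow error class under ANSI mode. Comet still raises the legacy error class. comet#1234
+```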
+
+### States
+
+| State | Meaning |
+|---|---|
+| `needs-triage` | Not yet audited. Initial state for every entry. |
+| `relevant` | Audited; the commit affects Comet. A Comet issue link is recommended but not required. |
+| `not-relevant` | Audited; the commit does not affect Comet. |
+| `unclear` | Audited; the auditor could not decide. The prose note should explain why. |
+
+## Rubric
+
+### "Relevant" triggers
+
+Mark a commit `relevant` if any of the following apply:
+
+1. Adds, removes, or renames a Spark expression or function.
+2. Changes evaluation behavior of an existing expression: null handling,
+   ANSI mode, overflow, dictionary encoding, or casting.
+3. Changes the optimizer or planner in a way that produces a different
+   physical plan shape.
+4. Changes Parquet reader or writer behavior, or pushdown semantics.
+5. Changes shuffle or exchange behavior.
+6. Changes operator behavior in joins, aggregates, window, or sort.
+7. Changes type coercion, resolution, or analysis rules.
+8. Changes ANSI mode defaults or semantics.
+9. Adds a new SQL config that affects behavior Comet emulates.
+10. Adds new Spark tests that exercise behavior Comet may not yet match.
+
+### "Not relevant" buckets
+
+Mark a commit `not-relevant` if it falls into one of these:
+
+- Spark Connect-only changes (already filtered at enumeration time).
+- Hive thriftserver-only changes (already filtered).
+- Codegen-only refactors. Comet does not use Spark's whole-stage codegen.
+- Pure refactors with no behavior change.
+- Docs, comments, build, or CI changes.
+- Test-only changes that exercise behavior Comet already matches.
+
+### Comet scope reference
+
+Use the table below as a quick check for whether the affected subsystem is
+one Comet currently cares about. See the
+[Compatibility Guide](https://datafusion.apache.org/comet/user-guide/compatibility.html)
+for the authoritative list.
+
+| Subsystem | Comet support |
+|---|---|
+| Expressions | Many supported, see compat guide |
+| Parquet read | Supported |
+| Parquet write | Partially supported |
+| Native shuffle | Supported |
+| JVM shuffle | Supported |
+| Hash joins | Supported |
+| Sort merge joins | Supported |
+| Hash aggregate | Supported |
+| Window | Partially supported |
+| Sort | Supported |
+| Scan pushdown | Partially supported |
+
+## Workflow
+
+1. Pull the latest `dev/spark-commit-audit.md` and grab a contiguous chunk
+   of `[needs-triage]` lines, typically 10 to 20 commits.
+2. For each commit, read the Spark PR or commit, apply the rubric, and set
+   the state. Add an optional one-sentence prose note and, when relevant,
+   a `comet#NNNN` link.
+3. When the verdict is `relevant` and there is no existing tracking issue,
+   filing one is recommended but not required.
+4. Open a Comet PR titled `chore: audit Spark commits <start>..<end>`.
+
+## Tools
+
+Claude users can run the `audit-spark-commit` skill on each commit hash
+to get a proposed verdict that follows this rubric. Contributors who do
+not use Claude follow the manual process above.
+
+## Bootstrapping and incremental updates
+
+The audit log is generated and maintained by `dev/regenerate-spark-audit.py`.
+Run it from the existing release virtualenv:
+
+```sh
+cd dev/release && source venv/bin/activate
+export GITHUB_TOKEN=<token>
+python ../regenerate-spark-audit.py
+```
+
+The script is idempotent: it preserves existing verdicts and prose notes by
+short-hash and only appends new `[needs-triage]` lines for commits that have
+appeared since the last run.
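+
+The preservation behavior is covered by `dev/test_regenerate_spark_audit.py`.
+Assuming a local checkout with Python 3 available, the suite can be run
+directly from the repository root:
+
+```sh
+python dev/test_regenerate_spark_audit.py -v
+```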
+
+Useful flags:
+
+- `--dry-run`: print the resulting block without writing the file.
+- `--limit N`: only consider the most recent N candidate commits, which is
+  useful for testing (see the example below).
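+
+For example, to preview the merged block for only the most recent 50
+candidate commits without touching the file (assuming `GITHUB_TOKEN` is
+already exported as above):
+
+```sh
+python ../regenerate-spark-audit.py --limit 50 --dry-run | less
+```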