Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion backend/internal/domain/review.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
package domain

import "time"
import (
"errors"
"time"
)

// ErrDuplicateReviewRun is the sentinel returned by InsertReviewRun when a run
// already exists for the same worker session and target commit, i.e. the insert
// was rejected by the partial unique index from migration 0013. Callers match
// it with errors.Is; it lets the review engine fall back to the recorded run
// instead of surfacing a raw storage error after a reviewer may have launched.
var ErrDuplicateReviewRun = errors.New("domain: review run already exists for session and target sha")

// Review is the per-worker code-review record: one row per worker session
// (SessionID is unique). A repeat trigger reuses this row; the per-pass facts
Expand Down
62 changes: 55 additions & 7 deletions backend/internal/review/review.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -74,6 +75,12 @@ type Engine struct {
launcher Launcher
clock func() time.Time
newID func() string

// triggerMu guards triggerLocks; triggerLocks holds one mutex per worker
// session so concurrent Trigger calls for the same worker serialise (see
// lockWorker). Distinct workers never contend.
triggerMu sync.Mutex
triggerLocks map[domain.SessionID]*sync.Mutex
}

// New wires an Engine from its dependencies, defaulting the clock and id source.
Expand All @@ -87,14 +94,35 @@ func New(d Deps) *Engine {
newID = uuid.NewString
}
return &Engine{
store: d.Store,
sessions: d.Sessions,
prs: d.PRs,
projects: d.Projects,
launcher: d.Launcher,
clock: clock,
newID: newID,
store: d.Store,
sessions: d.Sessions,
prs: d.PRs,
projects: d.Projects,
launcher: d.Launcher,
clock: clock,
newID: newID,
triggerLocks: make(map[domain.SessionID]*sync.Mutex),
}
}

// lockWorker acquires the mutex dedicated to worker session id and hands back
// the function that releases it, so Trigger calls for one worker run strictly
// one at a time. Without this, two concurrent triggers for the same worker can
// both pass the per-commit idempotency check and each spawn a reviewer against
// the same deterministic handle, leaving two running runs for one commit (#242).
//
// Per-worker mutexes are created lazily and never removed: the registry grows
// by one pointer per distinct session seen over the engine's lifetime, which is
// accepted as a negligible cost.
func (e *Engine) lockWorker(id domain.SessionID) func() {
	// Resolve the worker's mutex under the registry lock only; the registry
	// lock is released before blocking on the per-worker mutex so triggers for
	// other workers are never held up.
	workerMu := func() *sync.Mutex {
		e.triggerMu.Lock()
		defer e.triggerMu.Unlock()

		if existing, found := e.triggerLocks[id]; found {
			return existing
		}
		created := new(sync.Mutex)
		e.triggerLocks[id] = created
		return created
	}()

	workerMu.Lock()
	return workerMu.Unlock
}

// TriggerResult is the outcome of a trigger: the (new or existing) run, the live
Expand Down Expand Up @@ -122,6 +150,14 @@ func (e *Engine) Trigger(ctx context.Context, workerID domain.SessionID) (Trigge
if workerID == "" {
return TriggerResult{}, fmt.Errorf("%w: worker session id is required", ErrInvalid)
}

// Serialise concurrent triggers for this worker so the idempotency check
// below (and the reviewer spawn that follows it) can't be raced into a
// double-spawn. Held across the spawn deliberately: the loser then re-reads
// the freshly-recorded run and short-circuits to Created:false.
unlock := e.lockWorker(workerID)
defer unlock()

worker, ok, err := e.sessions.GetSession(ctx, workerID)
if err != nil {
return TriggerResult{}, err
Expand Down Expand Up @@ -209,6 +245,18 @@ func (e *Engine) Trigger(ctx context.Context, workerID domain.SessionID) (Trigge
CreatedAt: now,
}
if err := e.store.InsertReviewRun(ctx, run); err != nil {
// The per-worker lock serialises in-process triggers, but the unique
// index (migration 0013) can still reject a run a concurrent daemon (or
// a pre-lock restart) recorded for this commit. The reviewer is already
// launched, so don't surface a raw error: re-read the recorded run and
// return it as the existing, not-newly-created pass.
if errors.Is(err, domain.ErrDuplicateReviewRun) {
if existing, ok, getErr := e.store.GetReviewRunBySessionAndSHA(ctx, workerID, targetSHA); getErr != nil {
return TriggerResult{}, getErr
} else if ok {
return TriggerResult{Run: existing, ReviewerHandleID: handleID, Created: false}, nil
}
}
return TriggerResult{}, err
}
return TriggerResult{Run: run, ReviewerHandleID: handleID, Created: true}, nil
Expand Down
91 changes: 83 additions & 8 deletions backend/internal/review/review_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package review
import (
"context"
"errors"
"sync"
"testing"
"time"

Expand All @@ -14,6 +15,11 @@ import (
// fakeStore is an in-memory test double for the review store: one optional
// review row plus the slice of runs that InsertReviewRun appends to.
type fakeStore struct {
review *domain.Review
runs []domain.ReviewRun
// insertErr, when non-nil, makes InsertReviewRun model a concurrent writer
// that already recorded a run for this commit: instead of recording the
// caller's run it records a copy whose ID is prefixed "winner-" (so a
// follow-up GetReviewRunBySessionAndSHA finds it) and returns insertErr.
insertErr error
}

func (f *fakeStore) UpsertReview(_ context.Context, r domain.Review) error {
Expand All @@ -28,6 +34,12 @@ func (f *fakeStore) GetReviewBySession(_ context.Context, _ domain.SessionID) (d
return *f.review, true, nil
}
// InsertReviewRun appends r to the recorded runs and returns nil.
//
// When insertErr is armed it instead simulates losing a race to a concurrent
// writer: a copy of r whose ID is prefixed with "winner-" is recorded in place
// of r, and the armed error is returned. The error is consumed by that call, so
// only the next insert after arming is rejected; later inserts succeed and do
// not pile up further fabricated winner rows.
func (f *fakeStore) InsertReviewRun(_ context.Context, r domain.ReviewRun) error {
	if err := f.insertErr; err != nil {
		// One-shot: disarm before recording so the simulated conflict fires once.
		f.insertErr = nil
		winner := r
		winner.ID = "winner-" + r.ID
		f.runs = append(f.runs, winner)
		return err
	}
	f.runs = append(f.runs, r)
	return nil
}
Expand Down Expand Up @@ -87,18 +99,20 @@ func (f fakeProjects) GetProject(_ context.Context, id string) (domain.ProjectRe
}

// fakeLauncher is a test double for the reviewer launcher. Its Spawn method
// records each call (spawned, spawnCount, gotSpec) and fails with spawnErr when
// that is set; spawnCount lets tests assert exactly how many reviewers were
// launched.
type fakeLauncher struct {
handle string
alive bool
spawnErr error
notifyErr error
spawned bool
notified bool
gotSpec LaunchSpec
gotHandle string
handle string
alive bool
spawnErr error
notifyErr error
spawned bool
spawnCount int
notified bool
gotSpec LaunchSpec
gotHandle string
}

func (f *fakeLauncher) Spawn(_ context.Context, spec LaunchSpec) (string, error) {
f.spawned = true
f.spawnCount++
f.gotSpec = spec
if f.spawnErr != nil {
return "", f.spawnErr
Expand Down Expand Up @@ -163,6 +177,67 @@ func TestTriggerSpawnsNewReviewerAndRecordsRunAfterLaunch(t *testing.T) {
}
}

// TestTriggerConcurrentSameWorkerSpawnsOnce fires several Trigger calls for the
// same worker at once and checks that the per-worker lock lets exactly one of
// them spawn a reviewer and record a run, while the others reuse that run.
func TestTriggerConcurrentSameWorkerSpawnsOnce(t *testing.T) {
	const callers = 8

	store := &fakeStore{}
	launcher := &fakeLauncher{handle: "review-mer-1"}
	eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prAt("sha1"), fakeProjects{}, launcher)

	// Each goroutine writes only to its own slot, so no extra locking is needed.
	type outcome struct {
		res TriggerResult
		err error
	}
	outcomes := make([]outcome, callers)

	var wg sync.WaitGroup
	for idx := range outcomes {
		wg.Add(1)
		go func(slot *outcome) {
			defer wg.Done()
			slot.res, slot.err = eng.Trigger(context.Background(), "mer-1")
		}(&outcomes[idx])
	}
	wg.Wait()

	created := 0
	for idx, got := range outcomes {
		if got.err != nil {
			t.Fatalf("Trigger[%d]: %v", idx, got.err)
		}
		if got.res.Created {
			created++
		}
	}

	// Exactly one trigger does the work; the rest reuse its run.
	if created != 1 {
		t.Errorf("Created=true count = %d, want exactly 1", created)
	}
	if spawns := launcher.spawnCount; spawns != 1 {
		t.Errorf("reviewer spawn count = %d, want 1", spawns)
	}
	if recorded := len(store.runs); recorded != 1 {
		t.Errorf("recorded review runs = %d, want 1", recorded)
	}
}

// TestTriggerFallsBackToExistingRunOnUniqueConflict covers the path where the
// idempotency check passes (no run yet) and the reviewer launches, but the
// insert is rejected as a duplicate because a concurrent writer got there
// first. Trigger must then return the recorded winner with Created=false rather
// than an error.
func TestTriggerFallsBackToExistingRunOnUniqueConflict(t *testing.T) {
	var (
		store    = &fakeStore{insertErr: domain.ErrDuplicateReviewRun}
		launcher = &fakeLauncher{handle: "review-mer-1"}
		sessions = fakeSessions{rec: liveWorker(), ok: true}
	)
	eng := newEngineForTest(store, sessions, prAt("sha1"), fakeProjects{}, launcher)

	res, err := eng.Trigger(context.Background(), "mer-1")
	switch {
	case err != nil:
		t.Fatalf("Trigger: %v", err)
	case res.Created:
		t.Fatalf("expected Created=false on unique conflict: %+v", res)
	}

	if run := res.Run; run.TargetSHA != "sha1" || run.ID != "winner-id-1" {
		t.Fatalf("expected the recorded winner run, got %+v", res.Run)
	}
	if launcher.spawnCount != 1 {
		t.Fatalf("reviewer should still have launched once: %+v", launcher)
	}
}

func TestTriggerIsIdempotentForSameCommit(t *testing.T) {
store := &fakeStore{
review: &domain.Review{ID: "rev-1", SessionID: "mer-1", ReviewerHandleID: "review-mer-1"},
Expand Down
102 changes: 102 additions & 0 deletions backend/internal/storage/sqlite/migrate_review_dedup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package sqlite

import (
"database/sql"
"path/filepath"
"testing"

"github.com/pressly/goose/v3"
)

// upTo applies migrations to db up to and including the given goose version,
// failing the test on any error. It configures goose's package-level state
// (base FS, logger, dialect) while holding gooseMu, the same lock migrate()
// uses for its goose global setup.
func upTo(t *testing.T, db *sql.DB, version int64) {
	t.Helper()

	gooseMu.Lock()
	defer gooseMu.Unlock()

	goose.SetBaseFS(migrationsFS)
	goose.SetLogger(goose.NopLogger())

	err := goose.SetDialect("sqlite3")
	if err != nil {
		t.Fatalf("set dialect: %v", err)
	}

	err = goose.UpTo(db, "migrations", version)
	if err != nil {
		t.Fatalf("migrate to %d: %v", version, err)
	}
}

// TestMigration0013DedupesExistingDuplicates protects against the data-safety
// problem raised in #246: a pre-#242 daemon may already hold duplicate
// (session_id, target_sha) review_run rows, and CREATE UNIQUE INDEX would fail
// on them and wedge startup. Migration 0013 therefore has to collapse each
// duplicate group to one survivor before building the index.
//
// The database is opened without the foreign_keys pragma so review_run rows can
// be seeded without the full project/session/review parent chain — the dedup is
// pure data movement.
func TestMigration0013DedupesExistingDuplicates(t *testing.T) {
	dsn := "file:" + filepath.Join(t.TempDir(), "ao.db") + "?_pragma=busy_timeout(5000)"
	db, err := sql.Open("sqlite", dsn)
	if err != nil {
		t.Fatalf("open sqlite: %v", err)
	}
	t.Cleanup(func() { _ = db.Close() })

	// Stop just before 0013: review tables exist, the unique index does not.
	upTo(t, db, 12)

	// Fixture: one duplicate group on shaA (a stale run, a completed pass
	// carrying the verdict, and a newer still-running pass), one row on a
	// distinct sha, and two empty-sha rows. The partial index excludes empty
	// shas, so those and the distinct-sha row must all survive.
	type seedRow struct{ id, sha, status, createdAt string }
	fixtures := []seedRow{
		{"r-old", "shaA", "running", "2026-06-01T00:00:00Z"},
		{"r-complete", "shaA", "complete", "2026-06-02T00:00:00Z"},
		{"r-new-running", "shaA", "running", "2026-06-03T00:00:00Z"},
		{"r-other-sha", "shaB", "running", "2026-06-01T00:00:00Z"},
		{"r-empty-1", "", "running", "2026-06-01T00:00:00Z"},
		{"r-empty-2", "", "running", "2026-06-02T00:00:00Z"},
	}
	for _, row := range fixtures {
		_, execErr := db.Exec(
			`INSERT INTO review_run (id, review_id, session_id, harness, pr_url, target_sha, status, verdict, body, created_at)
VALUES (?, 'rev-1', 's1', 'claude-code', '', ?, ?, '', '', ?)`,
			row.id, row.sha, row.status, row.createdAt,
		)
		if execErr != nil {
			t.Fatalf("seed %s: %v", row.id, execErr)
		}
	}

	// Applying 0013 dedupes, then builds the unique index.
	upTo(t, db, 13)

	// Collect the ids that are still present after the migration.
	rows, err := db.Query(`SELECT id FROM review_run`)
	if err != nil {
		t.Fatalf("query survivors: %v", err)
	}
	defer rows.Close()

	survivors := make(map[string]bool)
	for rows.Next() {
		var id string
		if scanErr := rows.Scan(&id); scanErr != nil {
			t.Fatalf("scan: %v", scanErr)
		}
		survivors[id] = true
	}
	if iterErr := rows.Err(); iterErr != nil {
		t.Fatalf("rows: %v", iterErr)
	}

	// shaA collapses to the completed pass; everything else is untouched.
	want := []string{"r-complete", "r-other-sha", "r-empty-1", "r-empty-2"}
	if len(survivors) != len(want) {
		t.Fatalf("survivors = %v, want exactly %v", survivors, want)
	}
	for _, id := range want {
		if _, kept := survivors[id]; !kept {
			t.Errorf("expected %q to survive the dedup", id)
		}
	}

	// The index is live and now rejects a fresh duplicate.
	_, dupErr := db.Exec(
		`INSERT INTO review_run (id, review_id, session_id, harness, pr_url, target_sha, status, verdict, body, created_at)
VALUES ('dup', 'rev-1', 's1', 'claude-code', '', 'shaA', 'running', '', '', '2026-06-04T00:00:00Z')`,
	)
	if dupErr == nil {
		t.Fatal("expected unique-index violation inserting a duplicate (session_id, target_sha)")
	}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
-- A partial unique index backstops the per-worker lock in internal/review: it
-- prevents two concurrent (or cross-restart) Trigger calls from recording two
-- review_run rows for the same worker session at the same reviewed commit
-- (issue #242). Rows with an empty target_sha (head not yet observed) are
-- excluded so they aren't blocked — the engine lock still serialises those.

-- +goose Up
-- Pre-#242 daemons could already have recorded duplicate (session_id,
-- target_sha) rows from the un-serialised double-spawn. CREATE UNIQUE INDEX
-- would fail on that data and wedge daemon startup, so collapse each duplicate
-- group to a single survivor first. We keep a completed pass over a still-running
-- one (it carries the reviewer's verdict/body), then the newest by created_at —
-- the same row a post-migration GetReviewRunBySessionAndSHA lookup would return.
-- +goose StatementBegin
-- Delete every row with a non-empty target_sha that is not the rank-1 row of
-- its (session_id, target_sha) group. Ranking within a group: rows whose status
-- is 'complete' first, then the newest created_at, then the highest rowid as a
-- final tie-break so exactly one survivor is picked deterministically. Rows
-- with an empty target_sha are excluded by both WHERE clauses and are never
-- deleted.
DELETE FROM review_run
WHERE target_sha != ''
AND rowid NOT IN (
SELECT rowid FROM (
SELECT rowid,
ROW_NUMBER() OVER (
PARTITION BY session_id, target_sha
ORDER BY CASE status WHEN 'complete' THEN 0 ELSE 1 END,
created_at DESC,
rowid DESC
) AS rn
FROM review_run
WHERE target_sha != ''
)
WHERE rn = 1
);
-- +goose StatementEnd

-- +goose StatementBegin
-- Partial unique index: at most one review_run row per (session_id,
-- target_sha). Rows with an empty target_sha fall outside the index and stay
-- unconstrained.
CREATE UNIQUE INDEX idx_review_run_session_sha
ON review_run (session_id, target_sha) WHERE target_sha != '';
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
-- Removes only the index; rows deleted by the Up-migration dedup are not
-- restored.
DROP INDEX idx_review_run_session_sha;
-- +goose StatementEnd
Loading
Loading