From 8cb4d827f38b0c944c6afdd43c8f8e41fd39af83 Mon Sep 17 00:00:00 2001 From: codebanditssss Date: Mon, 15 Jun 2026 16:16:13 +0530 Subject: [PATCH 1/2] fix(review): serialize concurrent triggers per worker to stop double-spawn Engine.Trigger was a read-then-write (idempotency check -> reviewer spawn -> InsertReviewRun) with no serialization and no backing constraint. Two near- simultaneous triggers for the same worker at the same head SHA both passed the GetReviewRunBySessionAndSHA check, both spawned a reviewer against the same deterministic review- handle, and both inserted a running run for one commit. Add a per-worker keyed mutex (lockWorker) held across the whole Trigger body, so the loser re-reads the freshly-recorded run and short-circuits to Created:false instead of spawning. Back it with a partial unique index on review_run(session_id, target_sha) (migration 0013) as a cross-restart safety net; rows with an empty target_sha (head not yet observed) are excluded so they are not blocked. Adds a concurrency test asserting N simultaneous triggers spawn once and record one run. Closes #242 --- backend/internal/review/review.go | 50 +++++++++++++--- backend/internal/review/review_test.go | 58 ++++++++++++++++--- .../migrations/0013_review_run_unique_sha.sql | 16 +++++ 3 files changed, 109 insertions(+), 15 deletions(-) create mode 100644 backend/internal/storage/sqlite/migrations/0013_review_run_unique_sha.sql diff --git a/backend/internal/review/review.go b/backend/internal/review/review.go index 9664b0a0..fe0224da 100644 --- a/backend/internal/review/review.go +++ b/backend/internal/review/review.go @@ -12,6 +12,7 @@ import ( "context" "errors" "fmt" + "sync" "time" "github.com/google/uuid" @@ -74,6 +75,12 @@ type Engine struct { launcher Launcher clock func() time.Time newID func() string + + // triggerMu guards triggerLocks; triggerLocks holds one mutex per worker + // session so concurrent Trigger calls for the same worker serialise (see + // lockWorker). Distinct workers never contend. + triggerMu sync.Mutex + triggerLocks map[domain.SessionID]*sync.Mutex } // New wires an Engine from its dependencies, defaulting the clock and id source. @@ -87,14 +94,35 @@ func New(d Deps) *Engine { newID = uuid.NewString } return &Engine{ - store: d.Store, - sessions: d.Sessions, - prs: d.PRs, - projects: d.Projects, - launcher: d.Launcher, - clock: clock, - newID: newID, + store: d.Store, + sessions: d.Sessions, + prs: d.PRs, + projects: d.Projects, + launcher: d.Launcher, + clock: clock, + newID: newID, + triggerLocks: make(map[domain.SessionID]*sync.Mutex), + } +} + +// lockWorker serialises Trigger calls for a single worker session and returns +// the unlock func. Without it, two concurrent triggers for the same worker can +// both pass the per-commit idempotency check and each spawn a reviewer against +// the same deterministic handle, leaving two running runs for one commit (#242). +// +// The per-worker mutex is created on first use and kept for the lifetime of the +// engine; the entry is a single pointer, so the unbounded-by-session-count map +// is a negligible, bounded-in-practice cost. +func (e *Engine) lockWorker(id domain.SessionID) func() { + e.triggerMu.Lock() + mu, ok := e.triggerLocks[id] + if !ok { + mu = &sync.Mutex{} + e.triggerLocks[id] = mu } + e.triggerMu.Unlock() + mu.Lock() + return mu.Unlock } // TriggerResult is the outcome of a trigger: the (new or existing) run, the live @@ -122,6 +150,14 @@ func (e *Engine) Trigger(ctx context.Context, workerID domain.SessionID) (Trigge if workerID == "" { return TriggerResult{}, fmt.Errorf("%w: worker session id is required", ErrInvalid) } + + // Serialise concurrent triggers for this worker so the idempotency check + // below (and the reviewer spawn that follows it) can't be raced into a + // double-spawn. Held across the spawn deliberately: the loser then re-reads + // the freshly-recorded run and short-circuits to Created:false. + unlock := e.lockWorker(workerID) + defer unlock() + worker, ok, err := e.sessions.GetSession(ctx, workerID) if err != nil { return TriggerResult{}, err diff --git a/backend/internal/review/review_test.go b/backend/internal/review/review_test.go index 333bcdba..2fb7d0c2 100644 --- a/backend/internal/review/review_test.go +++ b/backend/internal/review/review_test.go @@ -3,6 +3,7 @@ package review import ( "context" "errors" + "sync" "testing" "time" @@ -87,18 +88,20 @@ func (f fakeProjects) GetProject(_ context.Context, id string) (domain.ProjectRe } type fakeLauncher struct { - handle string - alive bool - spawnErr error - notifyErr error - spawned bool - notified bool - gotSpec LaunchSpec - gotHandle string + handle string + alive bool + spawnErr error + notifyErr error + spawned bool + spawnCount int + notified bool + gotSpec LaunchSpec + gotHandle string } func (f *fakeLauncher) Spawn(_ context.Context, spec LaunchSpec) (string, error) { f.spawned = true + f.spawnCount++ f.gotSpec = spec if f.spawnErr != nil { return "", f.spawnErr @@ -163,6 +166,45 @@ func TestTriggerSpawnsNewReviewerAndRecordsRunAfterLaunch(t *testing.T) { } } +func TestTriggerConcurrentSameWorkerSpawnsOnce(t *testing.T) { + store := &fakeStore{} + launcher := &fakeLauncher{handle: "review-mer-1"} + eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prAt("sha1"), fakeProjects{}, launcher) + + const n = 8 + var wg sync.WaitGroup + results := make([]TriggerResult, n) + errs := make([]error, n) + wg.Add(n) + for i := 0; i < n; i++ { + go func(i int) { + defer wg.Done() + results[i], errs[i] = eng.Trigger(context.Background(), "mer-1") + }(i) + } + wg.Wait() + + created := 0 + for i := 0; i < n; i++ { + if errs[i] != nil { + t.Fatalf("Trigger[%d]: %v", i, errs[i]) + } + if results[i].Created { + created++ + } + } + // Exactly one trigger does the work; the rest reuse its run. + if created != 1 { + t.Errorf("Created=true count = %d, want exactly 1", created) + } + if launcher.spawnCount != 1 { + t.Errorf("reviewer spawn count = %d, want 1", launcher.spawnCount) + } + if len(store.runs) != 1 { + t.Errorf("recorded review runs = %d, want 1", len(store.runs)) + } +} + func TestTriggerIsIdempotentForSameCommit(t *testing.T) { store := &fakeStore{ review: &domain.Review{ID: "rev-1", SessionID: "mer-1", ReviewerHandleID: "review-mer-1"}, diff --git a/backend/internal/storage/sqlite/migrations/0013_review_run_unique_sha.sql b/backend/internal/storage/sqlite/migrations/0013_review_run_unique_sha.sql new file mode 100644 index 00000000..eebccc49 --- /dev/null +++ b/backend/internal/storage/sqlite/migrations/0013_review_run_unique_sha.sql @@ -0,0 +1,16 @@ +-- A partial unique index backstops the per-worker lock in internal/review: it +-- prevents two concurrent (or cross-restart) Trigger calls from recording two +-- review_run rows for the same worker session at the same reviewed commit +-- (issue #242). Rows with an empty target_sha (head not yet observed) are +-- excluded so they aren't blocked — the engine lock still serialises those. + +-- +goose Up +-- +goose StatementBegin +CREATE UNIQUE INDEX idx_review_run_session_sha + ON review_run (session_id, target_sha) WHERE target_sha != ''; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +DROP INDEX idx_review_run_session_sha; +-- +goose StatementEnd From bc223c4e0feeb5b8f16ebcd8a643d65b215ad4e4 Mon Sep 17 00:00:00 2001 From: codebanditssss Date: Wed, 17 Jun 2026 00:20:12 +0530 Subject: [PATCH 2/2] fix(review): make migration 0013 dedup-safe and handle the unique conflict in Trigger Pre-#242 daemons can already hold duplicate (session_id, target_sha) review_run rows, on which CREATE UNIQUE INDEX fails and wedges startup. Migration 0013 now collapses each duplicate group to a single survivor (a completed pass over a still-running one, then newest by created_at) before building the index. Trigger now treats a unique-constraint hit as a fallback rather than an error: InsertReviewRun maps it to the new domain.ErrDuplicateReviewRun sentinel, and Trigger re-reads GetReviewRunBySessionAndSHA and returns that run with Created:false instead of surfacing a raw error after the reviewer may already have launched. --- backend/internal/domain/review.go | 11 +- backend/internal/review/review.go | 12 +++ backend/internal/review/review_test.go | 33 ++++++ .../sqlite/migrate_review_dedup_test.go | 102 ++++++++++++++++++ .../migrations/0013_review_run_unique_sha.sql | 25 +++++ .../storage/sqlite/store/review_store.go | 10 +- .../storage/sqlite/store/review_store_test.go | 43 ++++++++ 7 files changed, 233 insertions(+), 3 deletions(-) create mode 100644 backend/internal/storage/sqlite/migrate_review_dedup_test.go diff --git a/backend/internal/domain/review.go b/backend/internal/domain/review.go index a71556da..8b38a40f 100644 --- a/backend/internal/domain/review.go +++ b/backend/internal/domain/review.go @@ -1,6 +1,15 @@ package domain -import "time" +import ( + "errors" + "time" +) + +// ErrDuplicateReviewRun is returned by InsertReviewRun when a run already exists +// for the same worker session and target commit (the partial unique index from +// migration 0013). It lets the review engine fall back to the recorded run +// instead of surfacing a raw storage error after a reviewer may have launched. +var ErrDuplicateReviewRun = errors.New("domain: review run already exists for session and target sha") // Review is the per-worker code-review record: one row per worker session // (SessionID is unique). A repeat trigger reuses this row; the per-pass facts diff --git a/backend/internal/review/review.go b/backend/internal/review/review.go index fe0224da..9b064625 100644 --- a/backend/internal/review/review.go +++ b/backend/internal/review/review.go @@ -245,6 +245,18 @@ func (e *Engine) Trigger(ctx context.Context, workerID domain.SessionID) (Trigge CreatedAt: now, } if err := e.store.InsertReviewRun(ctx, run); err != nil { + // The per-worker lock serialises in-process triggers, but the unique + // index (migration 0013) can still reject a run a concurrent daemon (or + // a pre-lock restart) recorded for this commit. The reviewer is already + // launched, so don't surface a raw error: re-read the recorded run and + // return it as the existing, not-newly-created pass. + if errors.Is(err, domain.ErrDuplicateReviewRun) { + if existing, ok, getErr := e.store.GetReviewRunBySessionAndSHA(ctx, workerID, targetSHA); getErr != nil { + return TriggerResult{}, getErr + } else if ok { + return TriggerResult{Run: existing, ReviewerHandleID: handleID, Created: false}, nil + } + } return TriggerResult{}, err } return TriggerResult{Run: run, ReviewerHandleID: handleID, Created: true}, nil diff --git a/backend/internal/review/review_test.go b/backend/internal/review/review_test.go index 2fb7d0c2..5a0a2eff 100644 --- a/backend/internal/review/review_test.go +++ b/backend/internal/review/review_test.go @@ -15,6 +15,11 @@ import ( type fakeStore struct { review *domain.Review runs []domain.ReviewRun + // insertErr, when set, makes the next InsertReviewRun model a concurrent + // writer that already recorded a run for this commit: it records that + // winner (so a follow-up GetReviewRunBySessionAndSHA finds it) and returns + // insertErr instead of recording the caller's run. + insertErr error } func (f *fakeStore) UpsertReview(_ context.Context, r domain.Review) error { @@ -29,6 +34,12 @@ func (f *fakeStore) GetReviewBySession(_ context.Context, _ domain.SessionID) (d return *f.review, true, nil } func (f *fakeStore) InsertReviewRun(_ context.Context, r domain.ReviewRun) error { + if f.insertErr != nil { + winner := r + winner.ID = "winner-" + r.ID + f.runs = append(f.runs, winner) + return f.insertErr + } f.runs = append(f.runs, r) return nil } @@ -205,6 +216,28 @@ func TestTriggerConcurrentSameWorkerSpawnsOnce(t *testing.T) { } } +func TestTriggerFallsBackToExistingRunOnUniqueConflict(t *testing.T) { + // The idempotency check passes (no run yet), the reviewer launches, but the + // insert loses to a concurrent writer the unique index already accepted. + store := &fakeStore{insertErr: domain.ErrDuplicateReviewRun} + launcher := &fakeLauncher{handle: "review-mer-1"} + eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prAt("sha1"), fakeProjects{}, launcher) + + res, err := eng.Trigger(context.Background(), "mer-1") + if err != nil { + t.Fatalf("Trigger: %v", err) + } + if res.Created { + t.Fatalf("expected Created=false on unique conflict: %+v", res) + } + if res.Run.TargetSHA != "sha1" || res.Run.ID != "winner-id-1" { + t.Fatalf("expected the recorded winner run, got %+v", res.Run) + } + if launcher.spawnCount != 1 { + t.Fatalf("reviewer should still have launched once: %+v", launcher) + } +} + func TestTriggerIsIdempotentForSameCommit(t *testing.T) { store := &fakeStore{ review: &domain.Review{ID: "rev-1", SessionID: "mer-1", ReviewerHandleID: "review-mer-1"}, diff --git a/backend/internal/storage/sqlite/migrate_review_dedup_test.go b/backend/internal/storage/sqlite/migrate_review_dedup_test.go new file mode 100644 index 00000000..fa815c06 --- /dev/null +++ b/backend/internal/storage/sqlite/migrate_review_dedup_test.go @@ -0,0 +1,102 @@ +package sqlite + +import ( + "database/sql" + "path/filepath" + "testing" + + "github.com/pressly/goose/v3" +) + +// upTo migrates the db to a specific goose version, sharing migrate()'s goose +// global setup under gooseMu. +func upTo(t *testing.T, db *sql.DB, version int64) { + t.Helper() + gooseMu.Lock() + defer gooseMu.Unlock() + goose.SetBaseFS(migrationsFS) + goose.SetLogger(goose.NopLogger()) + if err := goose.SetDialect("sqlite3"); err != nil { + t.Fatalf("set dialect: %v", err) + } + if err := goose.UpTo(db, "migrations", version); err != nil { + t.Fatalf("migrate to %d: %v", version, err) + } +} + +// TestMigration0013DedupesExistingDuplicates guards the data-safety concern in +// #246: a pre-#242 daemon could already hold duplicate (session_id, target_sha) +// review_run rows, on which CREATE UNIQUE INDEX would fail and wedge startup. The +// migration must collapse each group to one survivor first. We open without the +// foreign_keys pragma so review_run rows can be seeded without the full +// project/session/review parent chain — the dedup is pure data movement. +func TestMigration0013DedupesExistingDuplicates(t *testing.T) { + db, err := sql.Open("sqlite", "file:"+filepath.Join(t.TempDir(), "ao.db")+"?_pragma=busy_timeout(5000)") + if err != nil { + t.Fatalf("open sqlite: %v", err) + } + t.Cleanup(func() { _ = db.Close() }) + + // Stop just before 0013: review tables exist, the unique index does not. + upTo(t, db, 12) + + // One duplicate group on shaA (a stale run, a completed pass carrying the + // verdict, and a newer still-running pass), plus a distinct sha and two + // empty-sha rows that the partial index excludes and must all survive. + seed := []struct{ id, sha, status, createdAt string }{ + {"r-old", "shaA", "running", "2026-06-01T00:00:00Z"}, + {"r-complete", "shaA", "complete", "2026-06-02T00:00:00Z"}, + {"r-new-running", "shaA", "running", "2026-06-03T00:00:00Z"}, + {"r-other-sha", "shaB", "running", "2026-06-01T00:00:00Z"}, + {"r-empty-1", "", "running", "2026-06-01T00:00:00Z"}, + {"r-empty-2", "", "running", "2026-06-02T00:00:00Z"}, + } + for _, r := range seed { + if _, err := db.Exec( + `INSERT INTO review_run (id, review_id, session_id, harness, pr_url, target_sha, status, verdict, body, created_at) + VALUES (?, 'rev-1', 's1', 'claude-code', '', ?, ?, '', '', ?)`, + r.id, r.sha, r.status, r.createdAt, + ); err != nil { + t.Fatalf("seed %s: %v", r.id, err) + } + } + + // Applying 0013 dedupes, then builds the unique index. + upTo(t, db, 13) + + survivors := map[string]bool{} + rows, err := db.Query(`SELECT id FROM review_run`) + if err != nil { + t.Fatalf("query survivors: %v", err) + } + defer rows.Close() + for rows.Next() { + var id string + if err := rows.Scan(&id); err != nil { + t.Fatalf("scan: %v", err) + } + survivors[id] = true + } + if err := rows.Err(); err != nil { + t.Fatalf("rows: %v", err) + } + + // shaA collapses to the completed pass; everything else is untouched. + want := []string{"r-complete", "r-other-sha", "r-empty-1", "r-empty-2"} + if len(survivors) != len(want) { + t.Fatalf("survivors = %v, want exactly %v", survivors, want) + } + for _, id := range want { + if !survivors[id] { + t.Errorf("expected %q to survive the dedup", id) + } + } + + // The index is live and now rejects a fresh duplicate. + if _, err := db.Exec( + `INSERT INTO review_run (id, review_id, session_id, harness, pr_url, target_sha, status, verdict, body, created_at) + VALUES ('dup', 'rev-1', 's1', 'claude-code', '', 'shaA', 'running', '', '', '2026-06-04T00:00:00Z')`, + ); err == nil { + t.Fatal("expected unique-index violation inserting a duplicate (session_id, target_sha)") + } +} diff --git a/backend/internal/storage/sqlite/migrations/0013_review_run_unique_sha.sql b/backend/internal/storage/sqlite/migrations/0013_review_run_unique_sha.sql index eebccc49..424f03ab 100644 --- a/backend/internal/storage/sqlite/migrations/0013_review_run_unique_sha.sql +++ b/backend/internal/storage/sqlite/migrations/0013_review_run_unique_sha.sql @@ -5,6 +5,31 @@ -- excluded so they aren't blocked — the engine lock still serialises those. -- +goose Up +-- Pre-#242 daemons could already have recorded duplicate (session_id, +-- target_sha) rows from the un-serialised double-spawn. CREATE UNIQUE INDEX +-- would fail on that data and wedge daemon startup, so collapse each duplicate +-- group to a single survivor first. We keep a completed pass over a still-running +-- one (it carries the reviewer's verdict/body), then the newest by created_at — +-- the same row a post-migration GetReviewRunBySessionAndSHA lookup would return. +-- +goose StatementBegin +DELETE FROM review_run +WHERE target_sha != '' + AND rowid NOT IN ( + SELECT rowid FROM ( + SELECT rowid, + ROW_NUMBER() OVER ( + PARTITION BY session_id, target_sha + ORDER BY CASE status WHEN 'complete' THEN 0 ELSE 1 END, + created_at DESC, + rowid DESC + ) AS rn + FROM review_run + WHERE target_sha != '' + ) + WHERE rn = 1 + ); +-- +goose StatementEnd + -- +goose StatementBegin CREATE UNIQUE INDEX idx_review_run_session_sha ON review_run (session_id, target_sha) WHERE target_sha != ''; diff --git a/backend/internal/storage/sqlite/store/review_store.go b/backend/internal/storage/sqlite/store/review_store.go index 36dfc9c7..b872b33a 100644 --- a/backend/internal/storage/sqlite/store/review_store.go +++ b/backend/internal/storage/sqlite/store/review_store.go @@ -39,11 +39,13 @@ func (s *Store) GetReviewBySession(ctx context.Context, id domain.SessionID) (do return reviewFromRow(row), true, nil } -// InsertReviewRun records a new review pass. +// InsertReviewRun records a new review pass. A unique-constraint hit on the +// (session_id, target_sha) index (migration 0013) is surfaced as the sentinel +// domain.ErrDuplicateReviewRun so the engine can fall back to the existing run. func (s *Store) InsertReviewRun(ctx context.Context, r domain.ReviewRun) error { s.writeMu.Lock() defer s.writeMu.Unlock() - return s.qw.InsertReviewRun(ctx, gen.InsertReviewRunParams{ + err := s.qw.InsertReviewRun(ctx, gen.InsertReviewRunParams{ ID: r.ID, ReviewID: r.ReviewID, SessionID: r.SessionID, @@ -55,6 +57,10 @@ func (s *Store) InsertReviewRun(ctx context.Context, r domain.ReviewRun) error { Body: r.Body, CreatedAt: r.CreatedAt, }) + if isSQLiteUnique(err) { + return fmt.Errorf("insert review run for session %s sha %s: %w", r.SessionID, r.TargetSHA, domain.ErrDuplicateReviewRun) + } + return err } // UpdateReviewRunResult sets the status/verdict/body of a running review pass. diff --git a/backend/internal/storage/sqlite/store/review_store_test.go b/backend/internal/storage/sqlite/store/review_store_test.go index e6643ac4..bf20cd8d 100644 --- a/backend/internal/storage/sqlite/store/review_store_test.go +++ b/backend/internal/storage/sqlite/store/review_store_test.go @@ -2,12 +2,55 @@ package store_test import ( "context" + "errors" "testing" "time" "github.com/aoagents/agent-orchestrator/backend/internal/domain" ) +func TestInsertReviewRunDuplicateSHAMapsToSentinel(t *testing.T) { + s := newTestStore(t) + ctx := context.Background() + seedProject(t, s, "mer") + rec, err := s.CreateSession(ctx, sampleRecord("mer")) + if err != nil { + t.Fatalf("create session: %v", err) + } + now := time.Now().UTC().Truncate(time.Second) + if err := s.UpsertReview(ctx, domain.Review{ + ID: "rev-1", SessionID: rec.ID, ProjectID: rec.ProjectID, + Harness: domain.ReviewerClaudeCode, CreatedAt: now, UpdatedAt: now, + }); err != nil { + t.Fatalf("upsert review: %v", err) + } + run := domain.ReviewRun{ + ID: "run-1", ReviewID: "rev-1", SessionID: rec.ID, Harness: domain.ReviewerClaudeCode, + TargetSHA: "sha1", Status: domain.ReviewRunRunning, Verdict: domain.VerdictNone, CreatedAt: now, + } + if err := s.InsertReviewRun(ctx, run); err != nil { + t.Fatalf("first insert: %v", err) + } + + // A second run for the same (session_id, target_sha) hits the partial unique + // index (migration 0013) and must surface as the sentinel so the engine can + // fall back to the existing run. + dup := run + dup.ID = "run-2" + if err := s.InsertReviewRun(ctx, dup); !errors.Is(err, domain.ErrDuplicateReviewRun) { + t.Fatalf("duplicate insert err = %v, want ErrDuplicateReviewRun", err) + } + + // An empty target_sha is excluded from the index, so two are allowed. + for _, id := range []string{"run-empty-1", "run-empty-2"} { + r := run + r.ID, r.TargetSHA = id, "" + if err := s.InsertReviewRun(ctx, r); err != nil { + t.Fatalf("empty-sha insert %s: %v", id, err) + } + } +} + func TestReviewUpsertReusesRowAndRunRoundTrip(t *testing.T) { s := newTestStore(t) ctx := context.Background()