diff --git a/runs/repository/impl/action.go b/runs/repository/impl/action.go index 53e142e882..709684d081 100644 --- a/runs/repository/impl/action.go +++ b/runs/repository/impl/action.go @@ -346,6 +346,16 @@ func (r *actionRepo) ListActions(ctx context.Context, input interfaces.ListResou queryBuilder.WriteString(" LIMIT ?") args = append(args, input.Limit+1) + // Offset-based pagination. Callers either page by CursorToken (keyset) or by + // Offset; the two are mutually exclusive. NOTE(review): only Offset > 0 adds an OFFSET clause, so a negative Offset is silently ignored rather than rejected -- confirm that is intended. + if input.CursorToken != "" && input.Offset > 0 { + return nil, fmt.Errorf("CursorToken and Offset are mutually exclusive") + } + if input.Offset > 0 { + queryBuilder.WriteString(" OFFSET ?") + args = append(args, input.Offset) + } + query := sqlx.Rebind(sqlx.DOLLAR, queryBuilder.String()) var actions []*models.Action diff --git a/runs/repository/impl/action_test.go b/runs/repository/impl/action_test.go index 09ff040eab..af344fde48 100644 --- a/runs/repository/impl/action_test.go +++ b/runs/repository/impl/action_test.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "net" + sort1 "sort" "testing" "time" @@ -392,6 +393,136 @@ func TestListRuns(t *testing.T) { } } +// TestListActionsOffsetPagination is a regression test for offset-based paging. +// ListActions only implemented keyset (CursorToken) paging and silently ignored +// Offset, so an offset-paging caller (WatchActions.listAndSendAllActions) re-read +// the same first page forever -- capping children_phase_counts at the page size +// and never terminating its loop for runs with more than one page of actions. 
+func TestListActionsOffsetPagination(t *testing.T) { + db := setupActionDB(t) + defer func() { db.Exec("DELETE FROM actions") }() + actionRepo, err := NewActionRepo(db, testDbConfig) + require.NoError(t, err) + ctx := context.Background() + + const total = 150 + for i := 0; i < total; i++ { + _, err := actionRepo.CreateAction(ctx, &models.Run{ + Project: "proj1", + Domain: "domain1", + RunName: fmt.Sprintf("r%03d", i), + Name: rootActionName, + Phase: int32(common.ActionPhase_ACTION_PHASE_QUEUED), + }, false) + require.NoError(t, err) + } + + const pageSize = 50 + sortByName := []interfaces.SortParameter{NewSortParameter("run_name", interfaces.SortOrderAscending)} + + // Offset must shift the window: the page at Offset=50 must start at r050. + // Before the fix it started at r000 because Offset was ignored. + page2, err := actionRepo.ListActions(ctx, interfaces.ListResourceInput{ + Filter: NewIsRootActionFilter(), + Limit: pageSize, + Offset: pageSize, + SortParameters: sortByName, + }) + require.NoError(t, err) + require.NotEmpty(t, page2) + require.Equal(t, "r050", page2[0].RunName, + "Offset must skip the first page; got the first page back (Offset ignored)") + + // Walking all pages by offset must cover every action exactly once and terminate. + // The loop bound also guards against the pre-fix infinite re-read of page one. NOTE(review): seen is a set, so the final length assertion checks that every action is covered; a row returned twice would not by itself fail it.
+ seen := map[string]struct{}{} + for offset := 0; offset < total*2; offset += pageSize { + batch, err := actionRepo.ListActions(ctx, interfaces.ListResourceInput{ + Filter: NewIsRootActionFilter(), + Limit: pageSize, + Offset: offset, + SortParameters: sortByName, + }) + require.NoError(t, err) + page := batch + if len(page) > pageSize { // trim the Limit+1 has-more probe row + page = page[:pageSize] + } + for _, a := range page { + seen[a.RunName] = struct{}{} + } + if len(batch) <= pageSize { + break + } + } + assert.Len(t, seen, total, "offset paging must cover all actions exactly once") +} + +// TestListActionsOffsetPaginationTiedCreatedAt is a regression test for the +// children_phase_counts undercount on large map tasks. WatchActions pages the +// snapshot by Offset sorted by created_at, but a map task bulk-creates thousands +// of children with identical created_at. OFFSET over a non-unique ORDER BY has +// undefined order among ties, so pages overlap/skip and the snapshot loses rows +// (count came up short, e.g. 4.8K instead of 20000). The fix adds a deterministic +// "name" tiebreaker. Here we force tied created_at and assert offset paging by +// (created_at, run_name) stays totally ordered and covers every row exactly once. Every fixture row is created with the same Name (rootActionName) and a distinct RunName, so run_name is the column that breaks ties in this test. NOTE(review): the local variable named sort below shadows the sort package, which is why the file imports that package as sort1. +func TestListActionsOffsetPaginationTiedCreatedAt(t *testing.T) { + db := setupActionDB(t) + defer func() { db.Exec("DELETE FROM actions") }() + actionRepo, err := NewActionRepo(db, testDbConfig) + require.NoError(t, err) + ctx := context.Background() + + const total = 150 + for i := 0; i < total; i++ { + _, err := actionRepo.CreateAction(ctx, &models.Run{ + Project: "proj1", + Domain: "domain1", + RunName: fmt.Sprintf("r%03d", i), + Name: rootActionName, + Phase: int32(common.ActionPhase_ACTION_PHASE_QUEUED), + }, false) + require.NoError(t, err) + } + + // Force every action to share one created_at, reproducing a bulk-created map task. 
+ _, err = db.Exec("UPDATE actions SET created_at = '2024-01-01T00:00:00Z'") + require.NoError(t, err) + + const pageSize = 50 + sort := []interfaces.SortParameter{ + NewSortParameter("created_at", interfaces.SortOrderAscending), + NewSortParameter("run_name", interfaces.SortOrderAscending), + } + + seen := map[string]struct{}{} + var ordered []string + for offset := 0; offset < total*2; offset += pageSize { + batch, err := actionRepo.ListActions(ctx, interfaces.ListResourceInput{ + Filter: NewIsRootActionFilter(), + Limit: pageSize, + Offset: offset, + SortParameters: sort, + }) + require.NoError(t, err) + page := batch + if len(page) > pageSize { // trim the Limit+1 has-more probe row + page = page[:pageSize] + } + for _, a := range page { + seen[a.RunName] = struct{}{} + ordered = append(ordered, a.RunName) + } + if len(batch) <= pageSize { + break + } + } + + assert.Len(t, seen, total, "offset paging over tied created_at must cover all actions exactly once") + assert.True(t, sort1.IsSorted(sort1.StringSlice(ordered)), + "the name tiebreaker must give a total order so pages don't overlap/skip") +} + func setupActionEventDB(t *testing.T) (*sqlx.DB, *actionRepo) { db := setupActionDB(t) r, err := NewActionRepo(db, testDbConfig) diff --git a/runs/service/run_service.go b/runs/service/run_service.go index 21be8e208f..0b774e7923 100644 --- a/runs/service/run_service.go +++ b/runs/service/run_service.go @@ -1313,12 +1313,17 @@ func (s *RunService) listAndSendAllActions( // Sort ascending by created_at so parent actions are inserted into the // run state manager before their children. insertAction requires the // parent node to already exist in the tree when a child is processed. + // "name" is a deterministic tiebreaker: a map task bulk-creates thousands + // of children with identical created_at, and OFFSET paging over a + // non-unique ORDER BY skips/duplicates rows, so the snapshot loses actions + // and ChildPhaseCounts comes up short. name is unique within a run. 
 batch, err := s.repo.ActionRepo().ListActions(ctx, interfaces.ListResourceInput{ Filter: impl.NewRunActionsFilter(runID), Limit: pageSize, Offset: offset, SortParameters: []interfaces.SortParameter{ impl.NewSortParameter("created_at", interfaces.SortOrderAscending), + impl.NewSortParameter("name", interfaces.SortOrderAscending), }, }) if err != nil { diff --git a/runs/service/run_service_test.go b/runs/service/run_service_test.go index f56d0ba5c3..02c728e662 100644 --- a/runs/service/run_service_test.go +++ b/runs/service/run_service_test.go @@ -807,8 +807,9 @@ func TestListAndSendAllActionsUsesAscendingSort(t *testing.T) { err = svc.listAndSendAllActions(context.Background(), runID, rsm, nil) require.NoError(t, err) - require.Len(t, captured.SortParameters, 1) + require.Len(t, captured.SortParameters, 2) assert.Equal(t, "created_at ASC", captured.SortParameters[0].GetOrderExpr()) + assert.Equal(t, "name ASC", captured.SortParameters[1].GetOrderExpr()) } func TestGenerateRunName(t *testing.T) {