From a1f3e9843fe10c1c1f785772b36f4d1d142a55fd Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 23 Jun 2026 16:56:04 -0700 Subject: [PATCH 1/7] fix(runs): honor Offset in ListActions so WatchActions loads all child actions ListActions only implemented keyset (CursorToken) pagination and silently ignored the Offset field. WatchActions.listAndSendAllActions pages by Offset (offset += pageSize), so every iteration ran the identical query and re-read the same first page. Effects: - only the first pageSize (100) actions ever entered the run-state tree, capping children_phase_counts at 100 regardless of the real child count (observed with a 10k-element map task). - for runs with more than one page, len(batch) < pageSize was never true, so the initial-snapshot loop never returned and the live-update loop was never reached. Add the OFFSET clause to ListActions when Offset > 0. Regression test asserts offset shifts the page window and that walking by offset covers all actions and terminates. Signed-off-by: Kevin Su --- runs/repository/impl/action.go | 11 +++++ runs/repository/impl/action_test.go | 65 +++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+) diff --git a/runs/repository/impl/action.go b/runs/repository/impl/action.go index b2412c6fd8..83e140504e 100644 --- a/runs/repository/impl/action.go +++ b/runs/repository/impl/action.go @@ -346,6 +346,17 @@ func (r *actionRepo) ListActions(ctx context.Context, input interfaces.ListResou queryBuilder.WriteString(" LIMIT ?") args = append(args, input.Limit+1) + // Offset-based pagination. Callers either page by CursorToken (keyset) or by + // Offset; the two are mutually exclusive. Without this clause Offset is + // silently ignored, so an offset-paging caller re-reads the same first page + // forever (e.g. WatchActions.listAndSendAllActions only ever loaded the first + // 100 actions, capping children_phase_counts at the page size and never + // terminating its loop for runs with more than a page of actions). + if input.Offset > 0 { + queryBuilder.WriteString(" OFFSET ?") + args = append(args, input.Offset) + } + query := sqlx.Rebind(sqlx.DOLLAR, queryBuilder.String()) var actions []*models.Action diff --git a/runs/repository/impl/action_test.go b/runs/repository/impl/action_test.go index a98405d89b..81e49be3e9 100644 --- a/runs/repository/impl/action_test.go +++ b/runs/repository/impl/action_test.go @@ -389,6 +389,71 @@ func TestListRuns(t *testing.T) { } } +// TestListActionsOffsetPagination is a regression test for offset-based paging. +// ListActions only implemented keyset (CursorToken) paging and silently ignored +// Offset, so an offset-paging caller (WatchActions.listAndSendAllActions) re-read +// the same first page forever -- capping children_phase_counts at the page size +// and never terminating its loop for runs with more than one page of actions. +func TestListActionsOffsetPagination(t *testing.T) { + db := setupActionDB(t) + defer func() { db.Exec("DELETE FROM actions") }() + actionRepo, err := NewActionRepo(db, testDbConfig) + require.NoError(t, err) + ctx := context.Background() + + const total = 150 + for i := 0; i < total; i++ { + _, err := actionRepo.CreateAction(ctx, &models.Run{ + Project: "proj1", + Domain: "domain1", + RunName: fmt.Sprintf("r%03d", i), + Name: rootActionName, + Phase: int32(common.ActionPhase_ACTION_PHASE_QUEUED), + }, false) + require.NoError(t, err) + } + + const pageSize = 50 + sortByName := []interfaces.SortParameter{NewSortParameter("run_name", interfaces.SortOrderAscending)} + + // Offset must shift the window: the page at Offset=50 must start at r050. + // Before the fix it started at r000 because Offset was ignored. + page2, err := actionRepo.ListActions(ctx, interfaces.ListResourceInput{ + Filter: NewIsRootActionFilter(), + Limit: pageSize, + Offset: pageSize, + SortParameters: sortByName, + }) + require.NoError(t, err) + require.NotEmpty(t, page2) + require.Equal(t, "r050", page2[0].RunName, + "Offset must skip the first page; got the first page back (Offset ignored)") + + // Walking all pages by offset must cover every action exactly once and terminate. + // The loop bound also guards against the pre-fix infinite re-read of page one. + seen := map[string]struct{}{} + for offset := 0; offset < total*2; offset += pageSize { + batch, err := actionRepo.ListActions(ctx, interfaces.ListResourceInput{ + Filter: NewIsRootActionFilter(), + Limit: pageSize, + Offset: offset, + SortParameters: sortByName, + }) + require.NoError(t, err) + page := batch + if len(page) > pageSize { // trim the Limit+1 has-more probe row + page = page[:pageSize] + } + for _, a := range page { + seen[a.RunName] = struct{}{} + } + if len(batch) < pageSize { + break + } + } + assert.Len(t, seen, total, "offset paging must cover all actions exactly once") +} + func setupActionEventDB(t *testing.T) (*sqlx.DB, *actionRepo) { db := setupActionDB(t) r, err := NewActionRepo(db, testDbConfig) From 09b63c176aae6db9c32c542847778f3c06fff555 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 23 Jun 2026 23:55:12 -0700 Subject: [PATCH 2/7] Apply suggestions from code review Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Kevin Su --- runs/repository/impl/action.go | 3 +++ runs/repository/impl/action_test.go | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/runs/repository/impl/action.go b/runs/repository/impl/action.go index 83e140504e..94f63c196b 100644 --- a/runs/repository/impl/action.go +++ b/runs/repository/impl/action.go @@ -352,6 +352,9 @@ func (r *actionRepo) ListActions(ctx context.Context, input interfaces.ListResou // forever (e.g. WatchActions.listAndSendAllActions only ever loaded the first // 100 actions, capping children_phase_counts at the page size and never // terminating its loop for runs with more than a page of actions). + if input.CursorToken != "" && input.Offset > 0 { + return nil, fmt.Errorf("CursorToken and Offset are mutually exclusive") + } if input.Offset > 0 { queryBuilder.WriteString(" OFFSET ?") args = append(args, input.Offset) diff --git a/runs/repository/impl/action_test.go b/runs/repository/impl/action_test.go index 81e49be3e9..e4c0d5fdad 100644 --- a/runs/repository/impl/action_test.go +++ b/runs/repository/impl/action_test.go @@ -447,7 +447,7 @@ func TestListActionsOffsetPagination(t *testing.T) { for _, a := range page { seen[a.RunName] = struct{}{} } - if len(batch) < pageSize { + if len(batch) <= pageSize { break } } From 2db9a4ae9c85d5ebcf6e01c4131d9472ad404141 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 23 Jun 2026 23:56:21 -0700 Subject: [PATCH 3/7] nit Signed-off-by: Kevin Su --- charts/flyte-binary/values.yaml | 13 +++++++------ runs/repository/impl/action.go | 6 +----- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/charts/flyte-binary/values.yaml b/charts/flyte-binary/values.yaml index 0fd7e8c919..940f8e11a3 100644 --- a/charts/flyte-binary/values.yaml +++ b/charts/flyte-binary/values.yaml @@ -198,12 +198,13 @@ deployment: # lifecycleHooks Specify hooks to run in Flyte container before or after startup lifecycleHooks: {} # resources Resource limits and requests for Flyte container - # Uncomment and update to specify resources for deployment - # resources: - # limits: - # memory: 1Gi - # requests: - # cpu: 1 + resources: + limits: + cpu: "4" + memory: 8Gi + requests: + cpu: "2" + memory: 4Gi # podSecurityContext Specify security context for Flyte pod # See: https://kubernetes.io/docs/tasks/configure-pod-container/security-context/ podSecurityContext: diff --git a/runs/repository/impl/action.go b/runs/repository/impl/action.go index 94f63c196b..0ac82624dc 100644 --- a/runs/repository/impl/action.go +++ b/runs/repository/impl/action.go @@ -347,11 +347,7 @@ func (r *actionRepo) ListActions(ctx context.Context, input interfaces.ListResou args = append(args, input.Limit+1) // Offset-based pagination. Callers either page by CursorToken (keyset) or by - // Offset; the two are mutually exclusive. Without this clause Offset is - // silently ignored, so an offset-paging caller re-reads the same first page - // forever (e.g. WatchActions.listAndSendAllActions only ever loaded the first - // 100 actions, capping children_phase_counts at the page size and never - // terminating its loop for runs with more than a page of actions). + // Offset; the two are mutually exclusive. if input.CursorToken != "" && input.Offset > 0 { return nil, fmt.Errorf("CursorToken and Offset are mutually exclusive") } From 70ae6111e96e09b2594371985aea8b217e3b9097 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 23 Jun 2026 23:57:11 -0700 Subject: [PATCH 4/7] nit Signed-off-by: Kevin Su --- charts/flyte-binary/values.yaml | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/charts/flyte-binary/values.yaml b/charts/flyte-binary/values.yaml index 4d7f938e84..982423c10d 100644 --- a/charts/flyte-binary/values.yaml +++ b/charts/flyte-binary/values.yaml @@ -206,13 +206,12 @@ deployment: # lifecycleHooks Specify hooks to run in Flyte container before or after startup lifecycleHooks: {} # resources Resource limits and requests for Flyte container - resources: - limits: - cpu: "4" - memory: 8Gi - requests: - cpu: "2" - memory: 4Gi + # Uncomment and update to specify resources for deployment + # resources: + # limits: + # memory: 1Gi + # requests: + # cpu: 1 # podSecurityContext Specify security context for Flyte pod # See: https://kubernetes.io/docs/tasks/configure-pod-container/security-context/ podSecurityContext: From 7986b108d660f47b0c2e4dda89f70770e247a419 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 23 Jun 2026 23:57:29 -0700 Subject: [PATCH 5/7] nit Signed-off-by: Kevin Su --- charts/flyte-binary/values.yaml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/charts/flyte-binary/values.yaml b/charts/flyte-binary/values.yaml index 982423c10d..9e9b714b2f 100644 --- a/charts/flyte-binary/values.yaml +++ b/charts/flyte-binary/values.yaml @@ -206,10 +206,10 @@ deployment: # lifecycleHooks Specify hooks to run in Flyte container before or after startup lifecycleHooks: {} # resources Resource limits and requests for Flyte container - # Uncomment and update to specify resources for deployment - # resources: - # limits: - # memory: 1Gi + # Uncomment and update to specify resources for deployment + # resources: + # limits: + # memory: 1Gi # requests: # cpu: 1 # podSecurityContext Specify security context for Flyte pod From 0e59f175d100802de36ec8104c37bb76ef3304c5 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 24 Jun 2026 00:17:14 -0700 Subject: [PATCH 6/7] fix(runs): add name tiebreaker so offset paging doesn't drop child actions A map task bulk-creates thousands of children with identical created_at. listAndSendAllActions paged the snapshot by Offset sorted on created_at alone; OFFSET over a non-unique ORDER BY has undefined order among ties, so pages overlap/skip and the run-state tree loses rows -- children_phase_counts came up short (e.g. 4.8K instead of 20000). Add a deterministic 'name' tiebreaker (unique within a run) to the sort so offset paging is stable. Signed-off-by: Kevin Su --- runs/repository/impl/action_test.go | 66 +++++++++++++++++++++++++++++ runs/service/run_service.go | 5 +++ 2 files changed, 71 insertions(+) diff --git a/runs/repository/impl/action_test.go b/runs/repository/impl/action_test.go index c8adfec630..af344fde48 100644 --- a/runs/repository/impl/action_test.go +++ b/runs/repository/impl/action_test.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "net" + sort1 "sort" "testing" "time" @@ -457,6 +458,71 @@ func TestListActionsOffsetPagination(t *testing.T) { assert.Len(t, seen, total, "offset paging must cover all actions exactly once") } +// TestListActionsOffsetPaginationTiedCreatedAt is a regression test for the +// children_phase_counts undercount on large map tasks. WatchActions pages the +// snapshot by Offset sorted by created_at, but a map task bulk-creates thousands +// of children with identical created_at. OFFSET over a non-unique ORDER BY has +// undefined order among ties, so pages overlap/skip and the snapshot loses rows +// (count came up short, e.g. 4.8K instead of 20000). The fix adds a deterministic +// "name" tiebreaker. Here we force tied created_at and assert offset paging by +// (created_at, run_name) stays totally ordered and covers every row exactly once. +func TestListActionsOffsetPaginationTiedCreatedAt(t *testing.T) { + db := setupActionDB(t) + defer func() { db.Exec("DELETE FROM actions") }() + actionRepo, err := NewActionRepo(db, testDbConfig) + require.NoError(t, err) + ctx := context.Background() + + const total = 150 + for i := 0; i < total; i++ { + _, err := actionRepo.CreateAction(ctx, &models.Run{ + Project: "proj1", + Domain: "domain1", + RunName: fmt.Sprintf("r%03d", i), + Name: rootActionName, + Phase: int32(common.ActionPhase_ACTION_PHASE_QUEUED), + }, false) + require.NoError(t, err) + } + + // Force every action to share one created_at, reproducing a bulk-created map task. + _, err = db.Exec("UPDATE actions SET created_at = '2024-01-01T00:00:00Z'") + require.NoError(t, err) + + const pageSize = 50 + sort := []interfaces.SortParameter{ + NewSortParameter("created_at", interfaces.SortOrderAscending), + NewSortParameter("run_name", interfaces.SortOrderAscending), + } + + seen := map[string]struct{}{} + var ordered []string + for offset := 0; offset < total*2; offset += pageSize { + batch, err := actionRepo.ListActions(ctx, interfaces.ListResourceInput{ + Filter: NewIsRootActionFilter(), + Limit: pageSize, + Offset: offset, + SortParameters: sort, + }) + require.NoError(t, err) + page := batch + if len(page) > pageSize { // trim the Limit+1 has-more probe row + page = page[:pageSize] + } + for _, a := range page { + seen[a.RunName] = struct{}{} + ordered = append(ordered, a.RunName) + } + if len(batch) <= pageSize { + break + } + } + + assert.Len(t, seen, total, "offset paging over tied created_at must cover all actions exactly once") + assert.True(t, sort1.IsSorted(sort1.StringSlice(ordered)), + "the name tiebreaker must give a total order so pages don't overlap/skip") +} + func setupActionEventDB(t *testing.T) (*sqlx.DB, *actionRepo) { db := setupActionDB(t) r, err := NewActionRepo(db, testDbConfig) diff --git a/runs/service/run_service.go b/runs/service/run_service.go index 21be8e208f..0b774e7923 100644 --- a/runs/service/run_service.go +++ b/runs/service/run_service.go @@ -1313,12 +1313,17 @@ func (s *RunService) listAndSendAllActions( // Sort ascending by created_at so parent actions are inserted into the // run state manager before their children. insertAction requires the // parent node to already exist in the tree when a child is processed. + // "name" is a deterministic tiebreaker: a map task bulk-creates thousands + // of children with identical created_at, and OFFSET paging over a + // non-unique ORDER BY skips/duplicates rows, so the snapshot loses actions + // and ChildPhaseCounts comes up short. name is unique within a run. batch, err := s.repo.ActionRepo().ListActions(ctx, interfaces.ListResourceInput{ Filter: impl.NewRunActionsFilter(runID), Limit: pageSize, Offset: offset, SortParameters: []interfaces.SortParameter{ impl.NewSortParameter("created_at", interfaces.SortOrderAscending), + impl.NewSortParameter("name", interfaces.SortOrderAscending), }, }) if err != nil { From 2e5003530974f87c8f6b29639c7d891e69110d1f Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 24 Jun 2026 02:07:38 -0700 Subject: [PATCH 7/7] test: expect name tiebreaker in WatchActions ascending sort listAndSendAllActions now passes a (created_at ASC, name ASC) sort; update TestListAndSendAllActionsUsesAscendingSort to assert both parameters. Signed-off-by: Kevin Su --- runs/service/run_service_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/runs/service/run_service_test.go b/runs/service/run_service_test.go index f56d0ba5c3..02c728e662 100644 --- a/runs/service/run_service_test.go +++ b/runs/service/run_service_test.go @@ -807,8 +807,9 @@ func TestListAndSendAllActionsUsesAscendingSort(t *testing.T) { err = svc.listAndSendAllActions(context.Background(), runID, rsm, nil) require.NoError(t, err) - require.Len(t, captured.SortParameters, 1) + require.Len(t, captured.SortParameters, 2) assert.Equal(t, "created_at ASC", captured.SortParameters[0].GetOrderExpr()) + assert.Equal(t, "name ASC", captured.SortParameters[1].GetOrderExpr()) } func TestGenerateRunName(t *testing.T) {