Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions runs/repository/impl/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,16 @@ func (r *actionRepo) ListActions(ctx context.Context, input interfaces.ListResou
queryBuilder.WriteString(" LIMIT ?")
args = append(args, input.Limit+1)

// Offset-based pagination. Callers either page by CursorToken (keyset) or by
// Offset; the two are mutually exclusive.
if input.CursorToken != "" && input.Offset > 0 {
return nil, fmt.Errorf("CursorToken and Offset are mutually exclusive")
}
if input.Offset > 0 {
queryBuilder.WriteString(" OFFSET ?")
args = append(args, input.Offset)
}
Comment thread
pingsutw marked this conversation as resolved.
Comment on lines +349 to +357

query := sqlx.Rebind(sqlx.DOLLAR, queryBuilder.String())

var actions []*models.Action
Expand Down
131 changes: 131 additions & 0 deletions runs/repository/impl/action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"net"
sort1 "sort"
"testing"
"time"

Expand Down Expand Up @@ -392,6 +393,136 @@ func TestListRuns(t *testing.T) {
}
}

// TestListActionsOffsetPagination is a regression test for offset-based paging.
// ListActions only implemented keyset (CursorToken) paging and silently ignored
// Offset, so an offset-paging caller (WatchActions.listAndSendAllActions) re-read
// the same first page forever -- capping children_phase_counts at the page size
// and never terminating its loop for runs with more than one page of actions.
func TestListActionsOffsetPagination(t *testing.T) {
db := setupActionDB(t)
defer func() { db.Exec("DELETE FROM actions") }()
actionRepo, err := NewActionRepo(db, testDbConfig)
require.NoError(t, err)
ctx := context.Background()

const total = 150
for i := 0; i < total; i++ {
_, err := actionRepo.CreateAction(ctx, &models.Run{
Project: "proj1",
Domain: "domain1",
RunName: fmt.Sprintf("r%03d", i),
Name: rootActionName,
Phase: int32(common.ActionPhase_ACTION_PHASE_QUEUED),
}, false)
require.NoError(t, err)
}

const pageSize = 50
sortByName := []interfaces.SortParameter{NewSortParameter("run_name", interfaces.SortOrderAscending)}

// Offset must shift the window: the page at Offset=50 must start at r050.
// Before the fix it started at r000 because Offset was ignored.
page2, err := actionRepo.ListActions(ctx, interfaces.ListResourceInput{
Filter: NewIsRootActionFilter(),
Limit: pageSize,
Offset: pageSize,
SortParameters: sortByName,
})
require.NoError(t, err)
require.NotEmpty(t, page2)
require.Equal(t, "r050", page2[0].RunName,
"Offset must skip the first page; got the first page back (Offset ignored)")

// Walking all pages by offset must cover every action exactly once and terminate.
// The loop bound also guards against the pre-fix infinite re-read of page one.
seen := map[string]struct{}{}
for offset := 0; offset < total*2; offset += pageSize {
batch, err := actionRepo.ListActions(ctx, interfaces.ListResourceInput{
Filter: NewIsRootActionFilter(),
Limit: pageSize,
Offset: offset,
SortParameters: sortByName,
})
require.NoError(t, err)
page := batch
if len(page) > pageSize { // trim the Limit+1 has-more probe row
page = page[:pageSize]
}
for _, a := range page {
seen[a.RunName] = struct{}{}
}
if len(batch) <= pageSize {
break
}
Comment thread
pingsutw marked this conversation as resolved.
}
assert.Len(t, seen, total, "offset paging must cover all actions exactly once")
}

// TestListActionsOffsetPaginationTiedCreatedAt is a regression test for the
// children_phase_counts undercount on large map tasks. WatchActions pages the
// snapshot by Offset sorted by created_at, but a map task bulk-creates thousands
// of children with identical created_at. OFFSET over a non-unique ORDER BY has
// undefined order among ties, so pages overlap/skip and the snapshot loses rows
// (count came up short, e.g. 4.8K instead of 20000). The fix adds a deterministic
// "name" tiebreaker. Here we force tied created_at and assert offset paging by
// (created_at, run_name) stays totally ordered and covers every row exactly once.
func TestListActionsOffsetPaginationTiedCreatedAt(t *testing.T) {
db := setupActionDB(t)
defer func() { db.Exec("DELETE FROM actions") }()
actionRepo, err := NewActionRepo(db, testDbConfig)
require.NoError(t, err)
ctx := context.Background()

const total = 150
for i := 0; i < total; i++ {
_, err := actionRepo.CreateAction(ctx, &models.Run{
Project: "proj1",
Domain: "domain1",
RunName: fmt.Sprintf("r%03d", i),
Name: rootActionName,
Phase: int32(common.ActionPhase_ACTION_PHASE_QUEUED),
}, false)
require.NoError(t, err)
}

// Force every action to share one created_at, reproducing a bulk-created map task.
_, err = db.Exec("UPDATE actions SET created_at = '2024-01-01T00:00:00Z'")
require.NoError(t, err)

const pageSize = 50
sort := []interfaces.SortParameter{
NewSortParameter("created_at", interfaces.SortOrderAscending),
NewSortParameter("run_name", interfaces.SortOrderAscending),
}

seen := map[string]struct{}{}
var ordered []string
for offset := 0; offset < total*2; offset += pageSize {
batch, err := actionRepo.ListActions(ctx, interfaces.ListResourceInput{
Filter: NewIsRootActionFilter(),
Limit: pageSize,
Offset: offset,
SortParameters: sort,
})
require.NoError(t, err)
page := batch
if len(page) > pageSize { // trim the Limit+1 has-more probe row
page = page[:pageSize]
}
for _, a := range page {
seen[a.RunName] = struct{}{}
ordered = append(ordered, a.RunName)
}
if len(batch) <= pageSize {
break
}
}

assert.Len(t, seen, total, "offset paging over tied created_at must cover all actions exactly once")
assert.True(t, sort1.IsSorted(sort1.StringSlice(ordered)),
"the name tiebreaker must give a total order so pages don't overlap/skip")
Comment on lines +521 to +523
}

func setupActionEventDB(t *testing.T) (*sqlx.DB, *actionRepo) {
db := setupActionDB(t)
r, err := NewActionRepo(db, testDbConfig)
Expand Down
5 changes: 5 additions & 0 deletions runs/service/run_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1313,12 +1313,17 @@ func (s *RunService) listAndSendAllActions(
// Sort ascending by created_at so parent actions are inserted into the
// run state manager before their children. insertAction requires the
// parent node to already exist in the tree when a child is processed.
// "name" is a deterministic tiebreaker: a map task bulk-creates thousands
// of children with identical created_at, and OFFSET paging over a
// non-unique ORDER BY skips/duplicates rows, so the snapshot loses actions
// and ChildPhaseCounts comes up short. name is unique within a run.
batch, err := s.repo.ActionRepo().ListActions(ctx, interfaces.ListResourceInput{
Filter: impl.NewRunActionsFilter(runID),
Limit: pageSize,
Offset: offset,
SortParameters: []interfaces.SortParameter{
impl.NewSortParameter("created_at", interfaces.SortOrderAscending),
impl.NewSortParameter("name", interfaces.SortOrderAscending),
},
Comment on lines 1324 to 1327
})
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion runs/service/run_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,8 +807,9 @@ func TestListAndSendAllActionsUsesAscendingSort(t *testing.T) {
err = svc.listAndSendAllActions(context.Background(), runID, rsm, nil)
require.NoError(t, err)

require.Len(t, captured.SortParameters, 1)
require.Len(t, captured.SortParameters, 2)
assert.Equal(t, "created_at ASC", captured.SortParameters[0].GetOrderExpr())
assert.Equal(t, "name ASC", captured.SortParameters[1].GetOrderExpr())
}

func TestGenerateRunName(t *testing.T) {
Expand Down
Loading