From 0ecdc36e712a9febaeceaee254fb467a91944744 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Mon, 4 May 2026 15:18:35 -0400 Subject: [PATCH 1/4] fix: dedupe task ids before assign --- pkg/repository/scheduler_queue.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pkg/repository/scheduler_queue.go b/pkg/repository/scheduler_queue.go index f03f3ee1c6..ed3f7c3696 100644 --- a/pkg/repository/scheduler_queue.go +++ b/pkg/repository/scheduler_queue.go @@ -286,6 +286,7 @@ func (d *sharedRepository) markQueueItemsProcessed(ctx context.Context, tenantId taskIds := make([]int64, 0, len(r.Assigned)) taskInsertedAts := make([]pgtype.Timestamptz, 0, len(r.Assigned)) workerIds := make([]uuid.UUID, 0, len(r.Assigned)) + seenTaskIds := make(map[int64]struct{}) var minTaskInsertedAt pgtype.Timestamptz @@ -293,6 +294,12 @@ func (d *sharedRepository) markQueueItemsProcessed(ctx context.Context, tenantId // deleted from the v1_queue_items table, so we should not assign them for id, assignedItem := range queueItemIdsToAssignedItem { if _, ok := queuedItemsMap[id]; ok { + if _, seen := seenTaskIds[assignedItem.QueueItem.TaskID]; seen { + continue + } + + seenTaskIds[assignedItem.QueueItem.TaskID] = struct{}{} + taskIds = append(taskIds, assignedItem.QueueItem.TaskID) taskInsertedAts = append(taskInsertedAts, assignedItem.QueueItem.TaskInsertedAt) workerIds = append(workerIds, assignedItem.WorkerId) From a98771af3293e9195b625fb9bc0e405db0ee3bc2 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Mon, 4 May 2026 16:07:16 -0400 Subject: [PATCH 2/4] fix: mutate taskIdToAssignedItem map to make sure we keep the correct assigned item --- pkg/repository/scheduler_queue.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/repository/scheduler_queue.go b/pkg/repository/scheduler_queue.go index ed3f7c3696..988395fe21 100644 --- a/pkg/repository/scheduler_queue.go +++ b/pkg/repository/scheduler_queue.go @@ -290,8 +290,6 @@ func (d *sharedRepository) markQueueItemsProcessed(ctx context.Context, tenantId var minTaskInsertedAt pgtype.Timestamptz - // if there are any idsToUnqueue that are not in the queuedItems, this means they were - // deleted from the v1_queue_items table, so we should not assign them for id, assignedItem := range queueItemIdsToAssignedItem { if _, ok := queuedItemsMap[id]; ok { if _, seen := seenTaskIds[assignedItem.QueueItem.TaskID]; seen { @@ -299,6 +297,7 @@ func (d *sharedRepository) markQueueItemsProcessed(ctx context.Context, tenantId } seenTaskIds[assignedItem.QueueItem.TaskID] = struct{}{} + taskIdToAssignedItem[assignedItem.QueueItem.TaskID] = assignedItem taskIds = append(taskIds, assignedItem.QueueItem.TaskID) taskInsertedAts = append(taskInsertedAts, assignedItem.QueueItem.TaskInsertedAt) From 23bf9efd1456dfc576ce58635a71526fbd161663 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Tue, 5 May 2026 13:13:32 -0400 Subject: [PATCH 3/4] fix: diff --- pkg/repository/scheduler_queue.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/repository/scheduler_queue.go b/pkg/repository/scheduler_queue.go index 988395fe21..d58bdfe7ad 100644 --- a/pkg/repository/scheduler_queue.go +++ b/pkg/repository/scheduler_queue.go @@ -290,6 +290,8 @@ func (d *sharedRepository) markQueueItemsProcessed(ctx context.Context, tenantId var minTaskInsertedAt pgtype.Timestamptz + // if there are any idsToUnqueue that are not in the queuedItems, this means they were + // deleted from the v1_queue_items table, so we should not assign them for id, assignedItem := range queueItemIdsToAssignedItem { if _, ok := queuedItemsMap[id]; ok { if _, seen := seenTaskIds[assignedItem.QueueItem.TaskID]; seen { From 87aa340dfa99f4ff055d8771a638df7125665aba Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Tue, 5 May 2026 15:41:26 -0400 Subject: [PATCH 4/4] fix: add some logs --- pkg/repository/scheduler_queue.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/pkg/repository/scheduler_queue.go b/pkg/repository/scheduler_queue.go index d58bdfe7ad..0b1a9cd7f5 100644 --- a/pkg/repository/scheduler_queue.go +++ b/pkg/repository/scheduler_queue.go @@ -286,7 +286,7 @@ func (d *sharedRepository) markQueueItemsProcessed(ctx context.Context, tenantId taskIds := make([]int64, 0, len(r.Assigned)) taskInsertedAts := make([]pgtype.Timestamptz, 0, len(r.Assigned)) workerIds := make([]uuid.UUID, 0, len(r.Assigned)) - seenTaskIds := make(map[int64]struct{}) + seenTaskIds := make(map[int64]*sqlcv1.V1QueueItem) var minTaskInsertedAt pgtype.Timestamptz @@ -294,12 +294,17 @@ func (d *sharedRepository) markQueueItemsProcessed(ctx context.Context, tenantId // deleted from the v1_queue_items table, so we should not assign them for id, assignedItem := range queueItemIdsToAssignedItem { if _, ok := queuedItemsMap[id]; ok { - if _, seen := seenTaskIds[assignedItem.QueueItem.TaskID]; seen { - continue + if qi, seen := seenTaskIds[assignedItem.QueueItem.TaskID]; seen { + d.l.Error(). + Int64("task_id", assignedItem.QueueItem.TaskID). + Str("task_inserted_at", qi.TaskInsertedAt.Time.String()). + Int64("retry_count", int64(qi.RetryCount)). + Msg("duplicate task id seen when preparing queue items for `UpdateTasksToAssigned`") } - seenTaskIds[assignedItem.QueueItem.TaskID] = struct{}{} - taskIdToAssignedItem[assignedItem.QueueItem.TaskID] = assignedItem + seenTaskIds[assignedItem.QueueItem.TaskID] = assignedItem.QueueItem + // todo: add this back if we remove the error log above for deduping + // taskIdToAssignedItem[assignedItem.QueueItem.TaskID] = assignedItem taskIds = append(taskIds, assignedItem.QueueItem.TaskID) taskInsertedAts = append(taskInsertedAts, assignedItem.QueueItem.TaskInsertedAt)