Skip to content
Open
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/repository/scheduler_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,13 +286,21 @@ func (d *sharedRepository) markQueueItemsProcessed(ctx context.Context, tenantId
taskIds := make([]int64, 0, len(r.Assigned))
taskInsertedAts := make([]pgtype.Timestamptz, 0, len(r.Assigned))
workerIds := make([]uuid.UUID, 0, len(r.Assigned))
seenTaskIds := make(map[int64]struct{})

var minTaskInsertedAt pgtype.Timestamptz

// if there are any idsToUnqueue that are not in the queuedItems, this means they were
// deleted from the v1_queue_items table, so we should not assign them
for id, assignedItem := range queueItemIdsToAssignedItem {
if _, ok := queuedItemsMap[id]; ok {
if _, seen := seenTaskIds[assignedItem.QueueItem.TaskID]; seen {
continue
}

seenTaskIds[assignedItem.QueueItem.TaskID] = struct{}{}
taskIdToAssignedItem[assignedItem.QueueItem.TaskID] = assignedItem

taskIds = append(taskIds, assignedItem.QueueItem.TaskID)
taskInsertedAts = append(taskInsertedAts, assignedItem.QueueItem.TaskInsertedAt)
workerIds = append(workerIds, assignedItem.WorkerId)
Comment thread
mrkaye97 marked this conversation as resolved.
Expand Down
Loading