Skip to content

Commit bb8419e

Browse files
mrkaye97 and Copilot authored
Feat: Direct task + dag status updates (#3554)
* refactor: improve operation pool / tenant for controller logic * refactor: remove some more loops * feat: initial work on mq-based status updates * feat: initial update query impl (will need to revise) * fix: couple bugs * fix: don't need to write into the temp table anymore * refactor: attempt to do status updates inline * fix: try to handle deadlocking * chore: rm dead code * feat: dag status updates directly * fix: query changes, remove unneeded stuff * fix: migration first pass * fix: move migration * chore: rm old idx from schema * feat: remove status partitioning function + update call sites * fix: add analyze * fix: oops don't actually need analyze * fix: timestamps, add link to note * fix: had locking backwards all along 🤦 * fix: rm target partition check, since I don't think we need this anymore * fix: improve / simplify locking more * fix: is_dag_task * fix: add requeueing logic, comment out some stuff for dev * refactor: simplify more * fix: copy paste bug * fix: simplify more * fix: compiler * chore: gen * fix: more copy paste * fix: yet more copy paste * fix: add where clause * chore: remove debug prints * refactor: simplify the dag status update query a bunch to be more in line with the task one * chore: rm old comment * chore: gen * fix: merge issues * chore: rm migrations, these are completed now * fix: cleanup * fix: return everything * fix: simplify dag updates, use inner join, remove cte * fix: initial work on reqeueing when dags aren't found * fix: add some comments * fix: rm struct we don't need * refactor: initial work removing requeue logic * fix: simplify more * chore: add todo comments * fix: move copyfrom to an overwrite for task writes * fix: cast trick * fix: wrong type * fix: another type * refactor: first pass remaining work * feat: first pass at reconciliation * chore: rm print lines * refactor: rewrite consolidation query, add explicit locking * chore: add comment * fix: col ref * fix: another col name * fix: another * fix: move 
where clause up, add ordering to avoid deadlocking * fix: more aliases ugh * refacto: use advisory locks, remove reconciliation logic * fix: add back reconciliation logic * fix: add notify back on update * fix: update logic * chore: gen * chore: uncomment * fix: bit of sql cleanup * fix: gen * fix: couple pr comments, I swear I had done this already.... * fix: copilot ordering issue Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> * chore: gen * fix: notify dag status updates * fix: only send message for terminal statuses * Revert "fix: only send message for terminal statuses" This reverts commit 6a7737a. * Revert "fix: notify dag status updates" This reverts commit bafde7c. --------- Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
1 parent 51b512e commit bb8419e

20 files changed

Lines changed: 1235 additions & 367 deletions

internal/operation/pool.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"github.com/hatchet-dev/hatchet/internal/services/partition"
1010
"github.com/hatchet-dev/hatchet/internal/syncx"
1111
v1 "github.com/hatchet-dev/hatchet/pkg/repository"
12-
"github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1"
1312

1413
"github.com/rs/zerolog"
1514
)
@@ -109,11 +108,11 @@ func (p *TenantOperationPool) Cleanup() {
109108
})
110109
}
111110

112-
func (p *TenantOperationPool) setTenants(tenants []*sqlcv1.Tenant) {
111+
func (p *TenantOperationPool) setTenants(tenants []uuid.UUID) {
113112
tenantMap := make(map[uuid.UUID]bool)
114113

115114
for _, t := range tenants {
116-
tenantMap[t.ID] = true
115+
tenantMap[t] = true
117116
}
118117

119118
// init ops for new tenants

internal/queueutils/pool.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ package queueutils
33
import (
44
"time"
55

6+
"github.com/google/uuid"
67
"github.com/hatchet-dev/hatchet/internal/syncx"
7-
"github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1"
88

99
"github.com/rs/zerolog"
1010
)
@@ -33,11 +33,11 @@ func (p *OperationPool) WithJitter(maxJitter time.Duration) *OperationPool {
3333
return p
3434
}
3535

36-
func (p *OperationPool) SetTenants(tenants []*sqlcv1.Tenant) {
36+
func (p *OperationPool) SetTenants(tenants []uuid.UUID) {
3737
tenantMap := make(map[string]bool)
3838

3939
for _, t := range tenants {
40-
tenantMap[t.ID.String()] = true
40+
tenantMap[t.String()] = true
4141
}
4242

4343
// delete tenants that are not in the list

internal/services/controllers/olap/controller.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,12 @@ func (tc *OLAPControllerImpl) handleCreatedTask(ctx context.Context, tenantId uu
511511
createTaskOpts = append(createTaskOpts, msg.V1TaskWithPayload)
512512
}
513513

514-
if err := tc.repo.OLAP().CreateTasks(ctx, tenantId, createTaskOpts); err != nil {
514+
result, err := tc.repo.OLAP().CreateTasks(ctx, tenantId, createTaskOpts)
515+
if err != nil {
516+
return err
517+
}
518+
519+
if err := tc.notifyStatusUpdates(ctx, result); err != nil {
515520
return err
516521
}
517522

@@ -766,6 +771,7 @@ func (tc *OLAPControllerImpl) handleCreateMonitoringEvent(ctx context.Context, t
766771
durableInvocationCounts := make([]int32, 0)
767772
workerIds := make([]uuid.UUID, 0)
768773
workflowIds := make([]uuid.UUID, 0)
774+
workflowRunIDs := make([]uuid.UUID, 0)
769775
eventTypes := make([]sqlcv1.V1EventTypeOlap, 0)
770776
readableStatuses := make([]sqlcv1.V1ReadableStatusOlap, 0)
771777
eventPayloads := make([]string, 0)
@@ -790,6 +796,7 @@ func (tc *OLAPControllerImpl) handleCreateMonitoringEvent(ctx context.Context, t
790796
taskIds = append(taskIds, msg.TaskId)
791797
taskInsertedAts = append(taskInsertedAts, taskMeta.InsertedAt)
792798
workflowIds = append(workflowIds, taskMeta.WorkflowID)
799+
workflowRunIDs = append(workflowRunIDs, taskMeta.WorkflowRunID)
793800
retryCounts = append(retryCounts, msg.RetryCount)
794801
durableInvocationCounts = append(durableInvocationCounts, msg.DurableInvocationCount)
795802
eventTypes = append(eventTypes, msg.EventType)
@@ -913,12 +920,16 @@ func (tc *OLAPControllerImpl) handleCreateMonitoringEvent(ctx context.Context, t
913920
opts = append(opts, event)
914921
}
915922

916-
err = tc.repo.OLAP().CreateTaskEvents(ctx, tenantId, opts)
923+
result, err := tc.repo.OLAP().CreateTaskEvents(ctx, tenantId, opts, workflowRunIDs)
917924

918925
if err != nil {
919926
return err
920927
}
921928

929+
if err := tc.notifyStatusUpdates(ctx, result); err != nil {
930+
return err
931+
}
932+
922933
tc.synthesizeEngineSpans(ctx, tenantId, spanEvents)
923934

924935
if !tc.repo.OLAP().PayloadStore().ExternalStoreEnabled() {

internal/services/controllers/olap/process_alerts.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,8 @@ func (o *OLAPControllerImpl) runTenantProcessAlerts(ctx context.Context) func()
2626

2727
o.processTenantAlertOperations.SetTenants(tenants)
2828

29-
for i := range tenants {
30-
tenantId := tenants[i].ID.String()
31-
32-
o.processTenantAlertOperations.RunOrContinue(tenantId)
29+
for _, tenantId := range tenants {
30+
o.processTenantAlertOperations.RunOrContinue(tenantId.String())
3331
}
3432
}
3533
}

internal/services/controllers/olap/process_dag_status_updates.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,13 @@ func (o *OLAPControllerImpl) runDAGStatusUpdates(ctx context.Context) func() {
2222
for shouldContinue {
2323
o.l.Debug().Ctx(ctx).Msgf("partition: running status updates for dags")
2424

25-
// list all tenants
26-
tenants, err := o.p.ListTenantsForController(ctx)
25+
tenantIds, err := o.p.ListTenantsForController(ctx)
2726

2827
if err != nil {
2928
o.l.Error().Ctx(ctx).Err(err).Msg("could not list tenants")
3029
return
3130
}
3231

33-
tenantIds := make([]uuid.UUID, 0, len(tenants))
34-
35-
for _, tenant := range tenants {
36-
tenantId := tenant.ID
37-
tenantIds = append(tenantIds, tenantId)
38-
}
39-
4032
var rows []v1.UpdateDAGStatusRow
4133

4234
shouldContinue, rows, err = o.repo.OLAP().UpdateDAGStatuses(ctx, tenantIds)

internal/services/controllers/olap/process_task_status_updates.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,13 @@ func (o *OLAPControllerImpl) runTaskStatusUpdates(ctx context.Context) func() {
2222
for shouldContinue {
2323
o.l.Debug().Ctx(ctx).Msgf("partition: running status updates for tasks")
2424

25-
// list all tenants
26-
tenants, err := o.p.ListTenantsForController(ctx)
25+
tenantIds, err := o.p.ListTenantsForController(ctx)
2726

2827
if err != nil {
2928
o.l.Error().Ctx(ctx).Err(err).Msg("could not list tenants")
3029
return
3130
}
3231

33-
tenantIds := make([]uuid.UUID, 0, len(tenants))
34-
35-
for _, tenant := range tenants {
36-
tenantId := tenant.ID
37-
tenantIds = append(tenantIds, tenantId)
38-
}
39-
4032
var rows []v1.UpdateTaskStatusRow
4133

4234
shouldContinue, rows, err = o.repo.OLAP().UpdateTaskStatuses(ctx, tenantIds)
@@ -123,3 +115,23 @@ func (o *OLAPControllerImpl) notifyTasksUpdated(ctx context.Context, rows []v1.U
123115

124116
return nil
125117
}
118+
119+
func (o *OLAPControllerImpl) notifyStatusUpdates(ctx context.Context, result *v1.StatusUpdateResult) error {
120+
if result == nil {
121+
return nil
122+
}
123+
124+
if len(result.TaskRows) > 0 {
125+
if err := o.notifyTasksUpdated(ctx, result.TaskRows); err != nil {
126+
return err
127+
}
128+
}
129+
130+
if len(result.DAGRows) > 0 {
131+
if err := o.notifyDAGsUpdated(ctx, result.DAGRows); err != nil {
132+
return err
133+
}
134+
}
135+
136+
return nil
137+
}

internal/services/controllers/retention/shared.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99

1010
"golang.org/x/sync/errgroup"
1111

12-
"github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1"
12+
"github.com/google/uuid"
1313
)
1414

1515
func GetDataRetentionExpiredTime(duration string) (time.Time, error) {
@@ -22,7 +22,7 @@ func GetDataRetentionExpiredTime(duration string) (time.Time, error) {
2222
return time.Now().UTC().Add(-d), nil
2323
}
2424

25-
func (rc *RetentionControllerImpl) ForTenants(ctx context.Context, perTenantTimeout time.Duration, f func(ctx context.Context, tenant sqlcv1.Tenant) error) error {
25+
func (rc *RetentionControllerImpl) ForTenants(ctx context.Context, perTenantTimeout time.Duration, f func(ctx context.Context, tenantId uuid.UUID) error) error {
2626
tenants, err := rc.p.ListTenantsForController(ctx)
2727

2828
if err != nil {
@@ -37,14 +37,14 @@ func (rc *RetentionControllerImpl) ForTenants(ctx context.Context, perTenantTime
3737
errs []error
3838
)
3939

40-
for _, tenant := range tenants {
40+
for _, tenantId := range tenants {
4141
g.Go(func() error {
4242
tenantCtx, cancel := context.WithTimeout(ctx, perTenantTimeout)
4343
defer cancel()
4444

45-
if err := f(tenantCtx, *tenant); err != nil {
45+
if err := f(tenantCtx, tenantId); err != nil {
4646
mu.Lock()
47-
errs = append(errs, fmt.Errorf("tenant %s: %w", tenant.ID.String(), err))
47+
errs = append(errs, fmt.Errorf("tenant %s: %w", tenantId.String(), err))
4848
mu.Unlock()
4949
}
5050
return nil

internal/services/controllers/retention/worker.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"fmt"
66
"time"
77

8-
"github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1"
8+
"github.com/google/uuid"
99
"github.com/hatchet-dev/hatchet/pkg/telemetry"
1010
)
1111

@@ -22,10 +22,16 @@ func (rc *RetentionControllerImpl) runCleanupOldWorkers(ctx context.Context) fun
2222
}
2323
}
2424

25-
func (rc *RetentionControllerImpl) cleanupOldWorkersForTenant(ctx context.Context, tenant sqlcv1.Tenant) error {
25+
func (rc *RetentionControllerImpl) cleanupOldWorkersForTenant(ctx context.Context, tenantId uuid.UUID) error {
2626
ctx, span := telemetry.NewSpan(ctx, "cleanup-old-workers-tenant")
2727
defer span.End()
2828

29+
tenant, err := rc.repo.Tenant().GetTenantByID(ctx, tenantId)
30+
31+
if err != nil {
32+
return fmt.Errorf("could not get tenant %s: %w", tenantId.String(), err)
33+
}
34+
2935
cutoff, err := GetDataRetentionExpiredTime(tenant.DataRetentionPeriod)
3036
if err != nil {
3137
return fmt.Errorf("could not get cutoff for tenant %s: %w", tenant.ID.String(), err)

internal/services/partition/partition.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"time"
88

99
"github.com/go-co-op/gocron/v2"
10+
"github.com/google/uuid"
1011
"github.com/rs/zerolog"
1112

1213
"github.com/hatchet-dev/hatchet/pkg/cleanup"
@@ -176,7 +177,7 @@ func (p *Partition) GetInternalTenantForController(ctx context.Context) (*sqlcv1
176177
return p.repo.GetInternalTenantForController(ctx, p.GetControllerPartitionId())
177178
}
178179

179-
func (p *Partition) ListTenantsForController(ctx context.Context) ([]*sqlcv1.Tenant, error) {
180+
func (p *Partition) ListTenantsForController(ctx context.Context) ([]uuid.UUID, error) {
180181
return p.repo.ListTenantsByControllerPartition(ctx, p.GetControllerPartitionId())
181182
}
182183

0 commit comments

Comments (0)