From e3f66964372bd27314090ce1a31cf55bfca58d8f Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Mon, 4 May 2026 12:27:58 -0400 Subject: [PATCH 1/4] fix: try a long tx for the individual offload --- pkg/repository/payloadstore.go | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/pkg/repository/payloadstore.go b/pkg/repository/payloadstore.go index 32f2edc231..93af12b5cd 100644 --- a/pkg/repository/payloadstore.go +++ b/pkg/repository/payloadstore.go @@ -465,14 +465,14 @@ func (d PartitionDate) String() string { const MAX_PARTITIONS_TO_OFFLOAD = 14 // two weeks const MAX_BATCH_SIZE_BYTES = 1.5 * 1024 * 1024 * 1024 // 1.5 GB -func (p *payloadStoreRepositoryImpl) OptimizePayloadWindowSize(ctx context.Context, partitionDate PartitionDate, candidateBatchNumRows int32, pagination PaginationParams) (*int32, error) { +func (p *payloadStoreRepositoryImpl) OptimizePayloadWindowSize(ctx context.Context, tx sqlcv1.DBTX, partitionDate PartitionDate, candidateBatchNumRows int32, pagination PaginationParams) (*int32, error) { if candidateBatchNumRows <= 0 { // trivial case that we'll never hit, but to prevent infinite recursion zero := int32(0) return &zero, nil } - proposedBatchSizeBytes, err := p.queries.ComputePayloadBatchSize(ctx, p.pool, sqlcv1.ComputePayloadBatchSizeParams{ + proposedBatchSizeBytes, err := p.queries.ComputePayloadBatchSize(ctx, tx, sqlcv1.ComputePayloadBatchSizeParams{ Partitiondate: pgtype.Date(partitionDate), Lasttenantid: pagination.LastTenantID, Lastinsertedat: pagination.LastInsertedAt, @@ -493,6 +493,7 @@ func (p *payloadStoreRepositoryImpl) OptimizePayloadWindowSize(ctx context.Conte // cut it in half and try again return p.OptimizePayloadWindowSize( ctx, + tx, partitionDate, candidateBatchNumRows/2, pagination, @@ -503,9 +504,20 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont ctx, span := telemetry.NewSpan(ctx, "PayloadStoreRepository.ProcessPayloadCutoverBatch") defer span.End() + // note: this tx will likely be pretty long-running, maybe 3-5s on average, ish, since it needs to + // both run a bunch of queries and also write to S3 + tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l) + + if err != nil { + return nil, fmt.Errorf("failed to prepare transaction for copying offloaded payloads: %w", err) + } + + defer rollback() + tableName := fmt.Sprintf("v1_payload_offload_tmp_%s", partitionDate.String()) windowSizePtr, err := p.OptimizePayloadWindowSize( ctx, + tx, partitionDate, p.externalCutoverBatchSize*p.externalCutoverNumConcurrentOffloads, pagination, @@ -517,7 +529,7 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont windowSize := *windowSizePtr - payloadRanges, err := p.queries.CreatePayloadRangeChunks(ctx, p.pool, sqlcv1.CreatePayloadRangeChunksParams{ + payloadRanges, err := p.queries.CreatePayloadRangeChunks(ctx, tx, sqlcv1.CreatePayloadRangeChunksParams{ Chunksize: p.externalCutoverBatchSize, Partitiondate: pgtype.Date(partitionDate), Windowsize: windowSize, @@ -550,7 +562,7 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont for _, payloadRange := range payloadRanges { pr := payloadRange eg.Go(func() error { - payloads, err := p.queries.ListPaginatedPayloadsForOffload(ctx, p.pool, sqlcv1.ListPaginatedPayloadsForOffloadParams{ + payloads, err := p.queries.ListPaginatedPayloadsForOffload(ctx, tx, sqlcv1.ListPaginatedPayloadsForOffloadParams{ Partitiondate: pgtype.Date(partitionDate), Lasttenantid: pr.LowerTenantID, Lastinsertedat: pr.LowerInsertedAt, @@ -638,14 +650,6 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont }) } - tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l) - - if err != nil { - return nil, fmt.Errorf("failed to prepare transaction for copying offloaded payloads: %w", err) - } - - defer rollback() - inserted, err := sqlcv1.InsertCutOverPayloadsIntoTempTable(ctx, tx, tableName, payloadsToInsert) if err != nil && !errors.Is(err, pgx.ErrNoRows) { From 96aaa7aed95b22e19ab04f2bc51a17f1a73bdb50 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Mon, 4 May 2026 12:29:30 -0400 Subject: [PATCH 2/4] fix: same for olap --- pkg/repository/olap.go | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/pkg/repository/olap.go b/pkg/repository/olap.go index ce95727227..73c7d46a48 100644 --- a/pkg/repository/olap.go +++ b/pkg/repository/olap.go @@ -2948,14 +2948,14 @@ type OLAPCutoverBatchOutcome struct { NextPagination OLAPPaginationParams } -func (p *OLAPRepositoryImpl) OptimizeOLAPPayloadWindowSize(ctx context.Context, partitionDate PartitionDate, candidateBatchNumRows int32, pagination OLAPPaginationParams) (*int32, error) { +func (p *OLAPRepositoryImpl) OptimizeOLAPPayloadWindowSize(ctx context.Context, tx sqlcv1.DBTX, partitionDate PartitionDate, candidateBatchNumRows int32, pagination OLAPPaginationParams) (*int32, error) { if candidateBatchNumRows <= 0 { // trivial case that we'll never hit, but to prevent infinite recursion zero := int32(0) return &zero, nil } - proposedBatchSizeBytes, err := p.queries.ComputeOLAPPayloadBatchSize(ctx, p.pool, sqlcv1.ComputeOLAPPayloadBatchSizeParams{ + proposedBatchSizeBytes, err := p.queries.ComputeOLAPPayloadBatchSize(ctx, tx, sqlcv1.ComputeOLAPPayloadBatchSizeParams{ Partitiondate: pgtype.Date(partitionDate), Lasttenantid: pagination.LastTenantId, Lastinsertedat: pagination.LastInsertedAt, @@ -2975,6 +2975,7 @@ func (p *OLAPRepositoryImpl) OptimizeOLAPPayloadWindowSize(ctx context.Context, // cut it in half and try again return p.OptimizeOLAPPayloadWindowSize( ctx, + tx, partitionDate, candidateBatchNumRows/2, pagination, @@ -2985,10 +2986,21 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context, ctx, span := telemetry.NewSpan(ctx, "OLAPRepository.processOLAPPayloadCutoverBatch") defer span.End() + // note: this tx will likely be pretty long-running, maybe 3-5s on average, ish, since it needs to + // both run a bunch of queries and also write to S3 + tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l) + + if err != nil { + return nil, fmt.Errorf("failed to prepare transaction for copying offloaded payloads: %w", err) + } + + defer rollback() + tableName := fmt.Sprintf("v1_payloads_olap_offload_tmp_%s", partitionDate.String()) windowSizePtr, err := p.OptimizeOLAPPayloadWindowSize( ctx, + tx, partitionDate, externalCutoverBatchSize*externalCutoverNumConcurrentOffloads, pagination, @@ -3000,7 +3012,7 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context, windowSize := *windowSizePtr - payloadRanges, err := p.queries.CreateOLAPPayloadRangeChunks(ctx, p.pool, sqlcv1.CreateOLAPPayloadRangeChunksParams{ + payloadRanges, err := p.queries.CreateOLAPPayloadRangeChunks(ctx, tx, sqlcv1.CreateOLAPPayloadRangeChunksParams{ Chunksize: externalCutoverBatchSize, Partitiondate: pgtype.Date(partitionDate), Windowsize: windowSize, @@ -3032,7 +3044,7 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context, for _, payloadRange := range payloadRanges { pr := payloadRange eg.Go(func() error { - payloads, err := p.queries.ListPaginatedOLAPPayloadsForOffload(ctx, p.pool, sqlcv1.ListPaginatedOLAPPayloadsForOffloadParams{ + payloads, err := p.queries.ListPaginatedOLAPPayloadsForOffload(ctx, tx, sqlcv1.ListPaginatedOLAPPayloadsForOffloadParams{ Partitiondate: pgtype.Date(partitionDate), Lasttenantid: pr.LowerTenantID, Lastexternalid: pr.LowerExternalID, @@ -3106,14 +3118,6 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context, }) } - tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l) - - if err != nil { - return nil, fmt.Errorf("failed to prepare transaction for copying offloaded payloads: %w", err) - } - - defer rollback() - inserted, err := sqlcv1.InsertCutOverOLAPPayloadsIntoTempTable(ctx, tx, tableName, payloadsToInsert) if err != nil && !errors.Is(err, pgx.ErrNoRows) { From 2fb81bd5d7d02b2be262ffec5ba7e58e5d70260a Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Mon, 4 May 2026 12:46:31 -0400 Subject: [PATCH 3/4] fix: no tx-in-goroutine --- pkg/repository/olap.go | 16 +++++++++++++--- pkg/repository/payloadstore.go | 16 +++++++++++++--- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/pkg/repository/olap.go b/pkg/repository/olap.go index 73c7d46a48..6e2af5f902 100644 --- a/pkg/repository/olap.go +++ b/pkg/repository/olap.go @@ -2986,8 +2986,6 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context, ctx, span := telemetry.NewSpan(ctx, "OLAPRepository.processOLAPPayloadCutoverBatch") defer span.End() - // note: this tx will likely be pretty long-running, maybe 3-5s on average, ish, since it needs to - // both run a bunch of queries and also write to S3 tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l) if err != nil { @@ -3032,6 +3030,10 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context, }, nil } + if err = commit(ctx); err != nil { + return nil, fmt.Errorf("failed to commit transaction for creating payload range chunks: %w", err) + } + mu := sync.Mutex{} eg := errgroup.Group{} @@ -3044,7 +3046,7 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context, for _, payloadRange := range payloadRanges { pr := payloadRange eg.Go(func() error { - payloads, err := p.queries.ListPaginatedOLAPPayloadsForOffload(ctx, tx, sqlcv1.ListPaginatedOLAPPayloadsForOffloadParams{ + payloads, err := p.queries.ListPaginatedOLAPPayloadsForOffload(ctx, p.pool, sqlcv1.ListPaginatedOLAPPayloadsForOffloadParams{ Partitiondate: pgtype.Date(partitionDate), Lasttenantid: pr.LowerTenantID, Lastexternalid: pr.LowerExternalID, @@ -3118,6 +3120,14 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context, }) } + tx, commit, rollback, err = sqlchelpers.PrepareTx(ctx, p.pool, p.l) + + if err != nil { + return nil, fmt.Errorf("failed to prepare transaction for copying offloaded payloads: %w", err) + } + + defer rollback() + inserted, err := sqlcv1.InsertCutOverOLAPPayloadsIntoTempTable(ctx, tx, tableName, payloadsToInsert) if err != nil && !errors.Is(err, pgx.ErrNoRows) { diff --git a/pkg/repository/payloadstore.go b/pkg/repository/payloadstore.go index 93af12b5cd..61fe06a233 100644 --- a/pkg/repository/payloadstore.go +++ b/pkg/repository/payloadstore.go @@ -504,8 +504,6 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont ctx, span := telemetry.NewSpan(ctx, "PayloadStoreRepository.ProcessPayloadCutoverBatch") defer span.End() - // note: this tx will likely be pretty long-running, maybe 3-5s on average, ish, since it needs to - // both run a bunch of queries and also write to S3 tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l) if err != nil { @@ -550,6 +548,10 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont }, nil } + if err = commit(ctx); err != nil { + return nil, fmt.Errorf("failed to commit payload range chunks transaction: %w", err) + } + mu := sync.Mutex{} eg := errgroup.Group{} @@ -562,7 +564,7 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont for _, payloadRange := range payloadRanges { pr := payloadRange eg.Go(func() error { - payloads, err := p.queries.ListPaginatedPayloadsForOffload(ctx, tx, sqlcv1.ListPaginatedPayloadsForOffloadParams{ + payloads, err := p.queries.ListPaginatedPayloadsForOffload(ctx, p.pool, sqlcv1.ListPaginatedPayloadsForOffloadParams{ Partitiondate: pgtype.Date(partitionDate), Lasttenantid: pr.LowerTenantID, Lastinsertedat: pr.LowerInsertedAt, @@ -650,6 +652,14 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont }) } + tx, commit, rollback, err = sqlchelpers.PrepareTx(ctx, p.pool, p.l) + + if err != nil { + return nil, fmt.Errorf("failed to prepare transaction for copying offloaded payloads: %w", err) + } + + defer rollback() + inserted, err := sqlcv1.InsertCutOverPayloadsIntoTempTable(ctx, tx, tableName, payloadsToInsert) if err != nil && !errors.Is(err, pgx.ErrNoRows) { From 0f675597ddc1341d57d9032d972ed45153539a2d Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Mon, 4 May 2026 13:03:39 -0400 Subject: [PATCH 4/4] fix: copy paste --- pkg/repository/olap.go | 2 +- pkg/repository/payloadstore.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/repository/olap.go b/pkg/repository/olap.go index 6e2af5f902..9e7fab9db8 100644 --- a/pkg/repository/olap.go +++ b/pkg/repository/olap.go @@ -3123,7 +3123,7 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context, tx, commit, rollback, err = sqlchelpers.PrepareTx(ctx, p.pool, p.l) if err != nil { - return nil, fmt.Errorf("failed to prepare transaction for copying offloaded payloads: %w", err) + return nil, fmt.Errorf("failed to prepare transaction for inserting cutover payloads: %w", err) } defer rollback() diff --git a/pkg/repository/payloadstore.go b/pkg/repository/payloadstore.go index 61fe06a233..940fb87c2c 100644 --- a/pkg/repository/payloadstore.go +++ b/pkg/repository/payloadstore.go @@ -655,7 +655,7 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont tx, commit, rollback, err = sqlchelpers.PrepareTx(ctx, p.pool, p.l) if err != nil { - return nil, fmt.Errorf("failed to prepare transaction for copying offloaded payloads: %w", err) + return nil, fmt.Errorf("failed to prepare transaction for inserting cutover payloads: %w", err) } defer rollback()