Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions pkg/repository/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2948,14 +2948,14 @@ type OLAPCutoverBatchOutcome struct {
NextPagination OLAPPaginationParams
}

func (p *OLAPRepositoryImpl) OptimizeOLAPPayloadWindowSize(ctx context.Context, partitionDate PartitionDate, candidateBatchNumRows int32, pagination OLAPPaginationParams) (*int32, error) {
func (p *OLAPRepositoryImpl) OptimizeOLAPPayloadWindowSize(ctx context.Context, tx sqlcv1.DBTX, partitionDate PartitionDate, candidateBatchNumRows int32, pagination OLAPPaginationParams) (*int32, error) {
if candidateBatchNumRows <= 0 {
// trivial case that we'll never hit, but to prevent infinite recursion
zero := int32(0)
return &zero, nil
}

proposedBatchSizeBytes, err := p.queries.ComputeOLAPPayloadBatchSize(ctx, p.pool, sqlcv1.ComputeOLAPPayloadBatchSizeParams{
proposedBatchSizeBytes, err := p.queries.ComputeOLAPPayloadBatchSize(ctx, tx, sqlcv1.ComputeOLAPPayloadBatchSizeParams{
Partitiondate: pgtype.Date(partitionDate),
Lasttenantid: pagination.LastTenantId,
Lastinsertedat: pagination.LastInsertedAt,
Expand All @@ -2975,6 +2975,7 @@ func (p *OLAPRepositoryImpl) OptimizeOLAPPayloadWindowSize(ctx context.Context,
// cut it in half and try again
return p.OptimizeOLAPPayloadWindowSize(
ctx,
tx,
partitionDate,
candidateBatchNumRows/2,
pagination,
Expand All @@ -2985,10 +2986,19 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context,
ctx, span := telemetry.NewSpan(ctx, "OLAPRepository.processOLAPPayloadCutoverBatch")
defer span.End()

tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l)

if err != nil {
return nil, fmt.Errorf("failed to prepare transaction for copying offloaded payloads: %w", err)
Comment thread
mrkaye97 marked this conversation as resolved.
}

defer rollback()

tableName := fmt.Sprintf("v1_payloads_olap_offload_tmp_%s", partitionDate.String())

windowSizePtr, err := p.OptimizeOLAPPayloadWindowSize(
ctx,
tx,
partitionDate,
externalCutoverBatchSize*externalCutoverNumConcurrentOffloads,
pagination,
Expand All @@ -3000,7 +3010,7 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context,

windowSize := *windowSizePtr

payloadRanges, err := p.queries.CreateOLAPPayloadRangeChunks(ctx, p.pool, sqlcv1.CreateOLAPPayloadRangeChunksParams{
payloadRanges, err := p.queries.CreateOLAPPayloadRangeChunks(ctx, tx, sqlcv1.CreateOLAPPayloadRangeChunksParams{
Chunksize: externalCutoverBatchSize,
Partitiondate: pgtype.Date(partitionDate),
Windowsize: windowSize,
Expand All @@ -3020,6 +3030,10 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context,
}, nil
}

if err = commit(ctx); err != nil {
return nil, fmt.Errorf("failed to commit transaction for creating payload range chunks: %w", err)
}

mu := sync.Mutex{}
eg := errgroup.Group{}

Expand Down Expand Up @@ -3106,10 +3120,10 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context,
})
}

tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l)
tx, commit, rollback, err = sqlchelpers.PrepareTx(ctx, p.pool, p.l)

if err != nil {
return nil, fmt.Errorf("failed to prepare transaction for copying offloaded payloads: %w", err)
return nil, fmt.Errorf("failed to prepare transaction for inserting cutover payloads: %w", err)
}

defer rollback()
Expand Down
24 changes: 19 additions & 5 deletions pkg/repository/payloadstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,14 +465,14 @@ func (d PartitionDate) String() string {
const MAX_PARTITIONS_TO_OFFLOAD = 14 // two weeks
const MAX_BATCH_SIZE_BYTES = 1.5 * 1024 * 1024 * 1024 // 1.5 GB

func (p *payloadStoreRepositoryImpl) OptimizePayloadWindowSize(ctx context.Context, partitionDate PartitionDate, candidateBatchNumRows int32, pagination PaginationParams) (*int32, error) {
func (p *payloadStoreRepositoryImpl) OptimizePayloadWindowSize(ctx context.Context, tx sqlcv1.DBTX, partitionDate PartitionDate, candidateBatchNumRows int32, pagination PaginationParams) (*int32, error) {
if candidateBatchNumRows <= 0 {
// trivial case that we'll never hit, but to prevent infinite recursion
zero := int32(0)
return &zero, nil
}

proposedBatchSizeBytes, err := p.queries.ComputePayloadBatchSize(ctx, p.pool, sqlcv1.ComputePayloadBatchSizeParams{
proposedBatchSizeBytes, err := p.queries.ComputePayloadBatchSize(ctx, tx, sqlcv1.ComputePayloadBatchSizeParams{
Partitiondate: pgtype.Date(partitionDate),
Lasttenantid: pagination.LastTenantID,
Lastinsertedat: pagination.LastInsertedAt,
Expand All @@ -493,6 +493,7 @@ func (p *payloadStoreRepositoryImpl) OptimizePayloadWindowSize(ctx context.Conte
// cut it in half and try again
return p.OptimizePayloadWindowSize(
ctx,
tx,
partitionDate,
candidateBatchNumRows/2,
pagination,
Expand All @@ -503,9 +504,18 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont
ctx, span := telemetry.NewSpan(ctx, "PayloadStoreRepository.ProcessPayloadCutoverBatch")
defer span.End()

tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l)

if err != nil {
return nil, fmt.Errorf("failed to prepare transaction for copying offloaded payloads: %w", err)
Comment thread
mrkaye97 marked this conversation as resolved.
}

defer rollback()

tableName := fmt.Sprintf("v1_payload_offload_tmp_%s", partitionDate.String())
windowSizePtr, err := p.OptimizePayloadWindowSize(
ctx,
tx,
partitionDate,
p.externalCutoverBatchSize*p.externalCutoverNumConcurrentOffloads,
pagination,
Expand All @@ -517,7 +527,7 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont

windowSize := *windowSizePtr

payloadRanges, err := p.queries.CreatePayloadRangeChunks(ctx, p.pool, sqlcv1.CreatePayloadRangeChunksParams{
payloadRanges, err := p.queries.CreatePayloadRangeChunks(ctx, tx, sqlcv1.CreatePayloadRangeChunksParams{
Chunksize: p.externalCutoverBatchSize,
Partitiondate: pgtype.Date(partitionDate),
Windowsize: windowSize,
Expand All @@ -538,6 +548,10 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont
}, nil
}

if err = commit(ctx); err != nil {
return nil, fmt.Errorf("failed to commit payload range chunks transaction: %w", err)
}

mu := sync.Mutex{}
eg := errgroup.Group{}

Expand Down Expand Up @@ -638,10 +652,10 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont
})
}

tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l)
tx, commit, rollback, err = sqlchelpers.PrepareTx(ctx, p.pool, p.l)

if err != nil {
return nil, fmt.Errorf("failed to prepare transaction for copying offloaded payloads: %w", err)
return nil, fmt.Errorf("failed to prepare transaction for inserting cutover payloads: %w", err)
}

defer rollback()
Expand Down
Loading