Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions pkg/repository/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2948,14 +2948,14 @@ type OLAPCutoverBatchOutcome struct {
NextPagination OLAPPaginationParams
}

func (p *OLAPRepositoryImpl) OptimizeOLAPPayloadWindowSize(ctx context.Context, partitionDate PartitionDate, candidateBatchNumRows int32, pagination OLAPPaginationParams) (*int32, error) {
func (p *OLAPRepositoryImpl) OptimizeOLAPPayloadWindowSize(ctx context.Context, tx sqlcv1.DBTX, partitionDate PartitionDate, candidateBatchNumRows int32, pagination OLAPPaginationParams) (*int32, error) {
if candidateBatchNumRows <= 0 {
// trivial case that we'll never hit, but to prevent infinite recursion
zero := int32(0)
return &zero, nil
}

proposedBatchSizeBytes, err := p.queries.ComputeOLAPPayloadBatchSize(ctx, p.pool, sqlcv1.ComputeOLAPPayloadBatchSizeParams{
proposedBatchSizeBytes, err := p.queries.ComputeOLAPPayloadBatchSize(ctx, tx, sqlcv1.ComputeOLAPPayloadBatchSizeParams{
Partitiondate: pgtype.Date(partitionDate),
Lasttenantid: pagination.LastTenantId,
Lastinsertedat: pagination.LastInsertedAt,
Expand All @@ -2975,6 +2975,7 @@ func (p *OLAPRepositoryImpl) OptimizeOLAPPayloadWindowSize(ctx context.Context,
// cut it in half and try again
return p.OptimizeOLAPPayloadWindowSize(
ctx,
tx,
partitionDate,
candidateBatchNumRows/2,
pagination,
Expand All @@ -2985,10 +2986,21 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context,
ctx, span := telemetry.NewSpan(ctx, "OLAPRepository.processOLAPPayloadCutoverBatch")
defer span.End()

// note: this tx will likely be pretty long-running, maybe 3-5s on average, ish, since it needs to
// both run a bunch of queries and also write to S3
tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l)

if err != nil {
return nil, fmt.Errorf("failed to prepare transaction for copying offloaded payloads: %w", err)
Comment thread
mrkaye97 marked this conversation as resolved.
}

defer rollback()

tableName := fmt.Sprintf("v1_payloads_olap_offload_tmp_%s", partitionDate.String())

windowSizePtr, err := p.OptimizeOLAPPayloadWindowSize(
ctx,
tx,
Comment thread
mrkaye97 marked this conversation as resolved.
Outdated
partitionDate,
externalCutoverBatchSize*externalCutoverNumConcurrentOffloads,
pagination,
Expand All @@ -3000,7 +3012,7 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context,

windowSize := *windowSizePtr

payloadRanges, err := p.queries.CreateOLAPPayloadRangeChunks(ctx, p.pool, sqlcv1.CreateOLAPPayloadRangeChunksParams{
payloadRanges, err := p.queries.CreateOLAPPayloadRangeChunks(ctx, tx, sqlcv1.CreateOLAPPayloadRangeChunksParams{
Chunksize: externalCutoverBatchSize,
Partitiondate: pgtype.Date(partitionDate),
Windowsize: windowSize,
Expand Down Expand Up @@ -3032,7 +3044,7 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context,
for _, payloadRange := range payloadRanges {
pr := payloadRange
eg.Go(func() error {
payloads, err := p.queries.ListPaginatedOLAPPayloadsForOffload(ctx, p.pool, sqlcv1.ListPaginatedOLAPPayloadsForOffloadParams{
payloads, err := p.queries.ListPaginatedOLAPPayloadsForOffload(ctx, tx, sqlcv1.ListPaginatedOLAPPayloadsForOffloadParams{
Partitiondate: pgtype.Date(partitionDate),
Comment thread
mrkaye97 marked this conversation as resolved.
Lasttenantid: pr.LowerTenantID,
Lastexternalid: pr.LowerExternalID,
Expand Down Expand Up @@ -3106,14 +3118,6 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context,
})
}

tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l)

if err != nil {
return nil, fmt.Errorf("failed to prepare transaction for copying offloaded payloads: %w", err)
}

defer rollback()

inserted, err := sqlcv1.InsertCutOverOLAPPayloadsIntoTempTable(ctx, tx, tableName, payloadsToInsert)

if err != nil && !errors.Is(err, pgx.ErrNoRows) {
Expand Down
28 changes: 16 additions & 12 deletions pkg/repository/payloadstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,14 +465,14 @@ func (d PartitionDate) String() string {
const MAX_PARTITIONS_TO_OFFLOAD = 14 // two weeks
const MAX_BATCH_SIZE_BYTES = 1.5 * 1024 * 1024 * 1024 // 1.5 GB

func (p *payloadStoreRepositoryImpl) OptimizePayloadWindowSize(ctx context.Context, partitionDate PartitionDate, candidateBatchNumRows int32, pagination PaginationParams) (*int32, error) {
func (p *payloadStoreRepositoryImpl) OptimizePayloadWindowSize(ctx context.Context, tx sqlcv1.DBTX, partitionDate PartitionDate, candidateBatchNumRows int32, pagination PaginationParams) (*int32, error) {
if candidateBatchNumRows <= 0 {
// trivial case that we'll never hit, but to prevent infinite recursion
zero := int32(0)
return &zero, nil
}

proposedBatchSizeBytes, err := p.queries.ComputePayloadBatchSize(ctx, p.pool, sqlcv1.ComputePayloadBatchSizeParams{
proposedBatchSizeBytes, err := p.queries.ComputePayloadBatchSize(ctx, tx, sqlcv1.ComputePayloadBatchSizeParams{
Partitiondate: pgtype.Date(partitionDate),
Lasttenantid: pagination.LastTenantID,
Lastinsertedat: pagination.LastInsertedAt,
Expand All @@ -493,6 +493,7 @@ func (p *payloadStoreRepositoryImpl) OptimizePayloadWindowSize(ctx context.Conte
// cut it in half and try again
return p.OptimizePayloadWindowSize(
ctx,
tx,
partitionDate,
candidateBatchNumRows/2,
pagination,
Expand All @@ -503,9 +504,20 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont
ctx, span := telemetry.NewSpan(ctx, "PayloadStoreRepository.ProcessPayloadCutoverBatch")
defer span.End()

// note: this tx will likely be pretty long-running, maybe 3-5s on average, ish, since it needs to
// both run a bunch of queries and also write to S3
tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l)

if err != nil {
return nil, fmt.Errorf("failed to prepare transaction for copying offloaded payloads: %w", err)
Comment thread
mrkaye97 marked this conversation as resolved.
}

defer rollback()
Comment thread
mrkaye97 marked this conversation as resolved.
Outdated

tableName := fmt.Sprintf("v1_payload_offload_tmp_%s", partitionDate.String())
windowSizePtr, err := p.OptimizePayloadWindowSize(
ctx,
tx,
partitionDate,
p.externalCutoverBatchSize*p.externalCutoverNumConcurrentOffloads,
pagination,
Expand All @@ -517,7 +529,7 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont

windowSize := *windowSizePtr

payloadRanges, err := p.queries.CreatePayloadRangeChunks(ctx, p.pool, sqlcv1.CreatePayloadRangeChunksParams{
payloadRanges, err := p.queries.CreatePayloadRangeChunks(ctx, tx, sqlcv1.CreatePayloadRangeChunksParams{
Chunksize: p.externalCutoverBatchSize,
Partitiondate: pgtype.Date(partitionDate),
Windowsize: windowSize,
Expand Down Expand Up @@ -550,7 +562,7 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont
for _, payloadRange := range payloadRanges {
pr := payloadRange
eg.Go(func() error {
payloads, err := p.queries.ListPaginatedPayloadsForOffload(ctx, p.pool, sqlcv1.ListPaginatedPayloadsForOffloadParams{
payloads, err := p.queries.ListPaginatedPayloadsForOffload(ctx, tx, sqlcv1.ListPaginatedPayloadsForOffloadParams{
Comment thread
mrkaye97 marked this conversation as resolved.
Outdated
Partitiondate: pgtype.Date(partitionDate),
Lasttenantid: pr.LowerTenantID,
Lastinsertedat: pr.LowerInsertedAt,
Expand Down Expand Up @@ -638,14 +650,6 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont
})
}

tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l)

if err != nil {
return nil, fmt.Errorf("failed to prepare transaction for copying offloaded payloads: %w", err)
}

defer rollback()

inserted, err := sqlcv1.InsertCutOverPayloadsIntoTempTable(ctx, tx, tableName, payloadsToInsert)

if err != nil && !errors.Is(err, pgx.ErrNoRows) {
Expand Down
Loading