Skip to content

Commit 10ca10a

Browse files
authored
Fix: Payload external id dupe (#3824)
* feat: add hack column for result payload id to durable event log * fix: use new result payload external id * fix: add backfill to migration to not block * fix: gte * fix: smaller batches * fix: don't need gte
1 parent 683d0b8 commit 10ca10a

6 files changed

Lines changed: 171 additions & 103 deletions

File tree

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
-- +goose Up
2+
-- +goose StatementBegin
3+
ALTER TABLE v1_durable_event_log_entry ADD COLUMN result_payload_external_id UUID;
4+
-- +goose StatementEnd
5+
6+
-- +goose StatementBegin
7+
DO $$
8+
DECLARE
9+
batch_size INT := 100; -- small batches, since each durable task can have many event log entries
10+
rows_updated INT;
11+
last_task_id BIGINT := -1;
12+
last_inserted_at TIMESTAMPTZ := '1970-01-01T00:00:00Z';
13+
BEGIN
14+
LOOP
15+
WITH batch_keys AS (
16+
SELECT DISTINCT durable_task_id, durable_task_inserted_at
17+
FROM v1_durable_event_log_entry
18+
WHERE result_payload_external_id IS NULL
19+
AND (durable_task_id, durable_task_inserted_at) > (last_task_id, last_inserted_at)
20+
ORDER BY durable_task_id, durable_task_inserted_at
21+
LIMIT batch_size
22+
), updated AS (
23+
UPDATE v1_durable_event_log_entry e
24+
SET result_payload_external_id = gen_random_uuid()
25+
FROM batch_keys b
26+
WHERE (e.durable_task_id, e.durable_task_inserted_at) = (b.durable_task_id, b.durable_task_inserted_at)
27+
RETURNING e.durable_task_id, e.durable_task_inserted_at
28+
)
29+
30+
SELECT
31+
count(*) OVER (),
32+
durable_task_id,
33+
durable_task_inserted_at
34+
INTO rows_updated, last_task_id, last_inserted_at
35+
FROM updated
36+
ORDER BY durable_task_id DESC, durable_task_inserted_at DESC
37+
LIMIT 1;
38+
39+
EXIT WHEN rows_updated IS NULL OR rows_updated = 0;
40+
END LOOP;
41+
END;
42+
$$;
43+
-- +goose StatementEnd
44+
45+
-- +goose StatementBegin
46+
ALTER TABLE v1_durable_event_log_entry ALTER COLUMN result_payload_external_id SET DEFAULT gen_random_uuid();
47+
ALTER TABLE v1_durable_event_log_entry ALTER COLUMN result_payload_external_id SET NOT NULL;
48+
-- +goose StatementEnd
49+
50+
-- +goose Down
51+
-- +goose StatementBegin
52+
ALTER TABLE v1_durable_event_log_entry DROP COLUMN result_payload_external_id;
53+
-- +goose StatementEnd

pkg/repository/durable_events.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -824,7 +824,7 @@ func (r *durableEventsRepository) getOrCreateEventLogEntries(
824824
storePayloadOpts = append(storePayloadOpts, StorePayloadOpts{
825825
Id: createdRow.ID,
826826
InsertedAt: createdRow.InsertedAt,
827-
ExternalId: createdRow.ExternalID,
827+
ExternalId: createdRow.ResultPayloadExternalID,
828828
Type: sqlcv1.V1PayloadTypeDURABLEEVENTLOGENTRYRESULTDATA,
829829
Payload: opt.ResultPayload,
830830
TenantId: opts.TenantId,
@@ -1514,7 +1514,7 @@ func (r *durableEventsRepository) CompleteMemoEntry(ctx context.Context, opts Co
15141514
err = r.payloadStore.Store(ctx, r.pool, StorePayloadOpts{
15151515
Id: entry.ID,
15161516
InsertedAt: entry.InsertedAt,
1517-
ExternalId: entry.ExternalID,
1517+
ExternalId: entry.ResultPayloadExternalID,
15181518
Type: sqlcv1.V1PayloadTypeDURABLEEVENTLOGENTRYRESULTDATA,
15191519
Payload: opts.Payload,
15201520
TenantId: opts.TenantId,

pkg/repository/match.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -840,7 +840,7 @@ func (m *sharedRepository) processEventMatches(ctx context.Context, tx sqlcv1.DB
840840
InsertedAt: cb.InsertedAt,
841841
Type: sqlcv1.V1PayloadTypeDURABLEEVENTLOGENTRYRESULTDATA,
842842
Payload: initialEntry.Data,
843-
ExternalId: cb.ExternalID,
843+
ExternalId: cb.ResultPayloadExternalID,
844844
TenantId: tenantId,
845845
})
846846
}

0 commit comments

Comments
 (0)