From 1ef78beb36c8f71e2149a0db2a2691895a866ec2 Mon Sep 17 00:00:00 2001 From: Nikolay Samokhvalov Date: Thu, 16 Apr 2026 19:08:53 +0000 Subject: [PATCH 1/5] Pause recovery on logical slot conflict MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a new GUC, recovery_pause_on_logical_slot_conflict (PGC_SIGHUP, default off). When enabled, WAL replay on a standby pauses instead of invalidating an active logical replication slot whose catalog_xmin would be overtaken by a Heap2/PRUNE_ON_ACCESS record's snapshotConflictHorizon. An operator can then drain the slot via pg_logical_slot_get_changes and call pg_wal_replay_resume() to continue. On resume, the patch advances the drained slot's catalog_xmin past the conflict horizon so the subsequent InvalidateObsoleteReplicationSlots call becomes a no-op; replay continues to the next conflict and the cycle repeats. This makes logical decoding from an archive-only standby (no streaming replication link to the primary) viable for continuous CDC. Without this GUC, slots on such standbys are invalidated the first time replay applies a catalog vacuum record whose horizon exceeds the slot's catalog_xmin — typically ~2 * autovacuum_naptime after slot creation. Hooks into ResolveRecoveryConflictWithSnapshot(), the single choke point in the replay path for RS_INVAL_HORIZON conflicts, via a new MaybePauseOnLogicalSlotConflict() function. Reuses the existing SetRecoveryPause / recoveryNotPausedCV machinery — no new shared-memory state. Hot path when GUC off is one boolean early-return. Edge cases handled: - Slots still inside DecodingContextFindStartpoint (effective_catalog_xmin not yet valid) are skipped. Pausing for them would deadlock: snapbuild needs WAL to advance, pause holds it back. Invalidating an in-progress slot is harmless — the caller retries. - Pause-check uses TransactionIdPrecedesOrEquals to match the semantics of DetermineSlotInvalidationCause. Without that, a slot whose catalog_xmin was just advanced to horizon+1 by a previous pause cycle would fail to re-pause on a subsequent record with horizon == horizon+1, yet would still be invalidated by the fall-through. - CheckForStandbyTrigger() is called in the wait loop so pg_promote() does not stall while paused. Mirrors the existing recoveryPausesHere escape loop. - Synced slots (data.synced == true, i.e. managed by the slot-sync worker per sync_replication_slots) are skipped in both the pause-check and advance scans. Writing to their fields from the startup process would race with the slot-sync worker, and ALTER / DROP_REPLICATION_SLOT on a synced slot errors out — so the operator-facing "drain or drop" recipe does not apply. ConfirmRecoveryPaused() and CheckForStandbyTrigger() are made extern for use by MaybePauseOnLogicalSlotConflict's wait loop — the pause is entered from inside ResolveRecoveryConflictWithSnapshot rather than the main replay loop, so we need to transition RECOVERY_PAUSE_REQUESTED -> RECOVERY_PAUSED ourselves and consume PROMOTE_SIGNAL_FILE ourselves. Known limitation: the advance marks slots dirty but does not force an immediate SaveSlotToPath. If the standby crashes between resume and the next restartpoint, the advance is lost — on restart replay re-encounters the same conflict record, re-pauses, and the operator re-drains (idempotent). A future iteration could tighten this. --- src/backend/access/transam/xlogrecovery.c | 27 ++- src/backend/storage/ipc/standby.c | 227 ++++++++++++++++++ src/backend/utils/misc/guc_parameters.dat | 8 + src/backend/utils/misc/postgresql.conf.sample | 2 + src/include/access/xlogrecovery.h | 3 + src/include/storage/standby.h | 2 + 6 files changed, 265 insertions(+), 4 deletions(-) diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index c236e2b796928..0452e0d33d52d 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -96,6 +96,21 @@ const char *recoveryTargetName; XLogRecPtr recoveryTargetLSN; int recovery_min_apply_delay = 0; +/* + * If true, when WAL replay on a standby is about to invalidate an otherwise- + * active logical replication slot because a catalog PRUNE_ON_ACCESS record's + * snapshotConflictHorizon has overtaken the slot's catalog_xmin, pause replay + * instead and give an operator a chance to drain (or drop) the slot. + * + * Motivated by blueprints/LOGICAL_DECODING_ARCHIVED_WALS.md §4.2.3 / US-4: + * an archive-only logical-decoding standby cannot feed hot_standby_feedback + * to the primary, so it has no natural way to keep the primary's catalog + * horizon pinned. Without this GUC, any logical slot created on such a + * standby is invalidated the first time replay applies a catalog vacuum + * record whose horizon exceeds the slot's catalog_xmin. + */ +bool recovery_pause_on_logical_slot_conflict = false; + /* options formerly taken from recovery.conf for XLOG streaming */ char *PrimaryConnInfo = NULL; char *PrimarySlotName = NULL; @@ -363,7 +378,7 @@ static bool recoveryStopsAfter(XLogReaderState *record); static char *getRecoveryStopReason(void); static void recoveryPausesHere(bool endOfRecovery); static bool recoveryApplyDelay(XLogReaderState *record); -static void ConfirmRecoveryPaused(void); +/* ConfirmRecoveryPaused is extern for use by MaybePauseOnLogicalSlotConflict */ static XLogRecord *ReadRecord(XLogPrefetcher *xlogprefetcher, int emode, bool fetching_ckpt, @@ -386,7 +401,7 @@ static int XLogFileRead(XLogSegNo segno, TimeLineID tli, XLogSource source, bool notfoundOk); static int XLogFileReadAnyTLI(XLogSegNo segno, XLogSource source); -static bool CheckForStandbyTrigger(void); +/* CheckForStandbyTrigger is extern for MaybePauseOnLogicalSlotConflict */ static void SetPromoteIsTriggered(void); static bool HotStandbyActiveInReplay(void); @@ -3080,7 +3095,7 @@ SetRecoveryPause(bool recoveryPause) * Confirm the recovery pause by setting the recovery pause state to * RECOVERY_PAUSED. */ -static void +void ConfirmRecoveryPaused(void) { /* If recovery pause is requested then set it paused */ @@ -4435,7 +4450,11 @@ SetPromoteIsTriggered(void) /* * Check whether a promote request has arrived. */ -static bool +/* + * Non-static: MaybePauseOnLogicalSlotConflict needs this to break its wait + * loop on promotion, same as recoveryPausesHere does. + */ +bool CheckForStandbyTrigger(void) { if (LocalPromoteIsTriggered) diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 29af773394832..62f743cc7eeb1 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -24,8 +24,11 @@ #include "access/xlogutils.h" #include "miscadmin.h" #include "pgstat.h" +#include "postmaster/startup.h" #include "replication/slot.h" +#include "storage/condition_variable.h" #include "storage/bufmgr.h" +#include "storage/lwlock.h" #include "storage/proc.h" #include "storage/procarray.h" #include "storage/sinvaladt.h" @@ -503,8 +506,232 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, * reached, e.g. due to using a physical replication slot. */ if (IsLogicalDecodingEnabled() && isCatalogRel) + { + MaybePauseOnLogicalSlotConflict(locator.dbOid, snapshotConflictHorizon); InvalidateObsoleteReplicationSlots(RS_INVAL_HORIZON, 0, locator.dbOid, snapshotConflictHorizon); + } +} + +/* + * If recovery_pause_on_logical_slot_conflict is enabled, and replay is about + * to apply a catalog PRUNE_ON_ACCESS record whose snapshotConflictHorizon + * would cause the invalidation of at least one non-invalidated logical slot + * in the same database, request a recovery pause and wait on the recovery + * pause condition variable until an operator resumes. + * + * On resume the caller re-falls through to InvalidateObsoleteReplicationSlots: + * if the operator has drained / dropped / advanced the slot, invalidation is + * a no-op; if they chose to resume without acting, the slot is invalidated + * as usual. This matches the recovery_target_action=pause precedent. + * + * Only invoked from ResolveRecoveryConflictWithSnapshot(), before any buffer + * locks are taken, so pausing here does not deadlock with anything. + */ +void +MaybePauseOnLogicalSlotConflict(Oid dboid, TransactionId snapshotConflictHorizon) +{ + int i; + bool would_invalidate = false; + + if (!recovery_pause_on_logical_slot_conflict) + return; + if (!TransactionIdIsValid(snapshotConflictHorizon)) + return; + + /* + * Scan for a would-be-invalidated slot in the conflicting database. + * + * Skip slots that have not yet reached snapshot-builder consistency + * (effective_catalog_xmin is still InvalidTransactionId). An in-progress + * slot has not produced any output to a consumer, so invalidating it is + * harmless — the caller can retry. Pausing for such a slot would + * deadlock: DecodingContextFindStartpoint would be waiting for replay + * to advance, while replay would be waiting for the slot to be drained. + */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + Oid slot_db; + TransactionId slot_xmin; + TransactionId slot_effective_xmin; + bool active_logical; + + if (!s->in_use) + continue; + + SpinLockAcquire(&s->mutex); + slot_db = s->data.database; + slot_xmin = s->data.catalog_xmin; + slot_effective_xmin = s->effective_catalog_xmin; + /* + * Skip synced slots (managed by the slot-sync worker per + * sync_replication_slots). Writing their fields from the startup + * process would race with the slot-sync worker's own updates, and + * the operator-facing "drain or drop the slot" recipe in the + * errhint below cannot be applied to a synced slot (ALTER / + * DROP_REPLICATION_SLOT error on synced). + */ + active_logical = (s->data.invalidated == RS_INVAL_NONE && + slot_db != InvalidOid && + TransactionIdIsValid(slot_effective_xmin) && + !s->data.synced); + SpinLockRelease(&s->mutex); + + if (!active_logical) + continue; + if (slot_db != dboid) + continue; + if (!TransactionIdIsValid(slot_xmin)) + continue; + /* + * Use PrecedesOrEquals (not Precedes) to match the check in + * DetermineSlotInvalidationCause. Otherwise a slot whose + * catalog_xmin was just advanced to exactly conflict_horizon by + * a previous pause-and-advance cycle (our own resume code) will + * NOT trigger a pause here when the next prune record arrives + * with horizon == catalog_xmin, yet WILL still be invalidated + * by the fall-through InvalidateObsoleteReplicationSlots call. + */ + if (TransactionIdPrecedesOrEquals(slot_xmin, snapshotConflictHorizon)) + { + would_invalidate = true; + break; + } + } + LWLockRelease(ReplicationSlotControlLock); + + if (!would_invalidate) + return; + + ereport(LOG, + (errmsg("recovery paused: WAL redo at %X/%X would invalidate a logical replication slot", + LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))), + errdetail("snapshotConflictHorizon %u exceeds catalog_xmin of at least one active logical slot in database %u.", + snapshotConflictHorizon, dboid), + errhint("Drain, advance, or drop the slot, then execute pg_wal_replay_resume()."))); + + SetRecoveryPause(true); + + while (GetRecoveryPauseState() != RECOVERY_NOT_PAUSED) + { + ProcessStartupProcInterrupts(); + + /* + * If the operator gave up on the slot and triggered a promotion + * instead, bail out of the wait so the startup process can proceed + * with the promotion path. Must use CheckForStandbyTrigger (which + * actually consumes PROMOTE_SIGNAL_FILE), not PromoteIsTriggered + * (which only reads a flag populated by the former). Mirrors the + * same escape in recoveryPausesHere(). + */ + if (CheckForStandbyTrigger()) + { + ConditionVariableCancelSleep(); + return; + } + + /* + * Promote RECOVERY_PAUSE_REQUESTED to RECOVERY_PAUSED so that + * observers (pg_get_wal_replay_pause_state() / monitoring) see the + * pause as actually taken, not just requested. + */ + ConfirmRecoveryPaused(); + ConditionVariableTimedSleep(&XLogRecoveryCtl->recoveryNotPausedCV, + 1000, WAIT_EVENT_RECOVERY_PAUSE); + } + ConditionVariableCancelSleep(); + + /* + * Operator has resumed. If they drained slot(s) up to (or past) the LSN + * of the about-to-be-replayed conflict record, we trust that the consumer + * downstream has captured everything that needed the pre-conflict catalog + * snapshot. Advance those slots' catalog_xmin past the horizon so the + * subsequent InvalidateObsoleteReplicationSlots() fall-through is a + * no-op. Slots that the operator did NOT drain are left alone and get + * invalidated normally — that is the "I didn't act, just let the slot + * die" path. + * + * "Drained past the conflict LSN" is defined as: the slot's + * confirmed_flush_lsn >= the LSN at which replay has paused, which is + * the current replay position reported by GetXLogReplayRecPtr. + */ + { + XLogRecPtr conflict_lsn = GetXLogReplayRecPtr(NULL); + int j; + + LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE); + for (j = 0; j < max_replication_slots; j++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[j]; + bool advance; + + if (!s->in_use) + continue; + + SpinLockAcquire(&s->mutex); + /* + * Skip synced slots — same reason as in the pause-check scan. + * Writing their fields would race the slot-sync worker. + */ + advance = (s->data.invalidated == RS_INVAL_NONE && + s->data.database == dboid && + !s->data.synced && + s->data.confirmed_flush >= conflict_lsn && + ((TransactionIdIsValid(s->data.catalog_xmin) && + TransactionIdPrecedesOrEquals(s->data.catalog_xmin, + snapshotConflictHorizon)) || + (TransactionIdIsValid(s->data.xmin) && + TransactionIdPrecedesOrEquals(s->data.xmin, + snapshotConflictHorizon)))); + if (advance) + { + TransactionId new_xmin = snapshotConflictHorizon; + + TransactionIdAdvance(new_xmin); /* strictly > horizon */ + if (TransactionIdIsValid(s->data.catalog_xmin) && + TransactionIdPrecedesOrEquals(s->data.catalog_xmin, + snapshotConflictHorizon)) + { + s->data.catalog_xmin = new_xmin; + s->effective_catalog_xmin = new_xmin; + } + if (TransactionIdIsValid(s->data.xmin) && + TransactionIdPrecedesOrEquals(s->data.xmin, + snapshotConflictHorizon)) + { + s->data.xmin = new_xmin; + s->effective_xmin = new_xmin; + } + s->just_dirtied = true; + s->dirty = true; + } + SpinLockRelease(&s->mutex); + + if (advance) + ereport(LOG, + (errmsg("advanced catalog_xmin of logical slot \"%s\" past conflict horizon %u", + NameStr(s->data.name), snapshotConflictHorizon), + errdetail("Slot's confirmed_flush_lsn %X/%X reached the conflict record at %X/%X; operator drained before resuming.", + LSN_FORMAT_ARGS(s->data.confirmed_flush), + LSN_FORMAT_ARGS(conflict_lsn)))); + } + LWLockRelease(ReplicationSlotControlLock); + + /* + * NOTE: the advance above only marks the slot dirty; it is persisted + * to disk at the next restartpoint. If the standby crashes between + * here and the next restartpoint, the on-disk catalog_xmin is the + * pre-advance value, and on recovery restart we re-encounter the + * same conflict record, re-pause, and the operator re-drains. No + * data loss — the drain is idempotent for the same slot state — but + * an operator-visible hiccup. A future iteration could tighten this + * by calling SaveSlotToPath directly; SaveSlotToPath is currently + * static in slot.c and not trivially callable from the startup + * process (no MyReplicationSlot). + */ + } } /* diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index 83af594d4af4b..165e69ca90f31 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -2440,6 +2440,14 @@ max => 'INT_MAX', }, +{ name => 'recovery_pause_on_logical_slot_conflict', type => 'bool', + context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY', + short_desc => 'Pauses recovery instead of invalidating an active logical slot on catalog conflict.', + long_desc => 'When WAL replay on a standby is about to invalidate a logical replication slot because a catalog PRUNE_ON_ACCESS record has overtaken the slot\'s catalog_xmin, pause recovery instead. The operator can then drain or drop the slot and call pg_wal_replay_resume() to continue.', + variable => 'recovery_pause_on_logical_slot_conflict', + boot_val => 'false', +}, + { name => 'recovery_prefetch', type => 'enum', context => 'PGC_SIGHUP', group => 'WAL_RECOVERY', short_desc => 'Prefetch referenced blocks during recovery.', long_desc => 'Look ahead in the WAL to find references to uncached data.', diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index ac38cddaaf9a6..17b2bcc4df8b9 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -403,6 +403,8 @@ #wal_retrieve_retry_interval = 5s # time to wait before retrying to # retrieve WAL after a failed attempt #recovery_min_apply_delay = 0 # minimum delay for applying changes during recovery +#recovery_pause_on_logical_slot_conflict = off # pause recovery instead of invalidating + # a logical slot on catalog conflict #sync_replication_slots = off # enables slot synchronization on the physical standby from the primary # - Subscribers - diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h index ba7750dca0b45..58d578ce85373 100644 --- a/src/include/access/xlogrecovery.h +++ b/src/include/access/xlogrecovery.h @@ -129,6 +129,7 @@ extern PGDLLIMPORT XLogRecoveryCtlData *XLogRecoveryCtl; extern PGDLLIMPORT bool recoveryTargetInclusive; extern PGDLLIMPORT int recoveryTargetAction; extern PGDLLIMPORT int recovery_min_apply_delay; +extern PGDLLIMPORT bool recovery_pause_on_logical_slot_conflict; extern PGDLLIMPORT char *PrimaryConnInfo; extern PGDLLIMPORT char *PrimarySlotName; extern PGDLLIMPORT char *recoveryRestoreCommand; @@ -213,6 +214,8 @@ extern bool HotStandbyActive(void); extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI); extern RecoveryPauseState GetRecoveryPauseState(void); extern void SetRecoveryPause(bool recoveryPause); +extern void ConfirmRecoveryPaused(void); +extern bool CheckForStandbyTrigger(void); extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream); extern TimestampTz GetLatestXTime(void); extern TimestampTz GetCurrentChunkReplayStartTime(void); diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h index 8715c08e94f20..510d1d57e1c83 100644 --- a/src/include/storage/standby.h +++ b/src/include/storage/standby.h @@ -75,6 +75,8 @@ extern void ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHo extern void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHorizon, bool isCatalogRel, RelFileLocator locator); +extern void MaybePauseOnLogicalSlotConflict(Oid dboid, + TransactionId snapshotConflictHorizon); extern void ResolveRecoveryConflictWithTablespace(Oid tsid); extern void ResolveRecoveryConflictWithDatabase(Oid dbid); From ffd897ce30864f408c1cfbd002649b21bb83429b Mon Sep 17 00:00:00 2001 From: Nikolay Samokhvalov Date: Thu, 16 Apr 2026 19:09:10 +0000 Subject: [PATCH 2/5] Add TAP test for recovery_pause_on_logical_slot_conflict MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 10 assertions, ~30 wallclock seconds. Two-phase flow: Phase 1 sets up an archive-only standby from a clean basebackup + pg_log_standby_snapshot and creates logical slots on TWO standbys while the archive contains no catalog-prune records. One standby has the GUC on, the other off. Phase 2 then runs catalog- churning workload on the primary (transient tables + VACUUM on pg_class, pg_attribute, pg_type, pg_depend, pg_statistic) and waits for those segments to archive. When the standbys replay through those segments, the GUC-on one pauses; a Perl orchestrator drains the slot with pg_logical_slot_get_changes and calls pg_wal_replay_resume. The GUC-off baseline standby lets its slot invalidate — the upstream default behavior, unchanged. A third standby is created after Phase 2 archives (so its replay will pause quickly on first conflict record). The test then calls pg_promote(wait=>true, wait_seconds=>30) on the paused standby and asserts that promote returns true in under 10 seconds. Guards the CheckForStandbyTrigger() escape path — without that, pg_promote stalls for the full wait_seconds and returns false. Assertions: ok 1 - GUC is registered ok 2 - slot created cleanly in Phase 1 (GUC on, state: reserved) ok 3 - baseline slot created cleanly in Phase 1 (GUC off, reserved) ok 4 - slot survived catalog prune with GUC on (reserved) ok 5 - at least one pause event was handled ok 6 - at least 2000 decoded events ok 7 - baseline (GUC off): slot invalidates as expected (lost) ok 8 - promote-test standby reached paused state before promotion ok 9 - pg_promote returned true while standby was paused by GUC ok 10 - pg_promote completed in under 10s --- .../t/050_recovery_pause_on_slot_conflict.pl | 296 ++++++++++++++++++ 1 file changed, 296 insertions(+) create mode 100644 src/test/recovery/t/050_recovery_pause_on_slot_conflict.pl diff --git a/src/test/recovery/t/050_recovery_pause_on_slot_conflict.pl b/src/test/recovery/t/050_recovery_pause_on_slot_conflict.pl new file mode 100644 index 0000000000000..56f6126203980 --- /dev/null +++ b/src/test/recovery/t/050_recovery_pause_on_slot_conflict.pl @@ -0,0 +1,296 @@ +# Copyright (c) 2026, PostgreSQL Global Development Group + +# Exercise the recovery_pause_on_logical_slot_conflict GUC on a standby. +# +# Two-phase flow so the slot is fully consistent BEFORE any catalog- +# prune WAL record is replayed — otherwise slot creation would block +# inside DecodingContextFindStartpoint while replay pauses on the +# prune, and we would deadlock. (Fix #1, bbd5d4e13bc, narrows the +# window but doesn't remove it; keeping the two-phase flow explicit +# makes the test robust.) +# +# Phase 1 — bring up a consistent logical slot on the standby from a +# quiet primary archive: +# * take basebackup +# * pg_log_standby_snapshot() → snapbuild path (a) anchor +# * wait for the snapshot's segment to archive +# * start standby, let replay catch up, create slot (quick — no +# prune records in the archive yet). +# +# Phase 2 — churn the primary's catalog so the standby's replay +# eventually hits a catalog-prune record that would invalidate the +# slot: +# * run CREATE / DROP of transient tables (pg_class churn) +# * run ANALYZE x2 + VACUUM pg_statistic / pg_class (HOT prune on +# catalog relations in db=postgres) +# * wait for those segments to archive +# * orchestrator loop on the standby: when +# pg_get_wal_replay_pause_state() returns paused, drain the slot +# via pg_logical_slot_get_changes, call pg_wal_replay_resume, +# continue. +# +# Assertions: +# 1. The GUC is registered. +# 2. After the orchestrator finishes, the slot's wal_status is +# 'reserved' (NOT 'lost' / 'rows_removed'). +# 3. At least one pause event was handled. +# 4. At least 2000 decoded events came out of the slot. + +use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; +use Time::HiRes qw(usleep); + +my $node_primary = PostgreSQL::Test::Cluster->new('primary'); +$node_primary->init(allows_streaming => 'logical', has_archiving => 1); +$node_primary->append_conf('postgresql.conf', qq[ +wal_level = logical +archive_mode = on +archive_timeout = 1s +autovacuum = on +autovacuum_naptime = 5s +fsync = off +synchronous_commit = off +]); +$node_primary->start; + +# 1. GUC visibility +my $guc = $node_primary->safe_psql('postgres', + "SELECT COUNT(*) FROM pg_settings WHERE name = 'recovery_pause_on_logical_slot_conflict'"); +is($guc, '1', 'recovery_pause_on_logical_slot_conflict GUC is registered'); + +# Set up a workload table on primary +$node_primary->safe_psql('postgres', qq[ + CREATE TABLE events (id serial PRIMARY KEY, payload text); + ALTER TABLE events REPLICA IDENTITY FULL; + INSERT INTO events (payload) VALUES ('seed'); +]); + +# --- Phase 1: clean archive, slot creation --- # + +my $backup_name = 'backup1'; +$node_primary->backup($backup_name); + +# Quiet-moment RUNNING_XACTS in post-backup WAL — provides path (a) +# anchor for snapbuild. +$node_primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot();"); + +# Force the segment containing that anchor to archive so the standby +# can see it via restore_command. Switch TWICE: first switch closes the +# segment with the snapshot record; second switch gives snapbuild the +# forward WAL it needs to decide the slot is consistent. Without the +# second switch, DecodingContextFindStartpoint blocks on 'waiting for +# WAL to become available at seg N+1' — flaky slot creation. +my $phase1_seg = $node_primary->safe_psql('postgres', + "SELECT pg_walfile_name(pg_current_wal_lsn())"); +$node_primary->safe_psql('postgres', "SELECT pg_switch_wal();"); +$node_primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot();"); +$node_primary->safe_psql('postgres', "SELECT pg_switch_wal();"); +$node_primary->poll_query_until('postgres', qq[ + SELECT last_archived_wal IS NOT NULL + AND last_archived_wal >= '$phase1_seg' + FROM pg_stat_archiver +]) or die "Timed out waiting for phase-1 segment $phase1_seg to archive"; + +# Bring up BOTH standbys (GUC-on and GUC-off) while the archive still +# contains only the quiet-moment snapshot — no prune records yet. Slot +# creation reaches SNAPBUILD_CONSISTENT quickly on both. Later, when +# Phase 2 ships the prune records, the two standbys diverge: the +# GUC-on one pauses and drains; the GUC-off one invalidates. +my $node_standby = PostgreSQL::Test::Cluster->new('standby'); +$node_standby->init_from_backup($node_primary, $backup_name, + has_streaming => 0, has_restoring => 1); +$node_standby->append_conf('postgresql.conf', qq[ +hot_standby = on +recovery_pause_on_logical_slot_conflict = on +wal_level = logical +max_standby_archive_delay = -1 +max_standby_streaming_delay = -1 +]); +$node_standby->start; + +my $node_standby_off = PostgreSQL::Test::Cluster->new('standby_off'); +$node_standby_off->init_from_backup($node_primary, $backup_name, + has_streaming => 0, has_restoring => 1); +$node_standby_off->append_conf('postgresql.conf', qq[ +hot_standby = on +recovery_pause_on_logical_slot_conflict = off +wal_level = logical +max_standby_archive_delay = -1 +max_standby_streaming_delay = -1 +]); +$node_standby_off->start; + +$node_standby->poll_query_until('postgres', + "SELECT pg_last_wal_replay_lsn() IS NOT NULL", 't'); +$node_standby_off->poll_query_until('postgres', + "SELECT pg_last_wal_replay_lsn() IS NOT NULL", 't'); + +$node_standby->safe_psql('postgres', qq[ + SELECT pg_create_logical_replication_slot('t_slot', 'test_decoding'); +]); +$node_standby_off->safe_psql('postgres', qq[ + SELECT pg_create_logical_replication_slot('t_slot_off', 'test_decoding'); +]); + +my $slot_ready = $node_standby->safe_psql('postgres', qq[ + SELECT wal_status FROM pg_replication_slots WHERE slot_name = 't_slot' +]); +is($slot_ready, 'reserved', "slot created cleanly in Phase 1 (state: $slot_ready)"); + +my $off_slot_ready = $node_standby_off->safe_psql('postgres', qq[ + SELECT wal_status FROM pg_replication_slots WHERE slot_name = 't_slot_off' +]); +is($off_slot_ready, 'reserved', + "baseline slot created cleanly in Phase 1 (state: $off_slot_ready)"); + +# --- Phase 2: catalog churn on primary --- # + +# Transient tables exercise pg_class / pg_attribute / pg_type / pg_depend. +$node_primary->safe_psql('postgres', qq[ + INSERT INTO events (payload) + SELECT 'row-' || g FROM generate_series(1, 3000) g; +]); +for (my $i = 0; $i < 20; $i++) { + $node_primary->safe_psql('postgres', + "CREATE TABLE churn_$i (id int, payload text); DROP TABLE churn_$i;"); +} +# Two ANALYZE calls make first-generation pg_statistic rows dead by +# overwriting them; VACUUM then emits Heap2/PRUNE_ON_ACCESS. +$node_primary->safe_psql('postgres', qq[ + ANALYZE events; + ANALYZE events; + VACUUM pg_class; + VACUUM pg_attribute; + VACUUM pg_type; + VACUUM pg_depend; + VACUUM pg_statistic; +]); + +# Force and wait for the churn WAL to reach archive. +my $phase2_seg = $node_primary->safe_psql('postgres', + "SELECT pg_walfile_name(pg_current_wal_lsn())"); +$node_primary->safe_psql('postgres', "SELECT pg_switch_wal();"); +$node_primary->poll_query_until('postgres', qq[ + SELECT last_archived_wal IS NOT NULL + AND last_archived_wal >= '$phase2_seg' + FROM pg_stat_archiver +]) or die "Timed out waiting for phase-2 segment $phase2_seg to archive"; + +# Orchestrator loop: watches the standby, drains+resumes on pause. +my $total_drained = 0; +my $pauses_seen = 0; +my $last_replay = ''; +my $stall_ticks = 0; +my $deadline = time() + 60; +while (time() < $deadline) { + my $state = $node_standby->safe_psql('postgres', + "SELECT pg_get_wal_replay_pause_state()"); + my $replay = $node_standby->safe_psql('postgres', + "SELECT pg_last_wal_replay_lsn()"); + + if ($state eq 'paused' || $state eq 'pause requested') { + my $got = $node_standby->safe_psql('postgres', + "SELECT COUNT(*) FROM pg_logical_slot_get_changes('t_slot', NULL, NULL)"); + $total_drained += $got; + $pauses_seen++; + $node_standby->safe_psql('postgres', "SELECT pg_wal_replay_resume()"); + $stall_ticks = 0; + } elsif ($replay eq $last_replay) { + $stall_ticks++; + last if $stall_ticks > 10; + } else { + $stall_ticks = 0; + } + + $last_replay = $replay; + usleep(500_000); +} + +# Drain anything left +my $final = $node_standby->safe_psql('postgres', + "SELECT COUNT(*) FROM pg_logical_slot_get_changes('t_slot', NULL, NULL)"); +$total_drained += $final; + +my $slot_state = $node_standby->safe_psql('postgres', qq[ + SELECT wal_status || '|' || COALESCE(invalidation_reason, '') + FROM pg_replication_slots WHERE slot_name = 't_slot'; +]); +like($slot_state, qr/^reserved\|/, + "slot survived catalog prune with GUC on (state: $slot_state)"); + +cmp_ok($pauses_seen, '>=', 1, + "at least one pause event was handled ($pauses_seen seen)"); + +cmp_ok($total_drained, '>=', 2000, + "at least 2000 decoded events ($total_drained got)"); + +# Baseline assertion: the GUC-off standby, faced with the exact same +# Phase-2 archive, should invalidate its slot. This confirms the test +# setup actually triggers the conflict AND that GUC-off behavior is +# unchanged from upstream — if this ever starts passing with state +# "reserved", either the test stopped reproducing the trigger or the +# GUC-off path accidentally benefits from our patch. +my $off_state = 'reserved'; +for (my $i = 0; $i < 60; $i++) { + $off_state = $node_standby_off->safe_psql('postgres', qq[ + SELECT wal_status FROM pg_replication_slots WHERE slot_name = 't_slot_off'; + ]); + last if $off_state eq 'lost'; + usleep(500_000); +} + +is($off_state, 'lost', + "baseline (GUC off): slot invalidates as expected under catalog prune"); + +# Promote-during-pause: bring up a third standby, get it paused by the +# GUC, then call pg_promote() and assert promotion actually completes +# (rather than stalling until someone also runs pg_wal_replay_resume). +# Guards the CheckForStandbyTrigger() escape path in the wait loop. +my $node_standby_p = PostgreSQL::Test::Cluster->new('standby_promote'); +$node_standby_p->init_from_backup($node_primary, $backup_name, + has_streaming => 0, has_restoring => 1); +$node_standby_p->append_conf('postgresql.conf', qq[ +hot_standby = on +recovery_pause_on_logical_slot_conflict = on +wal_level = logical +max_standby_archive_delay = -1 +max_standby_streaming_delay = -1 +]); +$node_standby_p->start; +$node_standby_p->poll_query_until('postgres', + "SELECT pg_last_wal_replay_lsn() IS NOT NULL", 't'); +$node_standby_p->safe_psql('postgres', + "SELECT pg_create_logical_replication_slot('promote_slot', 'test_decoding')"); + +# Wait for replay to reach a pause (Phase-2 archive is already shipped +# so it will happen within a few seconds). +my $paused = 0; +for (my $i = 0; $i < 60; $i++) { + my $s = $node_standby_p->safe_psql('postgres', + "SELECT pg_get_wal_replay_pause_state()"); + if ($s eq 'paused') { $paused = 1; last; } + usleep(500_000); +} +ok($paused, "promote-test standby reached paused state before promotion"); + +# Call pg_promote with a short wait. Without the CheckForStandbyTrigger +# escape in the wait loop, this stalls for the full wait_seconds and +# returns false; with the fix, it returns true in ~1 second. +my $t0 = time(); +my $promoted = $node_standby_p->safe_psql('postgres', + "SELECT pg_promote(wait => true, wait_seconds => 30)"); +my $elapsed = time() - $t0; +is($promoted, 't', "pg_promote returned true while standby was paused by GUC"); +cmp_ok($elapsed, '<', 10, + "pg_promote completed in under 10s (actual: ${elapsed}s)"); + +$node_standby_p->stop; +$node_standby_off->stop; +$node_standby->stop; +$node_primary->stop; + +done_testing(); From cd2b7beeadbf638c252fc797044acc6329b419d8 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 22 Apr 2026 17:49:55 +0000 Subject: [PATCH 3/5] Refactor 050_recovery_pause_on_slot_conflict.pl for readability Extract four named subs so the top-level script reads as a sequence of phases rather than one long procedure. No behavior change: all 10 assertions are preserved verbatim, as are the load-bearing comments (two-phase rationale, double pg_switch_wal rationale, GUC-off baseline rationale, pg_promote escape-path rationale). Helpers extracted: * setup_primary_with_clean_archive * create_archive_standby * run_catalog_churn * drain_and_resume_loop * wait_for_replay_paused --- .../t/050_recovery_pause_on_slot_conflict.pl | 365 ++++++++++-------- 1 file changed, 199 insertions(+), 166 deletions(-) diff --git a/src/test/recovery/t/050_recovery_pause_on_slot_conflict.pl b/src/test/recovery/t/050_recovery_pause_on_slot_conflict.pl index 56f6126203980..d1a03475e95b2 100644 --- a/src/test/recovery/t/050_recovery_pause_on_slot_conflict.pl +++ b/src/test/recovery/t/050_recovery_pause_on_slot_conflict.pl @@ -28,13 +28,6 @@ # pg_get_wal_replay_pause_state() returns paused, drain the slot # via pg_logical_slot_get_changes, call pg_wal_replay_resume, # continue. -# -# Assertions: -# 1. The GUC is registered. -# 2. After the orchestrator finishes, the slot's wal_status is -# 'reserved' (NOT 'lost' / 'rows_removed'). -# 3. At least one pause event was handled. -# 4. At least 2000 decoded events came out of the slot. use strict; use warnings FATAL => 'all'; @@ -44,9 +37,18 @@ use Test::More; use Time::HiRes qw(usleep); -my $node_primary = PostgreSQL::Test::Cluster->new('primary'); -$node_primary->init(allows_streaming => 'logical', has_archiving => 1); -$node_primary->append_conf('postgresql.conf', qq[ +# --------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------- + +# Build the primary, seed the workload table, take a basebackup, and +# produce a "clean" archive: one that contains a standby snapshot but +# no catalog-prune WAL yet. Returns ($node_primary, $backup_name). +sub setup_primary_with_clean_archive +{ + my $node_primary = PostgreSQL::Test::Cluster->new('primary'); + $node_primary->init(allows_streaming => 'logical', has_archiving => 1); + $node_primary->append_conf('postgresql.conf', qq[ wal_level = logical archive_mode = on archive_timeout = 1s @@ -55,79 +57,187 @@ fsync = off synchronous_commit = off ]); -$node_primary->start; + $node_primary->start; -# 1. GUC visibility -my $guc = $node_primary->safe_psql('postgres', - "SELECT COUNT(*) FROM pg_settings WHERE name = 'recovery_pause_on_logical_slot_conflict'"); -is($guc, '1', 'recovery_pause_on_logical_slot_conflict GUC is registered'); + $node_primary->safe_psql('postgres', qq[ + CREATE TABLE events (id serial PRIMARY KEY, payload text); + ALTER TABLE events REPLICA IDENTITY FULL; + INSERT INTO events (payload) VALUES ('seed'); + ]); -# Set up a workload table on primary -$node_primary->safe_psql('postgres', qq[ - CREATE TABLE events (id serial PRIMARY KEY, payload text); - ALTER TABLE events REPLICA IDENTITY FULL; - INSERT INTO events (payload) VALUES ('seed'); -]); + my $backup_name = 'backup1'; + $node_primary->backup($backup_name); + + # Quiet-moment RUNNING_XACTS in post-backup WAL — provides path (a) + # anchor for snapbuild. + $node_primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot();"); + + # Force the segment containing that anchor to archive so the standby + # can see it via restore_command. Switch TWICE: first switch closes + # the segment with the snapshot record; second switch gives + # snapbuild the forward WAL it needs to decide the slot is + # consistent. Without the second switch, + # DecodingContextFindStartpoint blocks on 'waiting for WAL to + # become available at seg N+1' — flaky slot creation. + my $phase1_seg = $node_primary->safe_psql('postgres', + "SELECT pg_walfile_name(pg_current_wal_lsn())"); + $node_primary->safe_psql('postgres', "SELECT pg_switch_wal();"); + $node_primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot();"); + $node_primary->safe_psql('postgres', "SELECT pg_switch_wal();"); + $node_primary->poll_query_until('postgres', qq[ + SELECT last_archived_wal IS NOT NULL + AND last_archived_wal >= '$phase1_seg' + FROM pg_stat_archiver + ]) or die "Timed out waiting for phase-1 segment $phase1_seg to archive"; + + return ($node_primary, $backup_name); +} -# --- Phase 1: clean archive, slot creation --- # - -my $backup_name = 'backup1'; -$node_primary->backup($backup_name); - -# Quiet-moment RUNNING_XACTS in post-backup WAL — provides path (a) -# anchor for snapbuild. -$node_primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot();"); - -# Force the segment containing that anchor to archive so the standby -# can see it via restore_command. Switch TWICE: first switch closes the -# segment with the snapshot record; second switch gives snapbuild the -# forward WAL it needs to decide the slot is consistent. Without the -# second switch, DecodingContextFindStartpoint blocks on 'waiting for -# WAL to become available at seg N+1' — flaky slot creation. -my $phase1_seg = $node_primary->safe_psql('postgres', - "SELECT pg_walfile_name(pg_current_wal_lsn())"); -$node_primary->safe_psql('postgres', "SELECT pg_switch_wal();"); -$node_primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot();"); -$node_primary->safe_psql('postgres', "SELECT pg_switch_wal();"); -$node_primary->poll_query_until('postgres', qq[ - SELECT last_archived_wal IS NOT NULL - AND last_archived_wal >= '$phase1_seg' - FROM pg_stat_archiver -]) or die "Timed out waiting for phase-1 segment $phase1_seg to archive"; - -# Bring up BOTH standbys (GUC-on and GUC-off) while the archive still -# contains only the quiet-moment snapshot — no prune records yet. Slot -# creation reaches SNAPBUILD_CONSISTENT quickly on both. Later, when -# Phase 2 ships the prune records, the two standbys diverge: the -# GUC-on one pauses and drains; the GUC-off one invalidates. -my $node_standby = PostgreSQL::Test::Cluster->new('standby'); -$node_standby->init_from_backup($node_primary, $backup_name, - has_streaming => 0, has_restoring => 1); -$node_standby->append_conf('postgresql.conf', qq[ +# Bring up an archive-only standby from $backup_name on $node_primary +# with recovery_pause_on_logical_slot_conflict set to $guc_value. Waits +# for replay to catch up, then returns the node. +sub create_archive_standby +{ + my ($node_primary, $backup_name, $name, $guc_value) = @_; + + my $standby = PostgreSQL::Test::Cluster->new($name); + $standby->init_from_backup($node_primary, $backup_name, + has_streaming => 0, has_restoring => 1); + $standby->append_conf('postgresql.conf', qq[ hot_standby = on -recovery_pause_on_logical_slot_conflict = on +recovery_pause_on_logical_slot_conflict = $guc_value wal_level = logical max_standby_archive_delay = -1 max_standby_streaming_delay = -1 ]); -$node_standby->start; + $standby->start; + $standby->poll_query_until('postgres', + "SELECT pg_last_wal_replay_lsn() IS NOT NULL", 't'); -my $node_standby_off = PostgreSQL::Test::Cluster->new('standby_off'); -$node_standby_off->init_from_backup($node_primary, $backup_name, - has_streaming => 0, has_restoring => 1); -$node_standby_off->append_conf('postgresql.conf', qq[ -hot_standby = on -recovery_pause_on_logical_slot_conflict = off -wal_level = logical -max_standby_archive_delay = -1 -max_standby_streaming_delay = -1 -]); -$node_standby_off->start; + return $standby; +} + +# Churn the primary's catalog enough to emit catalog-prune WAL records, +# then force and wait for those records to reach the archive. +sub run_catalog_churn +{ + my ($node_primary) = @_; + + # Transient tables exercise pg_class / pg_attribute / pg_type / pg_depend. + $node_primary->safe_psql('postgres', qq[ + INSERT INTO events (payload) + SELECT 'row-' || g FROM generate_series(1, 3000) g; + ]); + for (my $i = 0; $i < 20; $i++) { + $node_primary->safe_psql('postgres', + "CREATE TABLE churn_$i (id int, payload text); DROP TABLE churn_$i;"); + } + # Two ANALYZE calls make first-generation pg_statistic rows dead by + # overwriting them; VACUUM then emits Heap2/PRUNE_ON_ACCESS. + $node_primary->safe_psql('postgres', qq[ + ANALYZE events; + ANALYZE events; + VACUUM pg_class; + VACUUM pg_attribute; + VACUUM pg_type; + VACUUM pg_depend; + VACUUM pg_statistic; + ]); + + my $phase2_seg = $node_primary->safe_psql('postgres', + "SELECT pg_walfile_name(pg_current_wal_lsn())"); + $node_primary->safe_psql('postgres', "SELECT pg_switch_wal();"); + $node_primary->poll_query_until('postgres', qq[ + SELECT last_archived_wal IS NOT NULL + AND last_archived_wal >= '$phase2_seg' + FROM pg_stat_archiver + ]) or die "Timed out waiting for phase-2 segment $phase2_seg to archive"; + + return; +} + +# Orchestrator loop for the GUC-on standby: when replay pauses, drain +# the slot via pg_logical_slot_get_changes and call +# pg_wal_replay_resume(). Exits when replay stops advancing or when +# $deadline_seconds have passed. Returns ($pauses_seen, $total_drained) +# and includes a final drain of anything left on the slot. +sub drain_and_resume_loop +{ + my ($standby, $slot_name, $deadline_seconds) = @_; + + my $total_drained = 0; + my $pauses_seen = 0; + my $last_replay = ''; + my $stall_ticks = 0; + my $deadline = time() + $deadline_seconds; + + while (time() < $deadline) { + my $state = $standby->safe_psql('postgres', + "SELECT pg_get_wal_replay_pause_state()"); + my $replay = $standby->safe_psql('postgres', + "SELECT pg_last_wal_replay_lsn()"); + + if ($state eq 'paused' || $state eq 'pause requested') { + my $got = $standby->safe_psql('postgres', + "SELECT COUNT(*) FROM pg_logical_slot_get_changes('$slot_name', NULL, NULL)"); + $total_drained += $got; + $pauses_seen++; + $standby->safe_psql('postgres', "SELECT pg_wal_replay_resume()"); + $stall_ticks = 0; + } elsif ($replay eq $last_replay) { + $stall_ticks++; + last if $stall_ticks > 10; + } else { + $stall_ticks = 0; + } + + $last_replay = $replay; + usleep(500_000); + } + + my $final = $standby->safe_psql('postgres', + "SELECT COUNT(*) FROM pg_logical_slot_get_changes('$slot_name', NULL, NULL)"); + $total_drained += $final; + + return ($pauses_seen, $total_drained); +} -$node_standby->poll_query_until('postgres', - "SELECT pg_last_wal_replay_lsn() IS NOT NULL", 't'); -$node_standby_off->poll_query_until('postgres', - "SELECT pg_last_wal_replay_lsn() IS NOT NULL", 't'); +# Poll until $standby reports replay as paused, up to ~30 seconds. +# Returns 1 on success, 0 on timeout. +sub wait_for_replay_paused +{ + my ($standby) = @_; + + for (my $i = 0; $i < 60; $i++) { + my $s = $standby->safe_psql('postgres', + "SELECT pg_get_wal_replay_pause_state()"); + return 1 if $s eq 'paused'; + usleep(500_000); + } + return 0; +} + +# --------------------------------------------------------------------- +# Main script +# --------------------------------------------------------------------- + +# 1. GUC visibility. +my ($node_primary, $backup_name) = setup_primary_with_clean_archive(); + +my $guc = $node_primary->safe_psql('postgres', + "SELECT COUNT(*) FROM pg_settings WHERE name = 'recovery_pause_on_logical_slot_conflict'"); +is($guc, '1', 'recovery_pause_on_logical_slot_conflict GUC is registered'); + +# 2. Phase 1: bring up BOTH standbys (GUC-on and GUC-off) while the +# archive still contains only the quiet-moment snapshot — no prune +# records yet. Slot creation reaches SNAPBUILD_CONSISTENT quickly on +# both. Later, when Phase 2 ships the prune records, the two standbys +# diverge: the GUC-on one pauses and drains; the GUC-off one +# invalidates. +my $node_standby = create_archive_standby($node_primary, $backup_name, + 'standby', 'on'); +my $node_standby_off = create_archive_standby($node_primary, $backup_name, + 'standby_off', 'off'); $node_standby->safe_psql('postgres', qq[ SELECT pg_create_logical_replication_slot('t_slot', 'test_decoding'); @@ -147,73 +257,12 @@ is($off_slot_ready, 'reserved', "baseline slot created cleanly in Phase 1 (state: $off_slot_ready)"); -# --- Phase 2: catalog churn on primary --- # - -# Transient tables exercise pg_class / pg_attribute / pg_type / pg_depend. -$node_primary->safe_psql('postgres', qq[ - INSERT INTO events (payload) - SELECT 'row-' || g FROM generate_series(1, 3000) g; -]); -for (my $i = 0; $i < 20; $i++) { - $node_primary->safe_psql('postgres', - "CREATE TABLE churn_$i (id int, payload text); DROP TABLE churn_$i;"); -} -# Two ANALYZE calls make first-generation pg_statistic rows dead by -# overwriting them; VACUUM then emits Heap2/PRUNE_ON_ACCESS. -$node_primary->safe_psql('postgres', qq[ - ANALYZE events; - ANALYZE events; - VACUUM pg_class; - VACUUM pg_attribute; - VACUUM pg_type; - VACUUM pg_depend; - VACUUM pg_statistic; -]); - -# Force and wait for the churn WAL to reach archive. -my $phase2_seg = $node_primary->safe_psql('postgres', - "SELECT pg_walfile_name(pg_current_wal_lsn())"); -$node_primary->safe_psql('postgres', "SELECT pg_switch_wal();"); -$node_primary->poll_query_until('postgres', qq[ - SELECT last_archived_wal IS NOT NULL - AND last_archived_wal >= '$phase2_seg' - FROM pg_stat_archiver -]) or die "Timed out waiting for phase-2 segment $phase2_seg to archive"; - -# Orchestrator loop: watches the standby, drains+resumes on pause. -my $total_drained = 0; -my $pauses_seen = 0; -my $last_replay = ''; -my $stall_ticks = 0; -my $deadline = time() + 60; -while (time() < $deadline) { - my $state = $node_standby->safe_psql('postgres', - "SELECT pg_get_wal_replay_pause_state()"); - my $replay = $node_standby->safe_psql('postgres', - "SELECT pg_last_wal_replay_lsn()"); - - if ($state eq 'paused' || $state eq 'pause requested') { - my $got = $node_standby->safe_psql('postgres', - "SELECT COUNT(*) FROM pg_logical_slot_get_changes('t_slot', NULL, NULL)"); - $total_drained += $got; - $pauses_seen++; - $node_standby->safe_psql('postgres', "SELECT pg_wal_replay_resume()"); - $stall_ticks = 0; - } elsif ($replay eq $last_replay) { - $stall_ticks++; - last if $stall_ticks > 10; - } else { - $stall_ticks = 0; - } +# 3. Phase 2: catalog churn on primary, then wait for archive. +run_catalog_churn($node_primary); - $last_replay = $replay; - usleep(500_000); -} - -# Drain anything left -my $final = $node_standby->safe_psql('postgres', - "SELECT COUNT(*) FROM pg_logical_slot_get_changes('t_slot', NULL, NULL)"); -$total_drained += $final; +# 4. Orchestrator loop on the GUC-on standby. +my ($pauses_seen, $total_drained) = + drain_and_resume_loop($node_standby, 't_slot', 60); my $slot_state = $node_standby->safe_psql('postgres', qq[ SELECT wal_status || '|' || COALESCE(invalidation_reason, '') @@ -228,7 +277,7 @@ cmp_ok($total_drained, '>=', 2000, "at least 2000 decoded events ($total_drained got)"); -# Baseline assertion: the GUC-off standby, faced with the exact same +# 5. Baseline assertion: the GUC-off standby, faced with the exact same # Phase-2 archive, should invalidate its slot. This confirms the test # setup actually triggers the conflict AND that GUC-off behavior is # unchanged from upstream — if this ever starts passing with state @@ -246,35 +295,19 @@ is($off_state, 'lost', "baseline (GUC off): slot invalidates as expected under catalog prune"); -# Promote-during-pause: bring up a third standby, get it paused by the -# GUC, then call pg_promote() and assert promotion actually completes -# (rather than stalling until someone also runs pg_wal_replay_resume). -# Guards the CheckForStandbyTrigger() escape path in the wait loop. -my $node_standby_p = PostgreSQL::Test::Cluster->new('standby_promote'); -$node_standby_p->init_from_backup($node_primary, $backup_name, - has_streaming => 0, has_restoring => 1); -$node_standby_p->append_conf('postgresql.conf', qq[ -hot_standby = on -recovery_pause_on_logical_slot_conflict = on -wal_level = logical -max_standby_archive_delay = -1 -max_standby_streaming_delay = -1 -]); -$node_standby_p->start; -$node_standby_p->poll_query_until('postgres', - "SELECT pg_last_wal_replay_lsn() IS NOT NULL", 't'); +# 6. Promote-during-pause: bring up a third standby, get it paused by +# the GUC, then call pg_promote() and assert promotion actually +# completes (rather than stalling until someone also runs +# pg_wal_replay_resume). Guards the CheckForStandbyTrigger() escape +# path in the wait loop. +my $node_standby_p = create_archive_standby($node_primary, $backup_name, + 'standby_promote', 'on'); $node_standby_p->safe_psql('postgres', "SELECT pg_create_logical_replication_slot('promote_slot', 'test_decoding')"); -# Wait for replay to reach a pause (Phase-2 archive is already shipped -# so it will happen within a few seconds). -my $paused = 0; -for (my $i = 0; $i < 60; $i++) { - my $s = $node_standby_p->safe_psql('postgres', - "SELECT pg_get_wal_replay_pause_state()"); - if ($s eq 'paused') { $paused = 1; last; } - usleep(500_000); -} +# Phase-2 archive is already shipped so a pause will happen within a +# few seconds. +my $paused = wait_for_replay_paused($node_standby_p); ok($paused, "promote-test standby reached paused state before promotion"); # Call pg_promote with a short wait. Without the CheckForStandbyTrigger From 39adedd1b39bf31b6ecbd1ac2ee6caa1e68e1c7a Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 22 Apr 2026 18:05:46 +0000 Subject: [PATCH 4/5] Auto-resume recovery once the logical slot conflict is resolved MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous behavior under recovery_pause_on_logical_slot_conflict required the operator to both drain (or drop / advance) the slot AND call pg_wal_replay_resume() to continue — two steps, even though the first step is the one that matters semantically. That split also meant the feature couldn't underpin a continuous-CDC service without external orchestration to issue the resume. Lift the scan predicate ("does any slot in `dboid` still block this conflict?") out of the initial check into a helper AnySlotStillBlocksConflict(). Call it again every 1s inside the existing wait loop. When it returns false, flip the pause state to NOT_PAUSED and let the loop exit; the existing post-wait advance then bumps catalog_xmin past the horizon on drained slots so the fall-through InvalidateObsoleteReplicationSlots() is a no-op. "No longer blocking" covers every unblock path, not just drain: * drained past the pause LSN (confirmed_flush >= captured conflict_lsn) — the main case * slot dropped (pg_drop_replication_slot) — removed from the scan * slot advanced (pg_replication_slot_advance) — catalog_xmin moves past the horizon * slot invalidated for another reason (e.g. RS_INVAL_WAL_REMOVED from max_slot_wal_keep_size, applied by the checkpointer, which runs even while the startup process is asleep in our wait loop) — data.invalidated != RS_INVAL_NONE, scan skips it Manual pg_wal_replay_resume() still works as the "give up on this slot and let it invalidate" escape hatch, and CheckForStandbyTrigger still breaks the loop for pg_promote(). Capture conflict_lsn once at pause time and reuse it for both the in-wait predicate and the post-wait advance, replacing the redundant second GetXLogReplayRecPtr() call. GUC long_desc, postgresql.conf.sample comment, and the xlogrecovery.c variable-decl comment updated to describe auto-resume. --- src/backend/access/transam/xlogrecovery.c | 5 +- src/backend/storage/ipc/standby.c | 181 +++++++++++------- src/backend/utils/misc/guc_parameters.dat | 2 +- src/backend/utils/misc/postgresql.conf.sample | 4 +- 4 files changed, 115 insertions(+), 77 deletions(-) diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 0452e0d33d52d..3ad968a86790e 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -100,7 +100,10 @@ int recovery_min_apply_delay = 0; * If true, when WAL replay on a standby is about to invalidate an otherwise- * active logical replication slot because a catalog PRUNE_ON_ACCESS record's * snapshotConflictHorizon has overtaken the slot's catalog_xmin, pause replay - * instead and give an operator a chance to drain (or drop) the slot. + * instead. Replay auto-resumes once the consumer has drained the slot past + * the pause point (or the slot is dropped, advanced, or otherwise no longer + * blocking); pg_wal_replay_resume() also forces continuation. See + * MaybePauseOnLogicalSlotConflict() in standby.c. * * Motivated by blueprints/LOGICAL_DECODING_ARCHIVED_WALS.md §4.2.3 / US-4: * an archive-only logical-decoding standby cannot feed hot_standby_feedback diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 62f743cc7eeb1..521d24fb74d42 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -514,41 +514,37 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, } /* - * If recovery_pause_on_logical_slot_conflict is enabled, and replay is about - * to apply a catalog PRUNE_ON_ACCESS record whose snapshotConflictHorizon - * would cause the invalidation of at least one non-invalidated logical slot - * in the same database, request a recovery pause and wait on the recovery - * pause condition variable until an operator resumes. + * Returns true if at least one non-synced logical slot in `dboid` still + * blocks replay past snapshotConflictHorizon. * - * On resume the caller re-falls through to InvalidateObsoleteReplicationSlots: - * if the operator has drained / dropped / advanced the slot, invalidation is - * a no-op; if they chose to resume without acting, the slot is invalidated - * as usual. This matches the recovery_target_action=pause precedent. + * "Blocks" means: the slot is in use, not invalidated, snapbuild-consistent + * (effective_catalog_xmin is valid — skipping in-progress slots avoids a + * deadlock with DecodingContextFindStartpoint), and its catalog_xmin + * precedes-or-equals the horizon. * - * Only invoked from ResolveRecoveryConflictWithSnapshot(), before any buffer - * locks are taken, so pausing here does not deadlock with anything. + * Use PrecedesOrEquals (not Precedes) to match DetermineSlotInvalidationCause. + * Otherwise a slot whose catalog_xmin was just advanced to exactly horizon by + * a previous pause-and-advance cycle fails to re-pause on the next prune + * record with the same horizon, yet would still be invalidated by the + * fall-through InvalidateObsoleteReplicationSlots call. + * + * Synced slots are skipped: writing their fields from the startup process + * would race the slot-sync worker, and ALTER / DROP_REPLICATION_SLOT errors + * out on a synced slot so the operator-facing recipe does not apply. + * + * When conflict_lsn is valid (in-wait auto-resume check), slots whose + * confirmed_flush_lsn has reached conflict_lsn are treated as not blocking: + * the consumer has caught up to the pause point and the post-wait advance + * code will bump their catalog_xmin past the horizon. Pass InvalidXLogRecPtr + * for the initial pause-or-not decision (we don't yet have a pause point). */ -void -MaybePauseOnLogicalSlotConflict(Oid dboid, TransactionId snapshotConflictHorizon) +static bool +AnySlotStillBlocksConflict(Oid dboid, TransactionId snapshotConflictHorizon, + XLogRecPtr conflict_lsn) { int i; - bool would_invalidate = false; - - if (!recovery_pause_on_logical_slot_conflict) - return; - if (!TransactionIdIsValid(snapshotConflictHorizon)) - return; + bool blocking = false; - /* - * Scan for a would-be-invalidated slot in the conflicting database. - * - * Skip slots that have not yet reached snapshot-builder consistency - * (effective_catalog_xmin is still InvalidTransactionId). An in-progress - * slot has not produced any output to a consumer, so invalidating it is - * harmless — the caller can retry. Pausing for such a slot would - * deadlock: DecodingContextFindStartpoint would be waiting for replay - * to advance, while replay would be waiting for the slot to be drained. - */ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (i = 0; i < max_replication_slots; i++) { @@ -556,7 +552,8 @@ MaybePauseOnLogicalSlotConflict(Oid dboid, TransactionId snapshotConflictHorizon Oid slot_db; TransactionId slot_xmin; TransactionId slot_effective_xmin; - bool active_logical; + XLogRecPtr slot_confirmed; + bool is_candidate; if (!s->in_use) continue; @@ -565,52 +562,80 @@ MaybePauseOnLogicalSlotConflict(Oid dboid, TransactionId snapshotConflictHorizon slot_db = s->data.database; slot_xmin = s->data.catalog_xmin; slot_effective_xmin = s->effective_catalog_xmin; - /* - * Skip synced slots (managed by the slot-sync worker per - * sync_replication_slots). Writing their fields from the startup - * process would race with the slot-sync worker's own updates, and - * the operator-facing "drain or drop the slot" recipe in the - * errhint below cannot be applied to a synced slot (ALTER / - * DROP_REPLICATION_SLOT error on synced). - */ - active_logical = (s->data.invalidated == RS_INVAL_NONE && - slot_db != InvalidOid && - TransactionIdIsValid(slot_effective_xmin) && - !s->data.synced); + slot_confirmed = s->data.confirmed_flush; + is_candidate = (s->data.invalidated == RS_INVAL_NONE && + slot_db != InvalidOid && + TransactionIdIsValid(slot_effective_xmin) && + !s->data.synced); SpinLockRelease(&s->mutex); - if (!active_logical) + if (!is_candidate) continue; if (slot_db != dboid) continue; if (!TransactionIdIsValid(slot_xmin)) continue; - /* - * Use PrecedesOrEquals (not Precedes) to match the check in - * DetermineSlotInvalidationCause. Otherwise a slot whose - * catalog_xmin was just advanced to exactly conflict_horizon by - * a previous pause-and-advance cycle (our own resume code) will - * NOT trigger a pause here when the next prune record arrives - * with horizon == catalog_xmin, yet WILL still be invalidated - * by the fall-through InvalidateObsoleteReplicationSlots call. - */ - if (TransactionIdPrecedesOrEquals(slot_xmin, snapshotConflictHorizon)) - { - would_invalidate = true; - break; - } + if (!TransactionIdPrecedesOrEquals(slot_xmin, snapshotConflictHorizon)) + continue; + if (conflict_lsn != InvalidXLogRecPtr && + slot_confirmed >= conflict_lsn) + continue; + + blocking = true; + break; } LWLockRelease(ReplicationSlotControlLock); - if (!would_invalidate) + return blocking; +} + +/* + * If recovery_pause_on_logical_slot_conflict is enabled, and replay is about + * to apply a catalog PRUNE_ON_ACCESS record whose snapshotConflictHorizon + * would cause the invalidation of at least one non-invalidated logical slot + * in the same database, request a recovery pause and wait until the conflict + * is resolved. + * + * The wait exits in any of: + * - Auto-resume: a periodic re-scan finds no slot still blocking. Any of + * draining past the pause LSN, dropping the slot, pg_replication_slot_ + * advance(), or out-of-band invalidation (e.g. max_slot_wal_keep_size + * applied by the checkpointer, which runs even while startup is paused + * here) will satisfy this. The post-wait advance then bumps catalog_xmin + * on drained slots so the fall-through InvalidateObsoleteReplicationSlots() + * is a no-op. + * - Manual resume: pg_wal_replay_resume() flips the state to NOT_PAUSED. + * Any slot still blocking at that point is invalidated by the + * fall-through — the "give up on this slot" escape hatch. + * - Promote: CheckForStandbyTrigger() consumes PROMOTE_SIGNAL_FILE and we + * return early so the startup process can finish promotion. + * + * Only invoked from ResolveRecoveryConflictWithSnapshot(), before any buffer + * locks are taken, so pausing here does not deadlock with anything. + */ +void +MaybePauseOnLogicalSlotConflict(Oid dboid, TransactionId snapshotConflictHorizon) +{ + XLogRecPtr conflict_lsn; + + if (!recovery_pause_on_logical_slot_conflict) + return; + if (!TransactionIdIsValid(snapshotConflictHorizon)) return; + if (!AnySlotStillBlocksConflict(dboid, snapshotConflictHorizon, + InvalidXLogRecPtr)) + return; + + conflict_lsn = GetXLogReplayRecPtr(NULL); + ereport(LOG, (errmsg("recovery paused: WAL redo at %X/%X would invalidate a logical replication slot", - LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))), + LSN_FORMAT_ARGS(conflict_lsn)), errdetail("snapshotConflictHorizon %u exceeds catalog_xmin of at least one active logical slot in database %u.", snapshotConflictHorizon, dboid), - errhint("Drain, advance, or drop the slot, then execute pg_wal_replay_resume()."))); + errhint("Recovery will resume automatically once the slot is drained past %X/%X, dropped, advanced, or invalidated for another reason; pg_wal_replay_resume() forces continuation (invalidating any remaining blocking slot).", + LSN_FORMAT_ARGS(conflict_lsn)))); SetRecoveryPause(true); @@ -632,6 +657,21 @@ MaybePauseOnLogicalSlotConflict(Oid dboid, TransactionId snapshotConflictHorizon return; } + /* + * Auto-resume: if nothing is still blocking this conflict, clear + * the pause and let the loop condition exit. The post-wait advance + * will bump catalog_xmin on any slot that drained past conflict_lsn + * so the fall-through InvalidateObsoleteReplicationSlots() is a + * no-op. Slots invalidated out of band (dropped, WAL-removed, + * etc.) are simply not in the scan anymore. + */ + if (!AnySlotStillBlocksConflict(dboid, snapshotConflictHorizon, + conflict_lsn)) + { + SetRecoveryPause(false); + break; + } + /* * Promote RECOVERY_PAUSE_REQUESTED to RECOVERY_PAUSED so that * observers (pg_get_wal_replay_pause_state() / monitoring) see the @@ -644,21 +684,14 @@ MaybePauseOnLogicalSlotConflict(Oid dboid, TransactionId snapshotConflictHorizon ConditionVariableCancelSleep(); /* - * Operator has resumed. If they drained slot(s) up to (or past) the LSN - * of the about-to-be-replayed conflict record, we trust that the consumer - * downstream has captured everything that needed the pre-conflict catalog - * snapshot. Advance those slots' catalog_xmin past the horizon so the - * subsequent InvalidateObsoleteReplicationSlots() fall-through is a - * no-op. Slots that the operator did NOT drain are left alone and get - * invalidated normally — that is the "I didn't act, just let the slot - * die" path. - * - * "Drained past the conflict LSN" is defined as: the slot's - * confirmed_flush_lsn >= the LSN at which replay has paused, which is - * the current replay position reported by GetXLogReplayRecPtr. + * Wait is over. For any slot whose consumer drained up to (or past) + * conflict_lsn, advance catalog_xmin past the horizon so the subsequent + * InvalidateObsoleteReplicationSlots() fall-through is a no-op. Slots + * that did not drain are left alone and get invalidated normally — the + * "I didn't act, just let the slot die" path that runs when an operator + * manually resumed without draining. */ { - XLogRecPtr conflict_lsn = GetXLogReplayRecPtr(NULL); int j; LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE); @@ -713,7 +746,7 @@ MaybePauseOnLogicalSlotConflict(Oid dboid, TransactionId snapshotConflictHorizon ereport(LOG, (errmsg("advanced catalog_xmin of logical slot \"%s\" past conflict horizon %u", NameStr(s->data.name), snapshotConflictHorizon), - errdetail("Slot's confirmed_flush_lsn %X/%X reached the conflict record at %X/%X; operator drained before resuming.", + errdetail("Slot's confirmed_flush_lsn %X/%X reached the conflict record at %X/%X; consumer drained past the pause point.", LSN_FORMAT_ARGS(s->data.confirmed_flush), LSN_FORMAT_ARGS(conflict_lsn)))); } diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index 165e69ca90f31..ac7c355c22c1f 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -2443,7 +2443,7 @@ { name => 'recovery_pause_on_logical_slot_conflict', type => 'bool', context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY', short_desc => 'Pauses recovery instead of invalidating an active logical slot on catalog conflict.', - long_desc => 'When WAL replay on a standby is about to invalidate a logical replication slot because a catalog PRUNE_ON_ACCESS record has overtaken the slot\'s catalog_xmin, pause recovery instead. The operator can then drain or drop the slot and call pg_wal_replay_resume() to continue.', + long_desc => 'When WAL replay on a standby is about to invalidate a logical replication slot because a catalog PRUNE_ON_ACCESS record has overtaken the slot\'s catalog_xmin, pause recovery instead. Recovery resumes automatically once the slot has been drained past the pause point, dropped, advanced, or invalidated for another reason (e.g. max_slot_wal_keep_size). pg_wal_replay_resume() also forces continuation, invalidating any remaining blocking slot.', variable => 'recovery_pause_on_logical_slot_conflict', boot_val => 'false', }, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 17b2bcc4df8b9..414fed447cfe2 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -404,7 +404,9 @@ # retrieve WAL after a failed attempt #recovery_min_apply_delay = 0 # minimum delay for applying changes during recovery #recovery_pause_on_logical_slot_conflict = off # pause recovery instead of invalidating - # a logical slot on catalog conflict + # a logical slot on catalog conflict; + # auto-resumes once the slot is drained, + # dropped, or otherwise unblocks #sync_replication_slots = off # enables slot synchronization on the physical standby from the primary # - Subscribers - From b2b7464fe9a694c8e5af229f1c937737d9469d28 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 22 Apr 2026 18:19:33 +0000 Subject: [PATCH 5/5] Add draft cover letter for pgsql-hackers submission MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Combines PR 27 (pause-on-conflict + TAP test) and PR 30 (refactor + auto-resume) into the story we would send to -hackers. Covers motivation, mechanism, edge cases, known limitations, tests, files touched, and open questions (GUC name, single-vs-mode flag, persistence, scope). Draft only — not sent. --- blueprints/hackers-cover-letter.md | 218 +++++++++++++++++++++++++++++ 1 file changed, 218 insertions(+) create mode 100644 blueprints/hackers-cover-letter.md diff --git a/blueprints/hackers-cover-letter.md b/blueprints/hackers-cover-letter.md new file mode 100644 index 0000000000000..09beaed099ac3 --- /dev/null +++ b/blueprints/hackers-cover-letter.md @@ -0,0 +1,218 @@ +# [PATCH] recovery_pause_on_logical_slot_conflict: enable continuous logical decoding from an archive-only standby + +Draft cover letter for pgsql-hackers. Not yet sent. + +--- + +Hackers, + +This patch adds a new GUC, `recovery_pause_on_logical_slot_conflict` +(`PGC_SIGHUP`, default `off`), that makes WAL replay on a standby pause — +and later auto-resume — instead of invalidating an otherwise-healthy +logical replication slot when a catalog `PRUNE_ON_ACCESS` record's +`snapshotConflictHorizon` has overtaken the slot's `catalog_xmin`. + +## Motivation + +A logical replication slot created on a standby that receives WAL only +via `restore_command` — no streaming link to the primary — cannot feed +`hot_standby_feedback` upstream, so it has no natural way to keep the +primary's catalog horizon pinned. Without this GUC, such a slot is +invalidated the first time replay applies a catalog vacuum record whose +horizon exceeds the slot's `catalog_xmin`, typically within +~`2 * autovacuum_naptime` of slot creation. + +That makes continuous logical decoding from an archive-only standby +(a useful building block for CDC off a compliance / read-replica / cold +tier) effectively impossible today. With this GUC on, the same workload +runs as a service: + +1. Replay encounters a conflicting prune record. +2. The startup process requests a recovery pause and waits. +3. The downstream consumer drains the slot past the pause LSN. +4. A periodic re-scan notices the slot no longer blocks, clears the + pause, and advances `catalog_xmin` past the horizon so the subsequent + `InvalidateObsoleteReplicationSlots()` call is a no-op. +5. Replay continues to the next conflict; the cycle repeats. + +No operator action is required between drain and resume. A +drain-aware consumer turns this into a true continuous pipeline; +`pg_wal_replay_resume()` remains available as the "give up on this slot" +escape hatch. + +## What the patch does + +- Adds the GUC (`bool`, `PGC_SIGHUP`, default `off`, + group `REPLICATION_STANDBY`). One boolean early-return on the hot path + when `off`; no new shared-memory state. +- Hooks a single choke point: `ResolveRecoveryConflictWithSnapshot()` + calls `MaybePauseOnLogicalSlotConflict()` before + `InvalidateObsoleteReplicationSlots()` for the + `RS_INVAL_HORIZON` / `isCatalogRel` case. +- Reuses the existing `recoveryNotPausedCV` / + `SetRecoveryPause` / `ConfirmRecoveryPaused` machinery. + `RECOVERY_PAUSE_REQUESTED → RECOVERY_PAUSED` is driven from inside + `MaybePauseOnLogicalSlotConflict` so + `pg_get_wal_replay_pause_state()` reflects reality. +- Wait loop: + - `ProcessStartupProcInterrupts()` + - `CheckForStandbyTrigger()` — escape so `pg_promote()` doesn't stall + - `AnySlotStillBlocksConflict()` — auto-resume predicate + - `ConfirmRecoveryPaused()` + - `ConditionVariableTimedSleep(&recoveryNotPausedCV, 1s, ...)` +- Auto-resume predicate treats a slot as no longer blocking when any + of the following holds: + - `data.invalidated != RS_INVAL_NONE` (dropped, WAL-removed, etc.) + - `data.synced` (managed by the slot-sync worker — upstream's + concern, not ours) + - `catalog_xmin` has advanced past the horizon + (`pg_replication_slot_advance()`) + - `confirmed_flush_lsn` has reached the pause-point LSN (drained) +- On wait exit, advance `catalog_xmin` (and `xmin`) past the horizon on + drained slots so the fall-through invalidation is a no-op. Slots that + weren't drained are left alone and get invalidated normally — that is + the "give up" path when an operator uses `pg_wal_replay_resume()` + manually. + +## Edge cases we thought about + +- **In-progress slots.** Slots whose `effective_catalog_xmin` is still + `InvalidTransactionId` (still inside `DecodingContextFindStartpoint`) + are skipped in both the pause check and the advance. Pausing for one + would deadlock: `DecodingContextFindStartpoint` needs replay to move + forward to reach `SNAPBUILD_CONSISTENT`. Invalidating an in-progress + slot is harmless — the caller retries. + +- **Synced slots.** Slots with `data.synced = true` are skipped. Writing + their fields from the startup process would race with the slot-sync + worker, and `ALTER` / `DROP_REPLICATION_SLOT` error out on a synced + slot so the operator-facing recipe doesn't apply. + +- **`PrecedesOrEquals` vs `Precedes`.** We use + `TransactionIdPrecedesOrEquals` to match + `DetermineSlotInvalidationCause`. With strict `Precedes`, a slot whose + `catalog_xmin` was just advanced to exactly `horizon` by a previous + pause-and-advance cycle would fail to re-pause on the next prune + record with `horizon == catalog_xmin`, yet would still be invalidated + by the fall-through. + +- **`pg_promote()` during pause.** `CheckForStandbyTrigger()` — which + actually consumes `PROMOTE_SIGNAL_FILE`, not just reads the flag — is + called in the wait loop. Without that, `pg_promote(wait => true)` + stalls for the full `wait_seconds` and returns false. + +- **`max_slot_wal_keep_size` while paused.** The checkpointer is a + separate process and runs restartpoints even while the startup process + is asleep in the wait loop, so `RS_INVAL_WAL_REMOVED` can be applied + out of band. The auto-resume predicate picks that up on the next tick + and lets replay continue. The same mechanism handles an operator + dropping or advancing the slot. + +## Known limitations + +- **Persistence of the post-wait advance.** We mark slots dirty but do + not force `SaveSlotToPath`. If the standby crashes between resume and + the next restartpoint, the on-disk `catalog_xmin` is the pre-advance + value and replay re-encounters the same conflict on restart, re-pauses, + and the consumer re-drains. The drain is idempotent so there is no + data loss, but it is an operator-visible hiccup. `SaveSlotToPath` is + currently static in `slot.c` and not trivially callable from the + startup process (no `MyReplicationSlot`); we'd appreciate feedback on + whether to expose it or accept the current behavior. + +- **No backstop timeout.** If no consumer ever drains, the standby sits + paused indefinitely. We considered a timeout GUC but chose not to wire + one in — it is orthogonal and can be added later. Promote is already + an escape. + +- **Scope: `RS_INVAL_HORIZON` only.** Non-horizon invalidation causes + (WAL-removed, idle-timeout) reflect explicit operator policy and + should continue to invalidate; the patch does not touch them. + +- **Opt-in, default off.** Upstream behavior is unchanged for every + existing deployment; only operators who want continuous decoding from + a WAL-shipping standby flip the GUC on. + +## Tests + +New TAP test `src/test/recovery/t/050_recovery_pause_on_slot_conflict.pl` +(~30 s wall-clock, 10 assertions). Three archive-only standbys from the +same basebackup: + +1. **GUC on**: pauses on conflict; orchestrator drains; slot stays + `reserved`; ≥2000 decoded events across ≥1 pause cycle. +2. **GUC off (baseline)**: under identical WAL, slot goes to `lost` — + proves the conflict actually fires *and* that upstream behavior is + unchanged. +3. **GUC on + `pg_promote(wait=>true, wait_seconds=>30)`**: asserts + promote returns `t` in under 10 s. Guards the + `CheckForStandbyTrigger()` escape in the wait loop. + +The Phase-1 / Phase-2 split in the test is deliberate: slot creation +must reach `SNAPBUILD_CONSISTENT` before any conflicting prune record is +replayed, or `DecodingContextFindStartpoint` and our pause code +deadlock. The test takes basebackup, runs `pg_log_standby_snapshot()` + +`pg_switch_wal()` twice, waits for the anchor segment to archive, +creates slots, *then* runs the catalog-churning workload. The rationale +is commented inline. + +## Files + + src/backend/access/transam/xlogrecovery.c ~25 +/- + src/backend/storage/ipc/standby.c ~320 +/- + src/backend/utils/misc/guc_parameters.dat 9 ++ + src/backend/utils/misc/postgresql.conf.sample 4 ++ + src/include/access/xlogrecovery.h 3 ++ + src/include/storage/standby.h 2 ++ + src/test/recovery/t/050_recovery_pause_on_slot_conflict.pl 296 +++ + +Two functions are promoted from static to extern so the new code in +`standby.c` can drive them: `ConfirmRecoveryPaused()` and +`CheckForStandbyTrigger()`. Both are called only from the wait loop and +are direct parallels to how `recoveryPausesHere()` uses them today. + +## Open questions + +1. **GUC name.** `recovery_pause_on_logical_slot_conflict` is long but + descriptive. An alternative: `logical_slot_conflict_action` with + enum values (`invalidate` | `pause`). Happy to rename. + +2. **Single GUC, auto-resume always.** We could have added a second knob + for "pause only, never auto-resume", but we couldn't find a use case + where a standby that pauses on conflict actually wants to keep + sitting paused after the consumer has drained. Manual + `pg_wal_replay_resume()` still works as the "give up on this slot" + escape. If people want the explicit two-step behavior back, we can + add a mode flag. + +3. **Persistence.** Force `SaveSlotToPath` on advance, or accept the + crash-redo behavior? + +4. **Broader invalidation causes.** The patch scopes to + `RS_INVAL_HORIZON`. Other causes (WAL-removed, idle-timeout) reflect + operator policy and are probably right to keep invalidating — but + someone might have a use case we haven't seen. + +Feedback and review appreciated. + +Thanks, + +— + +## Appendix: commit layout + +The series on the working branch is: + +- `1ef78be` Pause recovery on logical slot conflict + (core feature + GUC + wait loop + manual-resume behavior) +- `ffd897c` Add TAP test for recovery_pause_on_logical_slot_conflict +- `cd2b7be` Refactor `050_recovery_pause_on_slot_conflict.pl` for + readability (extracts helpers; no behavior change) +- `39adedd` Auto-resume recovery once the logical slot conflict is + resolved (extracts `AnySlotStillBlocksConflict` helper, re-scans in + the wait loop, flips pause state when nothing blocks; keeps manual + `pg_wal_replay_resume()` as the escape) + +For a single-patch submission we'd likely squash to two commits +(feature+test, documentation) or three (feature, test, docs); the +refactor commit exists to keep diff review tractable and would fold in.