From 19515be1ec3818e4c2ae687dca00240e79b6747d Mon Sep 17 00:00:00 2001 From: Nik Samokhvalov Date: Wed, 27 May 2026 10:56:29 -0700 Subject: [PATCH 1/3] xlogrecovery: make ConfirmRecoveryPaused and CheckForStandbyTrigger extern MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit MaybePauseOnLogicalSlotConflict (introduced in the next commit) runs inside ResolveRecoveryConflictWithSnapshot, which is called from the WAL apply path rather than the main recovery loop. Its wait loop must do the same two things recoveryPausesHere() does: 1. Transition RECOVERY_PAUSE_REQUESTED -> RECOVERY_PAUSED so that pg_wal_replay_resume() can release the pause. 2. Check for a promote signal so that pg_promote() does not stall while the startup process is sleeping inside the slot-conflict wait. Both are currently static. Remove the static qualifier and add extern declarations to xlogrecovery.h so standby.c can call them. No behaviour change — only visibility changes. Co-Authored-By: Claude Sonnet 4.6 --- src/backend/access/transam/xlogrecovery.c | 10 ++++++---- src/include/access/xlogrecovery.h | 2 ++ 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 73b78a83fa744..80282e4689eae 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -363,7 +363,8 @@ static bool recoveryStopsAfter(XLogReaderState *record); static char *getRecoveryStopReason(void); static void recoveryPausesHere(bool endOfRecovery); static bool recoveryApplyDelay(XLogReaderState *record); -static void ConfirmRecoveryPaused(void); +/* Exposed for the logical-slot-conflict recovery-pause logic in standby.c. 
*/ +void ConfirmRecoveryPaused(void); static XLogRecord *ReadRecord(XLogPrefetcher *xlogprefetcher, int emode, bool fetching_ckpt, @@ -386,7 +387,8 @@ static int XLogFileRead(XLogSegNo segno, TimeLineID tli, XLogSource source, bool notfoundOk); static int XLogFileReadAnyTLI(XLogSegNo segno, XLogSource source); -static bool CheckForStandbyTrigger(void); +/* Exposed for the logical-slot-conflict recovery-pause logic in standby.c. */ +bool CheckForStandbyTrigger(void); static void SetPromoteIsTriggered(void); static bool HotStandbyActiveInReplay(void); @@ -3083,7 +3085,7 @@ SetRecoveryPause(bool recoveryPause) * Confirm the recovery pause by setting the recovery pause state to * RECOVERY_PAUSED. */ -static void +void ConfirmRecoveryPaused(void) { /* If recovery pause is requested then set it paused */ @@ -4438,7 +4440,7 @@ SetPromoteIsTriggered(void) /* * Check whether a promote request has arrived. */ -static bool +bool CheckForStandbyTrigger(void) { if (LocalPromoteIsTriggered) diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h index ba7750dca0b45..1683ec14a5a7a 100644 --- a/src/include/access/xlogrecovery.h +++ b/src/include/access/xlogrecovery.h @@ -213,6 +213,8 @@ extern bool HotStandbyActive(void); extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI); extern RecoveryPauseState GetRecoveryPauseState(void); extern void SetRecoveryPause(bool recoveryPause); +extern void ConfirmRecoveryPaused(void); +extern bool CheckForStandbyTrigger(void); extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream); extern TimestampTz GetLatestXTime(void); extern TimestampTz GetCurrentChunkReplayStartTime(void); From d26b124be205e522a255dbacd330f961d957110c Mon Sep 17 00:00:00 2001 From: Nik Samokhvalov Date: Wed, 27 May 2026 10:58:07 -0700 Subject: [PATCH 2/3] Add recovery_pause_on_logical_slot_conflict GUC MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a new GUC, 
recovery_pause_on_logical_slot_conflict (PGC_SIGHUP, default off). When enabled, WAL replay on a standby pauses instead of invalidating an active logical replication slot whose catalog_xmin would be overtaken by a Heap2/PRUNE_ON_ACCESS record's snapshotConflictHorizon. An operator can then drain the slot via pg_logical_slot_get_changes and call pg_wal_replay_resume() to continue. On resume, the slot's catalog_xmin is advanced past the conflict horizon so the subsequent InvalidateObsoleteReplicationSlots call becomes a no-op; replay continues to the next conflict and the cycle repeats. This makes logical decoding from an archive-only standby (no streaming replication link to the primary) viable for continuous CDC. Without this GUC, slots on such standbys are invalidated the first time replay applies a catalog vacuum record whose horizon exceeds the slot's catalog_xmin — typically ~2 * autovacuum_naptime after slot creation. Hooks into ResolveRecoveryConflictWithSnapshot(), the single choke point in the replay path for RS_INVAL_HORIZON conflicts, via a new MaybePauseOnLogicalSlotConflict() function in standby.c. Reuses the existing SetRecoveryPause / recoveryNotPausedCV machinery — no new shared-memory state. Hot path when GUC off is one boolean early-return. Edge cases handled: - Slots still inside DecodingContextFindStartpoint (effective_catalog_xmin not yet valid) are skipped. Pausing for them would deadlock: snapbuild needs WAL to advance, pause holds it back. Invalidating an in-progress slot is harmless — the caller retries. - Pause-check uses TransactionIdPrecedesOrEquals to match the semantics of DetermineSlotInvalidationCause. Without that, a slot whose catalog_xmin was just advanced to horizon+1 by a previous pause cycle would fail to re-pause on a subsequent record with horizon == catalog_xmin, yet would still be invalidated. - CheckForStandbyTrigger() is called in the wait loop so pg_promote() does not stall while paused. 
Mirrors the existing recoveryPausesHere escape loop. - Synced slots (data.synced == true) are skipped in both the pause-check and advance scans. Writing to their fields from the startup process would race with the slot-sync worker. Crash safety: after advancing catalog_xmin in memory, dirty slots are flushed to disk immediately via CheckPointReplicationSlots(false) before returning. This upholds the write-before-memory-update invariant established by LogicalConfirmReceivedLocation (logical.c): the on-disk state must reflect any advance before the in-memory value becomes visible, so that vacuum cannot reclaim catalog tuples the slot still needs. Deferring to the next restartpoint would leave a crash window. Includes a TAP test (054_recovery_pause_on_slot_conflict.pl) covering: - GUC registration - Slot survival through catalog PRUNE_ON_ACCESS records (GUC on) - Baseline slot invalidation (GUC off, unchanged upstream behaviour) - pg_promote() succeeds in under 10 s while the standby is paused (guards the CheckForStandbyTrigger() escape path) Co-Authored-By: Claude Sonnet 4.6 --- src/backend/access/transam/xlogrecovery.c | 19 + src/backend/storage/ipc/standby.c | 235 +++++++++++++ src/backend/utils/misc/guc_parameters.dat | 8 + src/backend/utils/misc/postgresql.conf.sample | 2 + src/include/access/xlogrecovery.h | 1 + src/include/storage/standby.h | 2 + src/test/recovery/meson.build | 1 + .../t/054_recovery_pause_on_slot_conflict.pl | 329 ++++++++++++++++++ 8 files changed, 597 insertions(+) create mode 100644 src/test/recovery/t/054_recovery_pause_on_slot_conflict.pl diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 80282e4689eae..508e718169ccd 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -96,6 +96,21 @@ const char *recoveryTargetName; XLogRecPtr recoveryTargetLSN; int recovery_min_apply_delay = 0; +/* + * If true, when WAL replay on a standby is about 
to invalidate an otherwise- + * active logical replication slot because a catalog PRUNE_ON_ACCESS record's + * snapshotConflictHorizon has overtaken the slot's catalog_xmin, pause replay + * instead and give an operator a chance to drain (or drop) the slot. + * + * Motivated by blueprints/LOGICAL_DECODING_ARCHIVED_WALS.md §4.2.3 / US-4: + * an archive-only logical-decoding standby cannot feed hot_standby_feedback + * to the primary, so it has no natural way to keep the primary's catalog + * horizon pinned. Without this GUC, any logical slot created on such a + * standby is invalidated the first time replay applies a catalog vacuum + * record whose horizon exceeds the slot's catalog_xmin. + */ +bool recovery_pause_on_logical_slot_conflict = false; + /* options formerly taken from recovery.conf for XLOG streaming */ char *PrimaryConnInfo = NULL; char *PrimarySlotName = NULL; @@ -4440,6 +4455,10 @@ SetPromoteIsTriggered(void) /* * Check whether a promote request has arrived. */ +/* + * Non-static: MaybePauseOnLogicalSlotConflict needs this to break its wait + * loop on promotion, same as recoveryPausesHere does. + */ bool CheckForStandbyTrigger(void) { diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index de9092fdf5bc9..0659f9d21699e 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -24,8 +24,11 @@ #include "access/xlogutils.h" #include "miscadmin.h" #include "pgstat.h" +#include "postmaster/startup.h" #include "replication/slot.h" +#include "storage/condition_variable.h" #include "storage/bufmgr.h" +#include "storage/lwlock.h" #include "storage/proc.h" #include "storage/procarray.h" #include "storage/sinvaladt.h" @@ -503,8 +506,240 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, * reached, e.g. due to using a physical replication slot. 
*/ if (IsLogicalDecodingEnabled() && isCatalogRel) + { + MaybePauseOnLogicalSlotConflict(locator.dbOid, snapshotConflictHorizon); InvalidateObsoleteReplicationSlots(RS_INVAL_HORIZON, 0, locator.dbOid, snapshotConflictHorizon); + } +} + +/* + * If recovery_pause_on_logical_slot_conflict is enabled, and replay is about + * to apply a catalog PRUNE_ON_ACCESS record whose snapshotConflictHorizon + * would cause the invalidation of at least one non-invalidated logical slot + * in the same database, request a recovery pause and wait on the recovery + * pause condition variable until an operator resumes. + * + * On resume the caller re-falls through to InvalidateObsoleteReplicationSlots: + * if the operator has drained / dropped / advanced the slot, invalidation is + * a no-op; if they chose to resume without acting, the slot is invalidated + * as usual. This matches the recovery_target_action=pause precedent. + * + * The two parameters identify which slots, if any, this prune record can + * conflict with: + * - dboid: logical slots are per-database, so only slots belonging to this + * database can be invalidated by a catalog prune happening here; slots in + * other databases are never affected and must be ignored. + * - snapshotConflictHorizon: the xid threshold carried by the + * PRUNE_ON_ACCESS record. A slot conflicts iff its catalog_xmin + * precedes-or-equals this horizon (i.e. it still needs catalog rows the + * prune is about to remove). + * + * Only invoked from ResolveRecoveryConflictWithSnapshot(), before any buffer + * locks are taken, so pausing here does not deadlock with anything. + */ +void +MaybePauseOnLogicalSlotConflict(Oid dboid, TransactionId snapshotConflictHorizon) +{ + int i; + bool would_invalidate = false; + + if (!recovery_pause_on_logical_slot_conflict) + return; + if (!TransactionIdIsValid(snapshotConflictHorizon)) + return; + + /* + * Scan for a would-be-invalidated slot in the conflicting database. 
+ * + * Skip slots that have not yet reached snapshot-builder consistency + * (effective_catalog_xmin is still InvalidTransactionId). An in-progress + * slot has not produced any output to a consumer, so invalidating it is + * harmless — the caller can retry. Pausing for such a slot would + * deadlock: DecodingContextFindStartpoint would be waiting for replay + * to advance, while replay would be waiting for the slot to be drained. + */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + Oid slot_db; + TransactionId slot_xmin; + TransactionId slot_effective_xmin; + bool active_logical; + + if (!s->in_use) + continue; + + SpinLockAcquire(&s->mutex); + slot_db = s->data.database; + slot_xmin = s->data.catalog_xmin; + slot_effective_xmin = s->effective_catalog_xmin; + /* + * Skip synced slots (managed by the slot-sync worker per + * sync_replication_slots). Writing their fields from the startup + * process would race with the slot-sync worker's own updates, and + * the operator-facing "drain or drop the slot" recipe in the + * errhint below cannot be applied to a synced slot (ALTER / + * DROP_REPLICATION_SLOT error on synced). + */ + active_logical = (s->data.invalidated == RS_INVAL_NONE && + slot_db != InvalidOid && + TransactionIdIsValid(slot_effective_xmin) && + !s->data.synced); + SpinLockRelease(&s->mutex); + + if (!active_logical) + continue; + if (slot_db != dboid) + continue; + if (!TransactionIdIsValid(slot_xmin)) + continue; + /* + * Use PrecedesOrEquals (not Precedes) to match the check in + * DetermineSlotInvalidationCause. 
Otherwise a slot whose + * catalog_xmin was just advanced to conflict_horizon + 1 by + * a previous pause-and-advance cycle (our own resume code) will + * NOT trigger a pause here when the next prune record arrives + * with horizon == catalog_xmin, yet WILL still be invalidated + * by the fall-through InvalidateObsoleteReplicationSlots call. + */ + if (TransactionIdPrecedesOrEquals(slot_xmin, snapshotConflictHorizon)) + { + would_invalidate = true; + break; + } + } + LWLockRelease(ReplicationSlotControlLock); + + if (!would_invalidate) + return; + + ereport(LOG, + (errmsg("recovery paused: WAL redo at %X/%X would invalidate a logical replication slot", + LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))), + errdetail("snapshotConflictHorizon %u exceeds catalog_xmin of at least one active logical slot in database %u.", + snapshotConflictHorizon, dboid), + errhint("Drain, advance, or drop the slot, then execute pg_wal_replay_resume()."))); + + SetRecoveryPause(true); + + while (GetRecoveryPauseState() != RECOVERY_NOT_PAUSED) + { + ProcessStartupProcInterrupts(); + + /* + * If the operator gave up on the slot and triggered a promotion + * instead, bail out of the wait so the startup process can proceed + * with the promotion path. Must use CheckForStandbyTrigger (which + * actually consumes PROMOTE_SIGNAL_FILE), not PromoteIsTriggered + * (which only reads a flag populated by the former). Mirrors the + * same escape in recoveryPausesHere(). + */ + if (CheckForStandbyTrigger()) + { + ConditionVariableCancelSleep(); + return; + } + + /* + * Promote RECOVERY_PAUSE_REQUESTED to RECOVERY_PAUSED so that + * observers (pg_get_wal_replay_pause_state() / monitoring) see the + * pause as actually taken, not just requested. + */ + ConfirmRecoveryPaused(); + ConditionVariableTimedSleep(&XLogRecoveryCtl->recoveryNotPausedCV, + 1000, WAIT_EVENT_RECOVERY_PAUSE); + } + ConditionVariableCancelSleep(); + + /* + * Operator has resumed. 
If they drained slot(s) up to (or past) the LSN + * of the about-to-be-replayed conflict record, we trust that the consumer + * downstream has captured everything that needed the pre-conflict catalog + * snapshot. Advance those slots' catalog_xmin past the horizon so the + * subsequent InvalidateObsoleteReplicationSlots() fall-through is a + * no-op. Slots that the operator did NOT drain are left alone and get + * invalidated normally — that is the "I didn't act, just let the slot + * die" path. + * + * "Drained past the conflict LSN" is defined as: the slot's + * confirmed_flush_lsn >= the LSN at which replay has paused, which is + * the current replay position reported by GetXLogReplayRecPtr. + */ + { + XLogRecPtr conflict_lsn = GetXLogReplayRecPtr(NULL); + int j; + + LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE); + for (j = 0; j < max_replication_slots; j++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[j]; + bool advance; + + if (!s->in_use) + continue; + + SpinLockAcquire(&s->mutex); + /* + * Skip synced slots — same reason as in the pause-check scan. + * Writing their fields would race the slot-sync worker. 
+ */ + advance = (s->data.invalidated == RS_INVAL_NONE && + s->data.database == dboid && + !s->data.synced && + s->data.confirmed_flush >= conflict_lsn && + ((TransactionIdIsValid(s->data.catalog_xmin) && + TransactionIdPrecedesOrEquals(s->data.catalog_xmin, + snapshotConflictHorizon)) || + (TransactionIdIsValid(s->data.xmin) && + TransactionIdPrecedesOrEquals(s->data.xmin, + snapshotConflictHorizon)))); + if (advance) + { + TransactionId new_xmin = snapshotConflictHorizon; + + TransactionIdAdvance(new_xmin); /* strictly > horizon */ + if (TransactionIdIsValid(s->data.catalog_xmin) && + TransactionIdPrecedesOrEquals(s->data.catalog_xmin, + snapshotConflictHorizon)) + { + s->data.catalog_xmin = new_xmin; + s->effective_catalog_xmin = new_xmin; + } + if (TransactionIdIsValid(s->data.xmin) && + TransactionIdPrecedesOrEquals(s->data.xmin, + snapshotConflictHorizon)) + { + s->data.xmin = new_xmin; + s->effective_xmin = new_xmin; + } + s->just_dirtied = true; + s->dirty = true; + } + SpinLockRelease(&s->mutex); + + if (advance) + ereport(LOG, + (errmsg("advanced catalog_xmin of logical slot \"%s\" past conflict horizon %u", + NameStr(s->data.name), snapshotConflictHorizon), + errdetail("Slot's confirmed_flush_lsn %X/%X reached the conflict record at %X/%X; operator drained before resuming.", + LSN_FORMAT_ARGS(s->data.confirmed_flush), + LSN_FORMAT_ARGS(conflict_lsn)))); + } + LWLockRelease(ReplicationSlotControlLock); + + /* + * Flush dirty slots to disk immediately — do not defer to the next + * restartpoint. The normal streaming path (LogicalConfirmReceivedLocation + * in logical.c) saves slot state to disk *before* updating the + * in-memory effective_catalog_xmin. We must uphold the same invariant: + * if we crash between the in-memory advance above and the next + * restartpoint, the on-disk catalog_xmin must reflect the advance so + * that vacuum cannot reclaim catalog tuples the slot still needs. 
+ */ + CheckPointReplicationSlots(false); + } } /* diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index afaa058b046c9..52b34443ec60c 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -2441,6 +2441,14 @@ max => 'INT_MAX', }, +{ name => 'recovery_pause_on_logical_slot_conflict', type => 'bool', + context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY', + short_desc => 'Pauses recovery instead of invalidating an active logical slot on catalog conflict.', + long_desc => 'When WAL replay on a standby is about to invalidate a logical replication slot because a catalog PRUNE_ON_ACCESS record has overtaken the slot\'s catalog_xmin, pause recovery instead. The operator can then drain or drop the slot and call pg_wal_replay_resume() to continue.', + variable => 'recovery_pause_on_logical_slot_conflict', + boot_val => 'false', +}, + { name => 'recovery_prefetch', type => 'enum', context => 'PGC_SIGHUP', group => 'WAL_RECOVERY', short_desc => 'Prefetch referenced blocks during recovery.', long_desc => 'Look ahead in the WAL to find references to uncached data.', diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index ac38cddaaf9a6..17b2bcc4df8b9 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -403,6 +403,8 @@ #wal_retrieve_retry_interval = 5s # time to wait before retrying to # retrieve WAL after a failed attempt #recovery_min_apply_delay = 0 # minimum delay for applying changes during recovery +#recovery_pause_on_logical_slot_conflict = off # pause recovery instead of invalidating + # a logical slot on catalog conflict #sync_replication_slots = off # enables slot synchronization on the physical standby from the primary # - Subscribers - diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h index 
1683ec14a5a7a..58d578ce85373 100644 --- a/src/include/access/xlogrecovery.h +++ b/src/include/access/xlogrecovery.h @@ -129,6 +129,7 @@ extern PGDLLIMPORT XLogRecoveryCtlData *XLogRecoveryCtl; extern PGDLLIMPORT bool recoveryTargetInclusive; extern PGDLLIMPORT int recoveryTargetAction; extern PGDLLIMPORT int recovery_min_apply_delay; +extern PGDLLIMPORT bool recovery_pause_on_logical_slot_conflict; extern PGDLLIMPORT char *PrimaryConnInfo; extern PGDLLIMPORT char *PrimarySlotName; extern PGDLLIMPORT char *recoveryRestoreCommand; diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h index 6a314c693cde7..03705a8eb8ced 100644 --- a/src/include/storage/standby.h +++ b/src/include/storage/standby.h @@ -75,6 +75,8 @@ extern void ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHo extern void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHorizon, bool isCatalogRel, RelFileLocator locator); +extern void MaybePauseOnLogicalSlotConflict(Oid dboid, + TransactionId snapshotConflictHorizon); extern void ResolveRecoveryConflictWithTablespace(Oid tsid); extern void ResolveRecoveryConflictWithDatabase(Oid dbid); diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index 9eb8ed114254a..ae794cb5baeb7 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -62,6 +62,7 @@ tests += { 't/051_effective_wal_level.pl', 't/052_checkpoint_segment_missing.pl', 't/053_standby_login_event_trigger.pl', + 't/054_recovery_pause_on_slot_conflict.pl', ], }, } diff --git a/src/test/recovery/t/054_recovery_pause_on_slot_conflict.pl b/src/test/recovery/t/054_recovery_pause_on_slot_conflict.pl new file mode 100644 index 0000000000000..d1a03475e95b2 --- /dev/null +++ b/src/test/recovery/t/054_recovery_pause_on_slot_conflict.pl @@ -0,0 +1,329 @@ +# Copyright (c) 2026, PostgreSQL Global Development Group + +# Exercise the recovery_pause_on_logical_slot_conflict GUC on a standby. 
+# +# Two-phase flow so the slot is fully consistent BEFORE any catalog- +# prune WAL record is replayed — otherwise slot creation would block +# inside DecodingContextFindStartpoint while replay pauses on the +# prune, and we would deadlock. (Fix #1, bbd5d4e13bc, narrows the +# window but doesn't remove it; keeping the two-phase flow explicit +# makes the test robust.) +# +# Phase 1 — bring up a consistent logical slot on the standby from a +# quiet primary archive: +# * take basebackup +# * pg_log_standby_snapshot() → snapbuild path (a) anchor +# * wait for the snapshot's segment to archive +# * start standby, let replay catch up, create slot (quick — no +# prune records in the archive yet). +# +# Phase 2 — churn the primary's catalog so the standby's replay +# eventually hits a catalog-prune record that would invalidate the +# slot: +# * run CREATE / DROP of transient tables (pg_class churn) +# * run ANALYZE x2 + VACUUM pg_statistic / pg_class (HOT prune on +# catalog relations in db=postgres) +# * wait for those segments to archive +# * orchestrator loop on the standby: when +# pg_get_wal_replay_pause_state() returns paused, drain the slot +# via pg_logical_slot_get_changes, call pg_wal_replay_resume, +# continue. + +use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; +use Time::HiRes qw(usleep); + +# --------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------- + +# Build the primary, seed the workload table, take a basebackup, and +# produce a "clean" archive: one that contains a standby snapshot but +# no catalog-prune WAL yet. Returns ($node_primary, $backup_name). 
+sub setup_primary_with_clean_archive +{ + my $node_primary = PostgreSQL::Test::Cluster->new('primary'); + $node_primary->init(allows_streaming => 'logical', has_archiving => 1); + $node_primary->append_conf('postgresql.conf', qq[ +wal_level = logical +archive_mode = on +archive_timeout = 1s +autovacuum = on +autovacuum_naptime = 5s +fsync = off +synchronous_commit = off +]); + $node_primary->start; + + $node_primary->safe_psql('postgres', qq[ + CREATE TABLE events (id serial PRIMARY KEY, payload text); + ALTER TABLE events REPLICA IDENTITY FULL; + INSERT INTO events (payload) VALUES ('seed'); + ]); + + my $backup_name = 'backup1'; + $node_primary->backup($backup_name); + + # Quiet-moment RUNNING_XACTS in post-backup WAL — provides path (a) + # anchor for snapbuild. + $node_primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot();"); + + # Force the segment containing that anchor to archive so the standby + # can see it via restore_command. Switch TWICE: first switch closes + # the segment with the snapshot record; second switch gives + # snapbuild the forward WAL it needs to decide the slot is + # consistent. Without the second switch, + # DecodingContextFindStartpoint blocks on 'waiting for WAL to + # become available at seg N+1' — flaky slot creation. 
+ my $phase1_seg = $node_primary->safe_psql('postgres', + "SELECT pg_walfile_name(pg_current_wal_lsn())"); + $node_primary->safe_psql('postgres', "SELECT pg_switch_wal();"); + $node_primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot();"); + $node_primary->safe_psql('postgres', "SELECT pg_switch_wal();"); + $node_primary->poll_query_until('postgres', qq[ + SELECT last_archived_wal IS NOT NULL + AND last_archived_wal >= '$phase1_seg' + FROM pg_stat_archiver + ]) or die "Timed out waiting for phase-1 segment $phase1_seg to archive"; + + return ($node_primary, $backup_name); +} + +# Bring up an archive-only standby from $backup_name on $node_primary +# with recovery_pause_on_logical_slot_conflict set to $guc_value. Waits +# for replay to catch up, then returns the node. +sub create_archive_standby +{ + my ($node_primary, $backup_name, $name, $guc_value) = @_; + + my $standby = PostgreSQL::Test::Cluster->new($name); + $standby->init_from_backup($node_primary, $backup_name, + has_streaming => 0, has_restoring => 1); + $standby->append_conf('postgresql.conf', qq[ +hot_standby = on +recovery_pause_on_logical_slot_conflict = $guc_value +wal_level = logical +max_standby_archive_delay = -1 +max_standby_streaming_delay = -1 +]); + $standby->start; + $standby->poll_query_until('postgres', + "SELECT pg_last_wal_replay_lsn() IS NOT NULL", 't'); + + return $standby; +} + +# Churn the primary's catalog enough to emit catalog-prune WAL records, +# then force and wait for those records to reach the archive. +sub run_catalog_churn +{ + my ($node_primary) = @_; + + # Transient tables exercise pg_class / pg_attribute / pg_type / pg_depend. 
+ $node_primary->safe_psql('postgres', qq[ + INSERT INTO events (payload) + SELECT 'row-' || g FROM generate_series(1, 3000) g; + ]); + for (my $i = 0; $i < 20; $i++) { + $node_primary->safe_psql('postgres', + "CREATE TABLE churn_$i (id int, payload text); DROP TABLE churn_$i;"); + } + # Two ANALYZE calls make first-generation pg_statistic rows dead by + # overwriting them; VACUUM then emits Heap2/PRUNE_ON_ACCESS. + $node_primary->safe_psql('postgres', qq[ + ANALYZE events; + ANALYZE events; + VACUUM pg_class; + VACUUM pg_attribute; + VACUUM pg_type; + VACUUM pg_depend; + VACUUM pg_statistic; + ]); + + my $phase2_seg = $node_primary->safe_psql('postgres', + "SELECT pg_walfile_name(pg_current_wal_lsn())"); + $node_primary->safe_psql('postgres', "SELECT pg_switch_wal();"); + $node_primary->poll_query_until('postgres', qq[ + SELECT last_archived_wal IS NOT NULL + AND last_archived_wal >= '$phase2_seg' + FROM pg_stat_archiver + ]) or die "Timed out waiting for phase-2 segment $phase2_seg to archive"; + + return; +} + +# Orchestrator loop for the GUC-on standby: when replay pauses, drain +# the slot via pg_logical_slot_get_changes and call +# pg_wal_replay_resume(). Exits when replay stops advancing or when +# $deadline_seconds have passed. Returns ($pauses_seen, $total_drained) +# and includes a final drain of anything left on the slot. 
+sub drain_and_resume_loop +{ + my ($standby, $slot_name, $deadline_seconds) = @_; + + my $total_drained = 0; + my $pauses_seen = 0; + my $last_replay = ''; + my $stall_ticks = 0; + my $deadline = time() + $deadline_seconds; + + while (time() < $deadline) { + my $state = $standby->safe_psql('postgres', + "SELECT pg_get_wal_replay_pause_state()"); + my $replay = $standby->safe_psql('postgres', + "SELECT pg_last_wal_replay_lsn()"); + + if ($state eq 'paused' || $state eq 'pause requested') { + my $got = $standby->safe_psql('postgres', + "SELECT COUNT(*) FROM pg_logical_slot_get_changes('$slot_name', NULL, NULL)"); + $total_drained += $got; + $pauses_seen++; + $standby->safe_psql('postgres', "SELECT pg_wal_replay_resume()"); + $stall_ticks = 0; + } elsif ($replay eq $last_replay) { + $stall_ticks++; + last if $stall_ticks > 10; + } else { + $stall_ticks = 0; + } + + $last_replay = $replay; + usleep(500_000); + } + + my $final = $standby->safe_psql('postgres', + "SELECT COUNT(*) FROM pg_logical_slot_get_changes('$slot_name', NULL, NULL)"); + $total_drained += $final; + + return ($pauses_seen, $total_drained); +} + +# Poll until $standby reports replay as paused, up to ~30 seconds. +# Returns 1 on success, 0 on timeout. +sub wait_for_replay_paused +{ + my ($standby) = @_; + + for (my $i = 0; $i < 60; $i++) { + my $s = $standby->safe_psql('postgres', + "SELECT pg_get_wal_replay_pause_state()"); + return 1 if $s eq 'paused'; + usleep(500_000); + } + return 0; +} + +# --------------------------------------------------------------------- +# Main script +# --------------------------------------------------------------------- + +# 1. GUC visibility. +my ($node_primary, $backup_name) = setup_primary_with_clean_archive(); + +my $guc = $node_primary->safe_psql('postgres', + "SELECT COUNT(*) FROM pg_settings WHERE name = 'recovery_pause_on_logical_slot_conflict'"); +is($guc, '1', 'recovery_pause_on_logical_slot_conflict GUC is registered'); + +# 2. 
Phase 1: bring up BOTH standbys (GUC-on and GUC-off) while the +# archive still contains only the quiet-moment snapshot — no prune +# records yet. Slot creation reaches SNAPBUILD_CONSISTENT quickly on +# both. Later, when Phase 2 ships the prune records, the two standbys +# diverge: the GUC-on one pauses and drains; the GUC-off one +# invalidates. +my $node_standby = create_archive_standby($node_primary, $backup_name, + 'standby', 'on'); +my $node_standby_off = create_archive_standby($node_primary, $backup_name, + 'standby_off', 'off'); + +$node_standby->safe_psql('postgres', qq[ + SELECT pg_create_logical_replication_slot('t_slot', 'test_decoding'); +]); +$node_standby_off->safe_psql('postgres', qq[ + SELECT pg_create_logical_replication_slot('t_slot_off', 'test_decoding'); +]); + +my $slot_ready = $node_standby->safe_psql('postgres', qq[ + SELECT wal_status FROM pg_replication_slots WHERE slot_name = 't_slot' +]); +is($slot_ready, 'reserved', "slot created cleanly in Phase 1 (state: $slot_ready)"); + +my $off_slot_ready = $node_standby_off->safe_psql('postgres', qq[ + SELECT wal_status FROM pg_replication_slots WHERE slot_name = 't_slot_off' +]); +is($off_slot_ready, 'reserved', + "baseline slot created cleanly in Phase 1 (state: $off_slot_ready)"); + +# 3. Phase 2: catalog churn on primary, then wait for archive. +run_catalog_churn($node_primary); + +# 4. Orchestrator loop on the GUC-on standby. +my ($pauses_seen, $total_drained) = + drain_and_resume_loop($node_standby, 't_slot', 60); + +my $slot_state = $node_standby->safe_psql('postgres', qq[ + SELECT wal_status || '|' || COALESCE(invalidation_reason, '') + FROM pg_replication_slots WHERE slot_name = 't_slot'; +]); +like($slot_state, qr/^reserved\|/, + "slot survived catalog prune with GUC on (state: $slot_state)"); + +cmp_ok($pauses_seen, '>=', 1, + "at least one pause event was handled ($pauses_seen seen)"); + +cmp_ok($total_drained, '>=', 2000, + "at least 2000 decoded events ($total_drained got)"); + +# 5. 
Baseline assertion: the GUC-off standby, faced with the exact same +# Phase-2 archive, should invalidate its slot. This confirms the test +# setup actually triggers the conflict AND that GUC-off behavior is +# unchanged from upstream — if the slot ever stays +# "reserved" here, either the test stopped reproducing the trigger or the +# GUC-off path accidentally benefits from our patch. +my $off_state = 'reserved'; +for (my $i = 0; $i < 60; $i++) { + $off_state = $node_standby_off->safe_psql('postgres', qq[ + SELECT wal_status FROM pg_replication_slots WHERE slot_name = 't_slot_off'; + ]); + last if $off_state eq 'lost'; + usleep(500_000); +} + +is($off_state, 'lost', + "baseline (GUC off): slot invalidates as expected under catalog prune"); + +# 6. Promote-during-pause: bring up a third standby, get it paused by +# the GUC, then call pg_promote() and assert promotion actually +# completes (rather than stalling until someone also runs +# pg_wal_replay_resume). Guards the CheckForStandbyTrigger() escape +# path in the wait loop. +my $node_standby_p = create_archive_standby($node_primary, $backup_name, + 'standby_promote', 'on'); +$node_standby_p->safe_psql('postgres', + "SELECT pg_create_logical_replication_slot('promote_slot', 'test_decoding')"); + +# Phase-2 archive is already shipped so a pause will happen within a +# few seconds. +my $paused = wait_for_replay_paused($node_standby_p); +ok($paused, "promote-test standby reached paused state before promotion"); + +# Call pg_promote with a short wait. Without the CheckForStandbyTrigger +# escape in the wait loop, this stalls for the full wait_seconds and +# returns false; with the fix, it returns true in ~1 second. 
+my $t0 = time(); +my $promoted = $node_standby_p->safe_psql('postgres', + "SELECT pg_promote(wait => true, wait_seconds => 30)"); +my $elapsed = time() - $t0; +is($promoted, 't', "pg_promote returned true while standby was paused by GUC"); +cmp_ok($elapsed, '<', 10, + "pg_promote completed in under 10s (actual: ${elapsed}s)"); + +$node_standby_p->stop; +$node_standby_off->stop; +$node_standby->stop; +$node_primary->stop; + +done_testing(); From 44eceb392066607a497e109eb5f0f8a42e4a658e Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 22 Apr 2026 18:05:46 +0000 Subject: [PATCH 3/3] Auto-resume recovery once the logical slot conflict is resolved MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous behavior under recovery_pause_on_logical_slot_conflict required the operator to both drain (or drop / advance) the slot AND call pg_wal_replay_resume() to continue — two steps, even though the first step is the one that matters semantically. That split also meant the feature couldn't underpin a continuous-CDC service without external orchestration to issue the resume. Lift the scan predicate ("does any slot in `dboid` still block this conflict?") out of the initial check into a helper AnySlotStillBlocksConflict(). Call it again every 1s inside the existing wait loop. When it returns false, leave the wait loop, flipping the pause state to NOT_PAUSED unless an operator had already paused recovery before the conflict fired (that pause is left in place); the existing post-wait advance then bumps catalog_xmin past the horizon on drained slots so the fall-through InvalidateObsoleteReplicationSlots() is a no-op. "No longer blocking" covers every unblock path, not just drain: * drained past the pause LSN (confirmed_flush >= captured conflict_lsn) — the main case * slot dropped (pg_drop_replication_slot) — removed from the scan * slot advanced (pg_replication_slot_advance) — catalog_xmin moves past the horizon * slot invalidated for another reason (e.g. 
RS_INVAL_WAL_REMOVED from max_slot_wal_keep_size, applied by the checkpointer, which runs even while the startup process is asleep in our wait loop) — data.invalidated != RS_INVAL_NONE, scan skips it Manual pg_wal_replay_resume() still works as the "give up on this slot and let it invalidate" escape hatch, and CheckForStandbyTrigger still breaks the loop for pg_promote(). Capture conflict_lsn once at pause time and reuse it for both the in-wait predicate and the post-wait advance, replacing the redundant second GetXLogReplayRecPtr() call. GUC long_desc, postgresql.conf.sample comment, and the xlogrecovery.c variable-decl comment updated to describe auto-resume. --- src/backend/access/transam/xlogrecovery.c | 5 +- src/backend/storage/ipc/standby.c | 216 +++++++++++------- src/backend/utils/misc/guc_parameters.dat | 2 +- src/backend/utils/misc/postgresql.conf.sample | 4 +- .../t/054_recovery_pause_on_slot_conflict.pl | 120 +++++++++- 5 files changed, 255 insertions(+), 92 deletions(-) diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 508e718169ccd..a2b0fd4ef1219 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -100,7 +100,10 @@ int recovery_min_apply_delay = 0; * If true, when WAL replay on a standby is about to invalidate an otherwise- * active logical replication slot because a catalog PRUNE_ON_ACCESS record's * snapshotConflictHorizon has overtaken the slot's catalog_xmin, pause replay - * instead and give an operator a chance to drain (or drop) the slot. + * instead. Replay auto-resumes once the consumer has drained the slot past + * the pause point (or the slot is dropped, advanced, or otherwise no longer + * blocking); pg_wal_replay_resume() also forces continuation. See + * MaybePauseOnLogicalSlotConflict() in standby.c. 
* * Motivated by blueprints/LOGICAL_DECODING_ARCHIVED_WALS.md §4.2.3 / US-4: * an archive-only logical-decoding standby cannot feed hot_standby_feedback diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 0659f9d21699e..ce467a0748618 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -514,51 +514,37 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, } /* - * If recovery_pause_on_logical_slot_conflict is enabled, and replay is about - * to apply a catalog PRUNE_ON_ACCESS record whose snapshotConflictHorizon - * would cause the invalidation of at least one non-invalidated logical slot - * in the same database, request a recovery pause and wait on the recovery - * pause condition variable until an operator resumes. + * Returns true if at least one non-synced logical slot in `dboid` still + * blocks replay past snapshotConflictHorizon. * - * On resume the caller re-falls through to InvalidateObsoleteReplicationSlots: - * if the operator has drained / dropped / advanced the slot, invalidation is - * a no-op; if they chose to resume without acting, the slot is invalidated - * as usual. This matches the recovery_target_action=pause precedent. + * "Blocks" means: the slot is in use, not invalidated, snapbuild-consistent + * (effective_catalog_xmin is valid — skipping in-progress slots avoids a + * deadlock with DecodingContextFindStartpoint), and its catalog_xmin + * precedes-or-equals the horizon. * - * The two parameters identify which slots, if any, this prune record can - * conflict with: - * - dboid: logical slots are per-database, so only slots belonging to this - * database can be invalidated by a catalog prune happening here; slots in - * other databases are never affected and must be ignored. - * - snapshotConflictHorizon: the xid threshold carried by the - * PRUNE_ON_ACCESS record. A slot conflicts iff its catalog_xmin - * precedes-or-equals this horizon (i.e. 
it still needs catalog rows the - * prune is about to remove). + * Use PrecedesOrEquals (not Precedes) to match DetermineSlotInvalidationCause. + * Otherwise a slot whose catalog_xmin was just advanced to exactly horizon by + * a previous pause-and-advance cycle fails to re-pause on the next prune + * record with the same horizon, yet would still be invalidated by the + * fall-through InvalidateObsoleteReplicationSlots call. * - * Only invoked from ResolveRecoveryConflictWithSnapshot(), before any buffer - * locks are taken, so pausing here does not deadlock with anything. + * Synced slots are skipped: writing their fields from the startup process + * would race the slot-sync worker, and ALTER / DROP_REPLICATION_SLOT errors + * out on a synced slot so the operator-facing recipe does not apply. + * + * When conflict_lsn is valid (in-wait auto-resume check), slots whose + * confirmed_flush_lsn has reached conflict_lsn are treated as not blocking: + * the consumer has caught up to the pause point and the post-wait advance + * code will bump their catalog_xmin past the horizon. Pass InvalidXLogRecPtr + * for the initial pause-or-not decision (we don't yet have a pause point). */ -void -MaybePauseOnLogicalSlotConflict(Oid dboid, TransactionId snapshotConflictHorizon) +static bool +AnySlotStillBlocksConflict(Oid dboid, TransactionId snapshotConflictHorizon, + XLogRecPtr conflict_lsn) { int i; - bool would_invalidate = false; - - if (!recovery_pause_on_logical_slot_conflict) - return; - if (!TransactionIdIsValid(snapshotConflictHorizon)) - return; + bool blocking = false; - /* - * Scan for a would-be-invalidated slot in the conflicting database. - * - * Skip slots that have not yet reached snapshot-builder consistency - * (effective_catalog_xmin is still InvalidTransactionId). An in-progress - * slot has not produced any output to a consumer, so invalidating it is - * harmless — the caller can retry. 
Pausing for such a slot would - * deadlock: DecodingContextFindStartpoint would be waiting for replay - * to advance, while replay would be waiting for the slot to be drained. - */ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (i = 0; i < max_replication_slots; i++) { @@ -566,7 +552,8 @@ MaybePauseOnLogicalSlotConflict(Oid dboid, TransactionId snapshotConflictHorizon Oid slot_db; TransactionId slot_xmin; TransactionId slot_effective_xmin; - bool active_logical; + XLogRecPtr slot_confirmed; + bool is_candidate; if (!s->in_use) continue; @@ -575,52 +562,99 @@ MaybePauseOnLogicalSlotConflict(Oid dboid, TransactionId snapshotConflictHorizon slot_db = s->data.database; slot_xmin = s->data.catalog_xmin; slot_effective_xmin = s->effective_catalog_xmin; - /* - * Skip synced slots (managed by the slot-sync worker per - * sync_replication_slots). Writing their fields from the startup - * process would race with the slot-sync worker's own updates, and - * the operator-facing "drain or drop the slot" recipe in the - * errhint below cannot be applied to a synced slot (ALTER / - * DROP_REPLICATION_SLOT error on synced). - */ - active_logical = (s->data.invalidated == RS_INVAL_NONE && - slot_db != InvalidOid && - TransactionIdIsValid(slot_effective_xmin) && - !s->data.synced); + slot_confirmed = s->data.confirmed_flush; + is_candidate = (s->data.invalidated == RS_INVAL_NONE && + slot_db != InvalidOid && + TransactionIdIsValid(slot_effective_xmin) && + !s->data.synced); SpinLockRelease(&s->mutex); - if (!active_logical) + if (!is_candidate) continue; if (slot_db != dboid) continue; if (!TransactionIdIsValid(slot_xmin)) continue; - /* - * Use PrecedesOrEquals (not Precedes) to match the check in - * DetermineSlotInvalidationCause. 
Otherwise a slot whose - * catalog_xmin was just advanced to exactly conflict_horizon by - * a previous pause-and-advance cycle (our own resume code) will - * NOT trigger a pause here when the next prune record arrives - * with horizon == catalog_xmin, yet WILL still be invalidated - * by the fall-through InvalidateObsoleteReplicationSlots call. - */ - if (TransactionIdPrecedesOrEquals(slot_xmin, snapshotConflictHorizon)) - { - would_invalidate = true; - break; - } + if (!TransactionIdPrecedesOrEquals(slot_xmin, snapshotConflictHorizon)) + continue; + if (conflict_lsn != InvalidXLogRecPtr && + slot_confirmed >= conflict_lsn) + continue; + + blocking = true; + break; } LWLockRelease(ReplicationSlotControlLock); - if (!would_invalidate) + return blocking; +} + +/* + * If recovery_pause_on_logical_slot_conflict is enabled, and replay is about + * to apply a catalog PRUNE_ON_ACCESS record whose snapshotConflictHorizon + * would cause the invalidation of at least one non-invalidated logical slot + * in the same database, request a recovery pause and wait until the conflict + * is resolved. + * + * The wait exits in any of: + * - Auto-resume: a periodic re-scan finds no slot still blocking. Any of + * draining past the pause LSN, dropping the slot, pg_replication_slot_ + * advance(), or out-of-band invalidation (e.g. max_slot_wal_keep_size + * applied by the checkpointer, which runs even while startup is paused + * here) will satisfy this. The post-wait advance then bumps catalog_xmin + * on drained slots so the fall-through InvalidateObsoleteReplicationSlots() + * is a no-op. + * - Manual resume: pg_wal_replay_resume() flips the state to NOT_PAUSED. + * Any slot still blocking at that point is invalidated by the + * fall-through — the "give up on this slot" escape hatch. + * - Promote: CheckForStandbyTrigger() consumes PROMOTE_SIGNAL_FILE and we + * return early so the startup process can finish promotion. 
+ * + * The two parameters identify which slots, if any, this prune record can + * conflict with: + * - dboid: logical slots are per-database, so only slots belonging to this + * database can be invalidated by a catalog prune happening here; slots in + * other databases are never affected and must be ignored. + * - snapshotConflictHorizon: the xid threshold carried by the + * PRUNE_ON_ACCESS record. A slot conflicts iff its catalog_xmin + * precedes-or-equals this horizon (i.e. it still needs catalog rows the + * prune is about to remove). + * + * Only invoked from ResolveRecoveryConflictWithSnapshot(), before any buffer + * locks are taken, so pausing here does not deadlock with anything. + */ +void +MaybePauseOnLogicalSlotConflict(Oid dboid, TransactionId snapshotConflictHorizon) +{ + XLogRecPtr conflict_lsn; + bool user_requested_pause; + + if (!recovery_pause_on_logical_slot_conflict) + return; + if (!TransactionIdIsValid(snapshotConflictHorizon)) + return; + + if (!AnySlotStillBlocksConflict(dboid, snapshotConflictHorizon, + InvalidXLogRecPtr)) return; + /* + * Remember whether an operator had already paused recovery (e.g. via + * pg_wal_replay_pause()) before this conflict fired. If so, our + * auto-resume below must not clear that pause out from under them — the + * user's pause wins. 
+ */ + user_requested_pause = (GetRecoveryPauseState() != RECOVERY_NOT_PAUSED); + + conflict_lsn = GetXLogReplayRecPtr(NULL); + ereport(LOG, (errmsg("recovery paused: WAL redo at %X/%X would invalidate a logical replication slot", - LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))), + LSN_FORMAT_ARGS(conflict_lsn)), errdetail("snapshotConflictHorizon %u exceeds catalog_xmin of at least one active logical slot in database %u.", snapshotConflictHorizon, dboid), - errhint("Drain, advance, or drop the slot, then execute pg_wal_replay_resume()."))); + errhint("Recovery will resume automatically once the slot is drained past %X/%X, dropped, advanced, or invalidated for another reason; pg_wal_replay_resume() forces continuation (invalidating any remaining blocking slot).", + LSN_FORMAT_ARGS(conflict_lsn)))); SetRecoveryPause(true); @@ -642,6 +676,29 @@ MaybePauseOnLogicalSlotConflict(Oid dboid, TransactionId snapshotConflictHorizon return; } + /* + * Auto-resume: if nothing is still blocking this conflict, clear + * the pause and let the loop condition exit. The post-wait advance + * will bump catalog_xmin on any slot that drained past conflict_lsn + * so the fall-through InvalidateObsoleteReplicationSlots() is a + * no-op. Slots invalidated out of band (dropped, WAL-removed, + * etc.) are simply not in the scan anymore. + */ + if (!AnySlotStillBlocksConflict(dboid, snapshotConflictHorizon, + conflict_lsn)) + { + /* + * Only clear the pause we set ourselves. If the operator had + * already paused recovery before the conflict fired, leave their + * pause in place — auto-resume must not silently override an + * explicit pg_wal_replay_pause(). Either way, exit the + * conflict-wait loop now that nothing is blocking. 
+ */ + if (!user_requested_pause) + SetRecoveryPause(false); + break; + } + /* * Promote RECOVERY_PAUSE_REQUESTED to RECOVERY_PAUSED so that * observers (pg_get_wal_replay_pause_state() / monitoring) see the @@ -654,21 +711,14 @@ MaybePauseOnLogicalSlotConflict(Oid dboid, TransactionId snapshotConflictHorizon ConditionVariableCancelSleep(); /* - * Operator has resumed. If they drained slot(s) up to (or past) the LSN - * of the about-to-be-replayed conflict record, we trust that the consumer - * downstream has captured everything that needed the pre-conflict catalog - * snapshot. Advance those slots' catalog_xmin past the horizon so the - * subsequent InvalidateObsoleteReplicationSlots() fall-through is a - * no-op. Slots that the operator did NOT drain are left alone and get - * invalidated normally — that is the "I didn't act, just let the slot - * die" path. - * - * "Drained past the conflict LSN" is defined as: the slot's - * confirmed_flush_lsn >= the LSN at which replay has paused, which is - * the current replay position reported by GetXLogReplayRecPtr. + * Wait is over. For any slot whose consumer drained up to (or past) + * conflict_lsn, advance catalog_xmin past the horizon so the subsequent + * InvalidateObsoleteReplicationSlots() fall-through is a no-op. Slots + * that did not drain are left alone and get invalidated normally — the + * "I didn't act, just let the slot die" path that runs when an operator + * manually resumed without draining. 
*/ { - XLogRecPtr conflict_lsn = GetXLogReplayRecPtr(NULL); int j; LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE); @@ -723,7 +773,7 @@ MaybePauseOnLogicalSlotConflict(Oid dboid, TransactionId snapshotConflictHorizon ereport(LOG, (errmsg("advanced catalog_xmin of logical slot \"%s\" past conflict horizon %u", NameStr(s->data.name), snapshotConflictHorizon), - errdetail("Slot's confirmed_flush_lsn %X/%X reached the conflict record at %X/%X; operator drained before resuming.", + errdetail("Slot's confirmed_flush_lsn %X/%X reached the conflict record at %X/%X; consumer drained past the pause point.", LSN_FORMAT_ARGS(s->data.confirmed_flush), LSN_FORMAT_ARGS(conflict_lsn)))); } diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index 52b34443ec60c..709079c399da8 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -2444,7 +2444,7 @@ { name => 'recovery_pause_on_logical_slot_conflict', type => 'bool', context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY', short_desc => 'Pauses recovery instead of invalidating an active logical slot on catalog conflict.', - long_desc => 'When WAL replay on a standby is about to invalidate a logical replication slot because a catalog PRUNE_ON_ACCESS record has overtaken the slot\'s catalog_xmin, pause recovery instead. The operator can then drain or drop the slot and call pg_wal_replay_resume() to continue.', + long_desc => 'When WAL replay on a standby is about to invalidate a logical replication slot because a catalog PRUNE_ON_ACCESS record has overtaken the slot\'s catalog_xmin, pause recovery instead. Recovery resumes automatically once the slot has been drained past the pause point, dropped, advanced, or invalidated for another reason (e.g. max_slot_wal_keep_size). 
pg_wal_replay_resume() also forces continuation, invalidating any remaining blocking slot.', variable => 'recovery_pause_on_logical_slot_conflict', boot_val => 'false', }, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 17b2bcc4df8b9..414fed447cfe2 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -404,7 +404,9 @@ # retrieve WAL after a failed attempt #recovery_min_apply_delay = 0 # minimum delay for applying changes during recovery #recovery_pause_on_logical_slot_conflict = off # pause recovery instead of invalidating - # a logical slot on catalog conflict + # a logical slot on catalog conflict; + # auto-resumes once the slot is drained, + # dropped, or otherwise unblocks #sync_replication_slots = off # enables slot synchronization on the physical standby from the primary # - Subscribers - diff --git a/src/test/recovery/t/054_recovery_pause_on_slot_conflict.pl b/src/test/recovery/t/054_recovery_pause_on_slot_conflict.pl index d1a03475e95b2..b53bf97694e3a 100644 --- a/src/test/recovery/t/054_recovery_pause_on_slot_conflict.pl +++ b/src/test/recovery/t/054_recovery_pause_on_slot_conflict.pl @@ -217,6 +217,49 @@ sub wait_for_replay_paused return 0; } +# Models an operator who issued an explicit pg_wal_replay_pause() that +# must survive the GUC's auto-resume. On entry replay is parked at a +# pre-conflict LSN with the operator pause already in effect. Each tick we +# nudge replay forward (pg_wal_replay_resume()) and then immediately +# re-assert the operator pause (pg_wal_replay_pause()), so that when the +# startup process reaches the catalog-prune record the operator pause is +# already pending — i.e. GetRecoveryPauseState() != RECOVERY_NOT_PAUSED +# at the moment MaybePauseOnLogicalSlotConflict() captures it. We then +# drain the slot so the GUC's auto-resume re-scan finds nothing blocking. 
+# With the fix the operator's pause is preserved; without it the +# unconditional SetRecoveryPause(false) would clear it. +# +# Returns the total number of changes drained. +sub drain_holding_user_pause +{ + my ($standby, $slot_name, $deadline_seconds) = @_; + + my $total_drained = 0; + my $deadline = time() + $deadline_seconds; + + while (time() < $deadline) { + # Drain whatever the slot currently holds. + my $got = $standby->safe_psql('postgres', + "SELECT COUNT(*) FROM pg_logical_slot_get_changes('$slot_name', NULL, NULL)"); + $total_drained += $got; + + # Stop once the slot is fully drained and replay has advanced past + # the conflict (nothing left to decode and no longer pause-looping + # on the GUC). A short tail of zero-change drains confirms we are + # done. + last if $got == 0 && $total_drained > 0; + + # Nudge replay forward, then immediately re-pause so the operator + # pause is pending again when the next conflict record is applied. + $standby->safe_psql('postgres', "SELECT pg_wal_replay_resume()"); + $standby->safe_psql('postgres', "SELECT pg_wal_replay_pause()"); + + usleep(500_000); + } + + return $total_drained; +} + # --------------------------------------------------------------------- # Main script # --------------------------------------------------------------------- @@ -228,16 +271,19 @@ sub wait_for_replay_paused "SELECT COUNT(*) FROM pg_settings WHERE name = 'recovery_pause_on_logical_slot_conflict'"); is($guc, '1', 'recovery_pause_on_logical_slot_conflict GUC is registered'); -# 2. Phase 1: bring up BOTH standbys (GUC-on and GUC-off) while the -# archive still contains only the quiet-moment snapshot — no prune -# records yet. Slot creation reaches SNAPBUILD_CONSISTENT quickly on -# both. Later, when Phase 2 ships the prune records, the two standbys -# diverge: the GUC-on one pauses and drains; the GUC-off one -# invalidates. +# 2. 
Phase 1: bring up the standbys (GUC-on, GUC-off, and a second +# GUC-on "user-pause" standby) while the archive still contains only the +# quiet-moment snapshot — no prune records yet. Slot creation reaches +# SNAPBUILD_CONSISTENT quickly on all of them. Later, when Phase 2 ships +# the prune records, the standbys diverge: the GUC-on ones pause and +# drain; the GUC-off one invalidates. The user-pause standby additionally +# checks that an operator's explicit pause survives the GUC auto-resume. my $node_standby = create_archive_standby($node_primary, $backup_name, 'standby', 'on'); my $node_standby_off = create_archive_standby($node_primary, $backup_name, 'standby_off', 'off'); +my $node_standby_up = create_archive_standby($node_primary, $backup_name, + 'standby_userpause', 'on'); $node_standby->safe_psql('postgres', qq[ SELECT pg_create_logical_replication_slot('t_slot', 'test_decoding'); @@ -245,6 +291,9 @@ sub wait_for_replay_paused $node_standby_off->safe_psql('postgres', qq[ SELECT pg_create_logical_replication_slot('t_slot_off', 'test_decoding'); ]); +$node_standby_up->safe_psql('postgres', qq[ + SELECT pg_create_logical_replication_slot('up_slot', 'test_decoding'); +]); my $slot_ready = $node_standby->safe_psql('postgres', qq[ SELECT wal_status FROM pg_replication_slots WHERE slot_name = 't_slot' @@ -257,6 +306,21 @@ sub wait_for_replay_paused is($off_slot_ready, 'reserved', "baseline slot created cleanly in Phase 1 (state: $off_slot_ready)"); +my $up_slot_ready = $node_standby_up->safe_psql('postgres', qq[ + SELECT wal_status FROM pg_replication_slots WHERE slot_name = 'up_slot' +]); +is($up_slot_ready, 'reserved', + "user-pause slot created cleanly in Phase 1 (state: $up_slot_ready)"); + +# Operator pauses recovery on the user-pause standby NOW, while the +# archive still only holds the clean Phase-1 snapshot and the catalog- +# prune conflict has not been replayed yet. 
This parks replay at a +# pre-conflict LSN with an explicit operator pause in effect — the exact +# precondition for the user-pause-clobber bug. +$node_standby_up->safe_psql('postgres', "SELECT pg_wal_replay_pause()"); +ok(wait_for_replay_paused($node_standby_up), + "user-pause standby parks on operator pg_wal_replay_pause() before conflict"); + # 3. Phase 2: catalog churn on primary, then wait for archive. run_catalog_churn($node_primary); @@ -322,6 +386,50 @@ sub wait_for_replay_paused "pg_promote completed in under 10s (actual: ${elapsed}s)"); $node_standby_p->stop; + +# 7. User-pause survives auto-resume. The operator paused recovery with +# pg_wal_replay_pause() before the conflict record was replayed (done in +# section 2). drain_holding_user_pause nudges replay into the conflict +# while keeping that operator pause pending, then drains the slot so the +# GUC's auto-resume re-scan finds nothing blocking. The fix in +# MaybePauseOnLogicalSlotConflict() must then leave the operator's pause +# in place rather than clearing it with an unconditional +# SetRecoveryPause(false), so: +# - with the fix: replay stays 'paused' after the conflict resolves; +# - without the fix: auto-resume clears the pause and replay proceeds. +my $up_drained = drain_holding_user_pause($node_standby_up, 'up_slot', 60); + +cmp_ok($up_drained, '>=', 2000, + "user-pause standby drained the slot under operator pause ($up_drained got)"); + +# The slot must have survived (drained, not invalidated) just like the +# plain GUC-on standby. +my $up_slot_state = $node_standby_up->safe_psql('postgres', qq[ + SELECT wal_status || '|' || COALESCE(invalidation_reason, '') + FROM pg_replication_slots WHERE slot_name = 'up_slot'; +]); +like($up_slot_state, qr/^reserved\|/, + "user-pause slot survived catalog prune (state: $up_slot_state)"); + +# The crux: recovery is STILL paused because the operator's pause was not +# cleared by the GUC's auto-resume. 
+my $up_pause_state = $node_standby_up->safe_psql('postgres', + "SELECT pg_get_wal_replay_pause_state()"); +is($up_pause_state, 'paused', + "operator pause survived GUC auto-resume (state: $up_pause_state)"); + +# Now the operator resumes: the pause must clear and the replay LSN must not regress. +my $up_lsn_before = $node_standby_up->safe_psql('postgres', + "SELECT pg_last_wal_replay_lsn()"); +$node_standby_up->safe_psql('postgres', "SELECT pg_wal_replay_resume()"); +$node_standby_up->poll_query_until('postgres', + "SELECT pg_get_wal_replay_pause_state() = 'not paused'") + or die "replay did not leave paused state after operator resume"; +ok($node_standby_up->poll_query_until('postgres', + "SELECT pg_last_wal_replay_lsn() >= '$up_lsn_before'::pg_lsn"), + "replay proceeds after operator pg_wal_replay_resume()"); + +$node_standby_up->stop; $node_standby_off->stop; $node_standby->stop; $node_primary->stop;