diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 73b78a83fa744..a2b0fd4ef1219 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -96,6 +96,24 @@ const char *recoveryTargetName; XLogRecPtr recoveryTargetLSN; int recovery_min_apply_delay = 0; +/* + * If true, when WAL replay on a standby is about to invalidate an otherwise- + * active logical replication slot because a catalog PRUNE_ON_ACCESS record's + * snapshotConflictHorizon has overtaken the slot's catalog_xmin, pause replay + * instead. Replay auto-resumes once the consumer has drained the slot past + * the pause point (or the slot is dropped, advanced, or otherwise no longer + * blocking); pg_wal_replay_resume() also forces continuation. See + * MaybePauseOnLogicalSlotConflict() in standby.c. + * + * Motivated by blueprints/LOGICAL_DECODING_ARCHIVED_WALS.md §4.2.3 / US-4: + * an archive-only logical-decoding standby cannot feed hot_standby_feedback + * to the primary, so it has no natural way to keep the primary's catalog + * horizon pinned. Without this GUC, any logical slot created on such a + * standby is invalidated the first time replay applies a catalog vacuum + * record whose horizon exceeds the slot's catalog_xmin. + */ +bool recovery_pause_on_logical_slot_conflict = false; + /* options formerly taken from recovery.conf for XLOG streaming */ char *PrimaryConnInfo = NULL; char *PrimarySlotName = NULL; @@ -363,7 +381,8 @@ static bool recoveryStopsAfter(XLogReaderState *record); static char *getRecoveryStopReason(void); static void recoveryPausesHere(bool endOfRecovery); static bool recoveryApplyDelay(XLogReaderState *record); -static void ConfirmRecoveryPaused(void); +/* Exposed for the logical-slot-conflict recovery-pause logic in standby.c. */ +void ConfirmRecoveryPaused(void); static XLogRecord *ReadRecord(XLogPrefetcher *xlogprefetcher, int emode, bool fetching_ckpt, @@ -386,7 +405,8 @@ static int XLogFileRead(XLogSegNo segno, TimeLineID tli, XLogSource source, bool notfoundOk); static int XLogFileReadAnyTLI(XLogSegNo segno, XLogSource source); -static bool CheckForStandbyTrigger(void); +/* Exposed for the logical-slot-conflict recovery-pause logic in standby.c. */ +bool CheckForStandbyTrigger(void); static void SetPromoteIsTriggered(void); static bool HotStandbyActiveInReplay(void); @@ -3083,7 +3103,7 @@ SetRecoveryPause(bool recoveryPause) * Confirm the recovery pause by setting the recovery pause state to * RECOVERY_PAUSED. */ -static void +void ConfirmRecoveryPaused(void) { /* If recovery pause is requested then set it paused */ @@ -4438,7 +4458,11 @@ SetPromoteIsTriggered(void) /* * Check whether a promote request has arrived. */ -static bool +/* + * Non-static: MaybePauseOnLogicalSlotConflict needs this to break its wait + * loop on promotion, same as recoveryPausesHere does. + */ +bool CheckForStandbyTrigger(void) { if (LocalPromoteIsTriggered) diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index de9092fdf5bc9..ce467a0748618 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -24,8 +24,11 @@ #include "access/xlogutils.h" #include "miscadmin.h" #include "pgstat.h" +#include "postmaster/startup.h" #include "replication/slot.h" +#include "storage/condition_variable.h" #include "storage/bufmgr.h" +#include "storage/lwlock.h" #include "storage/proc.h" #include "storage/procarray.h" #include "storage/sinvaladt.h" @@ -503,8 +506,290 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, * reached, e.g. due to using a physical replication slot. */ if (IsLogicalDecodingEnabled() && isCatalogRel) + { + MaybePauseOnLogicalSlotConflict(locator.dbOid, snapshotConflictHorizon); InvalidateObsoleteReplicationSlots(RS_INVAL_HORIZON, 0, locator.dbOid, snapshotConflictHorizon); + } +} + +/* + * Returns true if at least one non-synced logical slot in `dboid` still + * blocks replay past snapshotConflictHorizon. + * + * "Blocks" means: the slot is in use, not invalidated, snapbuild-consistent + * (effective_catalog_xmin is valid — skipping in-progress slots avoids a + * deadlock with DecodingContextFindStartpoint), and its catalog_xmin + * precedes-or-equals the horizon. + * + * Use PrecedesOrEquals (not Precedes) to match DetermineSlotInvalidationCause. + * Otherwise a slot whose catalog_xmin was just advanced to exactly horizon by + * a previous pause-and-advance cycle fails to re-pause on the next prune + * record with the same horizon, yet would still be invalidated by the + * fall-through InvalidateObsoleteReplicationSlots call. + * + * Synced slots are skipped: writing their fields from the startup process + * would race the slot-sync worker, and ALTER / DROP_REPLICATION_SLOT errors + * out on a synced slot so the operator-facing recipe does not apply. + * + * When conflict_lsn is valid (in-wait auto-resume check), slots whose + * confirmed_flush_lsn has reached conflict_lsn are treated as not blocking: + * the consumer has caught up to the pause point and the post-wait advance + * code will bump their catalog_xmin past the horizon. Pass InvalidXLogRecPtr + * for the initial pause-or-not decision (we don't yet have a pause point). + */ +static bool +AnySlotStillBlocksConflict(Oid dboid, TransactionId snapshotConflictHorizon, + XLogRecPtr conflict_lsn) +{ + int i; + bool blocking = false; + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + Oid slot_db; + TransactionId slot_xmin; + TransactionId slot_effective_xmin; + XLogRecPtr slot_confirmed; + bool is_candidate; + + if (!s->in_use) + continue; + + SpinLockAcquire(&s->mutex); + slot_db = s->data.database; + slot_xmin = s->data.catalog_xmin; + slot_effective_xmin = s->effective_catalog_xmin; + slot_confirmed = s->data.confirmed_flush; + is_candidate = (s->data.invalidated == RS_INVAL_NONE && + slot_db != InvalidOid && + TransactionIdIsValid(slot_effective_xmin) && + !s->data.synced); + SpinLockRelease(&s->mutex); + + if (!is_candidate) + continue; + if (slot_db != dboid) + continue; + if (!TransactionIdIsValid(slot_xmin)) + continue; + if (!TransactionIdPrecedesOrEquals(slot_xmin, snapshotConflictHorizon)) + continue; + if (conflict_lsn != InvalidXLogRecPtr && + slot_confirmed >= conflict_lsn) + continue; + + blocking = true; + break; + } + LWLockRelease(ReplicationSlotControlLock); + + return blocking; +} + +/* + * If recovery_pause_on_logical_slot_conflict is enabled, and replay is about + * to apply a catalog PRUNE_ON_ACCESS record whose snapshotConflictHorizon + * would cause the invalidation of at least one non-invalidated logical slot + * in the same database, request a recovery pause and wait until the conflict + * is resolved. + * + * The wait exits in any of: + * - Auto-resume: a periodic re-scan finds no slot still blocking. Any of + * draining past the pause LSN, dropping the slot, pg_replication_slot_ + * advance(), or out-of-band invalidation (e.g. max_slot_wal_keep_size + * applied by the checkpointer, which runs even while startup is paused + * here) will satisfy this. The post-wait advance then bumps catalog_xmin + * on drained slots so the fall-through InvalidateObsoleteReplicationSlots() + * is a no-op. + * - Manual resume: pg_wal_replay_resume() flips the state to NOT_PAUSED. + * Any slot still blocking at that point is invalidated by the + * fall-through — the "give up on this slot" escape hatch. + * - Promote: CheckForStandbyTrigger() consumes PROMOTE_SIGNAL_FILE and we + * return early so the startup process can finish promotion. + * + * The two parameters identify which slots, if any, this prune record can + * conflict with: + * - dboid: logical slots are per-database, so only slots belonging to this + * database can be invalidated by a catalog prune happening here; slots in + * other databases are never affected and must be ignored. + * - snapshotConflictHorizon: the xid threshold carried by the + * PRUNE_ON_ACCESS record. A slot conflicts iff its catalog_xmin + * precedes-or-equals this horizon (i.e. it still needs catalog rows the + * prune is about to remove). + * + * Only invoked from ResolveRecoveryConflictWithSnapshot(), before any buffer + * locks are taken, so pausing here does not deadlock with anything. + */ +void +MaybePauseOnLogicalSlotConflict(Oid dboid, TransactionId snapshotConflictHorizon) +{ + XLogRecPtr conflict_lsn; + bool user_requested_pause; + + if (!recovery_pause_on_logical_slot_conflict) + return; + if (!TransactionIdIsValid(snapshotConflictHorizon)) + return; + + if (!AnySlotStillBlocksConflict(dboid, snapshotConflictHorizon, + InvalidXLogRecPtr)) + return; + + /* + * Remember whether an operator had already paused recovery (e.g. via + * pg_wal_replay_pause()) before this conflict fired. If so, our + * auto-resume below must not clear that pause out from under them — the + * user's pause wins. + */ + user_requested_pause = (GetRecoveryPauseState() != RECOVERY_NOT_PAUSED); + + conflict_lsn = GetXLogReplayRecPtr(NULL); + + ereport(LOG, + (errmsg("recovery paused: WAL redo at %X/%X would invalidate a logical replication slot", + LSN_FORMAT_ARGS(conflict_lsn)), + errdetail("snapshotConflictHorizon %u exceeds catalog_xmin of at least one active logical slot in database %u.", + snapshotConflictHorizon, dboid), + errhint("Recovery will resume automatically once the slot is drained past %X/%X, dropped, advanced, or invalidated for another reason; pg_wal_replay_resume() forces continuation (invalidating any remaining blocking slot).", + LSN_FORMAT_ARGS(conflict_lsn)))); + + SetRecoveryPause(true); + + while (GetRecoveryPauseState() != RECOVERY_NOT_PAUSED) + { + ProcessStartupProcInterrupts(); + + /* + * If the operator gave up on the slot and triggered a promotion + * instead, bail out of the wait so the startup process can proceed + * with the promotion path. Must use CheckForStandbyTrigger (which + * actually consumes PROMOTE_SIGNAL_FILE), not PromoteIsTriggered + * (which only reads a flag populated by the former). Mirrors the + * same escape in recoveryPausesHere(). + */ + if (CheckForStandbyTrigger()) + { + ConditionVariableCancelSleep(); + return; + } + + /* + * Auto-resume: if nothing is still blocking this conflict, clear + * the pause and let the loop condition exit. The post-wait advance + * will bump catalog_xmin on any slot that drained past conflict_lsn + * so the fall-through InvalidateObsoleteReplicationSlots() is a + * no-op. Slots invalidated out of band (dropped, WAL-removed, + * etc.) are simply not in the scan anymore. + */ + if (!AnySlotStillBlocksConflict(dboid, snapshotConflictHorizon, + conflict_lsn)) + { + /* + * Only clear the pause we set ourselves. If the operator had + * already paused recovery before the conflict fired, leave their + * pause in place — auto-resume must not silently override an + * explicit pg_wal_replay_pause(). Either way, exit the + * conflict-wait loop now that nothing is blocking. + */ + if (!user_requested_pause) + SetRecoveryPause(false); + break; + } + + /* + * Promote RECOVERY_PAUSE_REQUESTED to RECOVERY_PAUSED so that + * observers (pg_get_wal_replay_pause_state() / monitoring) see the + * pause as actually taken, not just requested. + */ + ConfirmRecoveryPaused(); + ConditionVariableTimedSleep(&XLogRecoveryCtl->recoveryNotPausedCV, + 1000, WAIT_EVENT_RECOVERY_PAUSE); + } + ConditionVariableCancelSleep(); + + /* + * Wait is over. For any slot whose consumer drained up to (or past) + * conflict_lsn, advance catalog_xmin past the horizon so the subsequent + * InvalidateObsoleteReplicationSlots() fall-through is a no-op. Slots + * that did not drain are left alone and get invalidated normally — the + * "I didn't act, just let the slot die" path that runs when an operator + * manually resumed without draining. + */ + { + int j; + + LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE); + for (j = 0; j < max_replication_slots; j++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[j]; + bool advance; + + if (!s->in_use) + continue; + + SpinLockAcquire(&s->mutex); + /* + * Skip synced slots — same reason as in the pause-check scan. + * Writing their fields would race the slot-sync worker. + */ + advance = (s->data.invalidated == RS_INVAL_NONE && + s->data.database == dboid && + !s->data.synced && + s->data.confirmed_flush >= conflict_lsn && + ((TransactionIdIsValid(s->data.catalog_xmin) && + TransactionIdPrecedesOrEquals(s->data.catalog_xmin, + snapshotConflictHorizon)) || + (TransactionIdIsValid(s->data.xmin) && + TransactionIdPrecedesOrEquals(s->data.xmin, + snapshotConflictHorizon)))); + if (advance) + { + TransactionId new_xmin = snapshotConflictHorizon; + + TransactionIdAdvance(new_xmin); /* strictly > horizon */ + if (TransactionIdIsValid(s->data.catalog_xmin) && + TransactionIdPrecedesOrEquals(s->data.catalog_xmin, + snapshotConflictHorizon)) + { + s->data.catalog_xmin = new_xmin; + s->effective_catalog_xmin = new_xmin; + } + if (TransactionIdIsValid(s->data.xmin) && + TransactionIdPrecedesOrEquals(s->data.xmin, + snapshotConflictHorizon)) + { + s->data.xmin = new_xmin; + s->effective_xmin = new_xmin; + } + s->just_dirtied = true; + s->dirty = true; + } + SpinLockRelease(&s->mutex); + + if (advance) + ereport(LOG, + (errmsg("advanced catalog_xmin of logical slot \"%s\" past conflict horizon %u", + NameStr(s->data.name), snapshotConflictHorizon), + errdetail("Slot's confirmed_flush_lsn %X/%X reached the conflict record at %X/%X; consumer drained past the pause point.", + LSN_FORMAT_ARGS(s->data.confirmed_flush), + LSN_FORMAT_ARGS(conflict_lsn)))); + } + LWLockRelease(ReplicationSlotControlLock); + + /* + * Flush dirty slots to disk immediately — do not defer to the next + * restartpoint. The normal streaming path (LogicalConfirmReceivedLocation + * in logical.c) saves slot state to disk *before* updating the + * in-memory effective_catalog_xmin. We must uphold the same invariant: + * if we crash between the in-memory advance above and the next + * restartpoint, the on-disk catalog_xmin must reflect the advance so + * that vacuum cannot reclaim catalog tuples the slot still needs. + */ + CheckPointReplicationSlots(false); + } } /* diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index afaa058b046c9..709079c399da8 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -2441,6 +2441,14 @@ max => 'INT_MAX', }, +{ name => 'recovery_pause_on_logical_slot_conflict', type => 'bool', + context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY', + short_desc => 'Pauses recovery instead of invalidating an active logical slot on catalog conflict.', + long_desc => 'When WAL replay on a standby is about to invalidate a logical replication slot because a catalog PRUNE_ON_ACCESS record has overtaken the slot\'s catalog_xmin, pause recovery instead. Recovery resumes automatically once the slot has been drained past the pause point, dropped, advanced, or invalidated for another reason (e.g. max_slot_wal_keep_size). pg_wal_replay_resume() also forces continuation, invalidating any remaining blocking slot.', + variable => 'recovery_pause_on_logical_slot_conflict', + boot_val => 'false', +}, + { name => 'recovery_prefetch', type => 'enum', context => 'PGC_SIGHUP', group => 'WAL_RECOVERY', short_desc => 'Prefetch referenced blocks during recovery.', long_desc => 'Look ahead in the WAL to find references to uncached data.', diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index ac38cddaaf9a6..414fed447cfe2 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -403,6 +403,10 @@ #wal_retrieve_retry_interval = 5s # time to wait before retrying to # retrieve WAL after a failed attempt #recovery_min_apply_delay = 0 # minimum delay for applying changes during recovery +#recovery_pause_on_logical_slot_conflict = off # pause recovery instead of invalidating + # a logical slot on catalog conflict; + # auto-resumes once the slot is drained, + # dropped, or otherwise unblocks #sync_replication_slots = off # enables slot synchronization on the physical standby from the primary # - Subscribers - diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h index ba7750dca0b45..58d578ce85373 100644 --- a/src/include/access/xlogrecovery.h +++ b/src/include/access/xlogrecovery.h @@ -129,6 +129,7 @@ extern PGDLLIMPORT XLogRecoveryCtlData *XLogRecoveryCtl; extern PGDLLIMPORT bool recoveryTargetInclusive; extern PGDLLIMPORT int recoveryTargetAction; extern PGDLLIMPORT int recovery_min_apply_delay; +extern PGDLLIMPORT bool recovery_pause_on_logical_slot_conflict; extern PGDLLIMPORT char *PrimaryConnInfo; extern PGDLLIMPORT char *PrimarySlotName; extern PGDLLIMPORT char *recoveryRestoreCommand; @@ -213,6 +214,8 @@ extern bool HotStandbyActive(void); extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI); extern RecoveryPauseState GetRecoveryPauseState(void); extern void SetRecoveryPause(bool recoveryPause); +extern void ConfirmRecoveryPaused(void); +extern bool CheckForStandbyTrigger(void); extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream); extern TimestampTz GetLatestXTime(void); extern TimestampTz GetCurrentChunkReplayStartTime(void); diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h index 6a314c693cde7..03705a8eb8ced 100644 --- a/src/include/storage/standby.h +++ b/src/include/storage/standby.h @@ -75,6 +75,8 @@ extern void ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHo extern void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHorizon, bool isCatalogRel, RelFileLocator locator); +extern void MaybePauseOnLogicalSlotConflict(Oid dboid, + TransactionId snapshotConflictHorizon); extern void ResolveRecoveryConflictWithTablespace(Oid tsid); extern void ResolveRecoveryConflictWithDatabase(Oid dbid); diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index 9eb8ed114254a..ae794cb5baeb7 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -62,6 +62,7 @@ tests += { 't/051_effective_wal_level.pl', 't/052_checkpoint_segment_missing.pl', 't/053_standby_login_event_trigger.pl', + 't/054_recovery_pause_on_slot_conflict.pl', ], }, } diff --git a/src/test/recovery/t/054_recovery_pause_on_slot_conflict.pl b/src/test/recovery/t/054_recovery_pause_on_slot_conflict.pl new file mode 100644 index 0000000000000..b53bf97694e3a --- /dev/null +++ b/src/test/recovery/t/054_recovery_pause_on_slot_conflict.pl @@ -0,0 +1,437 @@ +# Copyright (c) 2026, PostgreSQL Global Development Group + +# Exercise the recovery_pause_on_logical_slot_conflict GUC on a standby. +# +# Two-phase flow so the slot is fully consistent BEFORE any catalog- +# prune WAL record is replayed — otherwise slot creation would block +# inside DecodingContextFindStartpoint while replay pauses on the +# prune, and we would deadlock. (Fix #1, bbd5d4e13bc, narrows the +# window but doesn't remove it; keeping the two-phase flow explicit +# makes the test robust.) +# +# Phase 1 — bring up a consistent logical slot on the standby from a +# quiet primary archive: +# * take basebackup +# * pg_log_standby_snapshot() → snapbuild path (a) anchor +# * wait for the snapshot's segment to archive +# * start standby, let replay catch up, create slot (quick — no +# prune records in the archive yet). +# +# Phase 2 — churn the primary's catalog so the standby's replay +# eventually hits a catalog-prune record that would invalidate the +# slot: +# * run CREATE / DROP of transient tables (pg_class churn) +# * run ANALYZE x2 + VACUUM pg_statistic / pg_class (HOT prune on +# catalog relations in db=postgres) +# * wait for those segments to archive +# * orchestrator loop on the standby: when +# pg_get_wal_replay_pause_state() returns paused, drain the slot +# via pg_logical_slot_get_changes, call pg_wal_replay_resume, +# continue. + +use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; +use Time::HiRes qw(usleep); + +# --------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------- + +# Build the primary, seed the workload table, take a basebackup, and +# produce a "clean" archive: one that contains a standby snapshot but +# no catalog-prune WAL yet. Returns ($node_primary, $backup_name). +sub setup_primary_with_clean_archive +{ + my $node_primary = PostgreSQL::Test::Cluster->new('primary'); + $node_primary->init(allows_streaming => 'logical', has_archiving => 1); + $node_primary->append_conf('postgresql.conf', qq[ +wal_level = logical +archive_mode = on +archive_timeout = 1s +autovacuum = on +autovacuum_naptime = 5s +fsync = off +synchronous_commit = off +]); + $node_primary->start; + + $node_primary->safe_psql('postgres', qq[ + CREATE TABLE events (id serial PRIMARY KEY, payload text); + ALTER TABLE events REPLICA IDENTITY FULL; + INSERT INTO events (payload) VALUES ('seed'); + ]); + + my $backup_name = 'backup1'; + $node_primary->backup($backup_name); + + # Quiet-moment RUNNING_XACTS in post-backup WAL — provides path (a) + # anchor for snapbuild. + $node_primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot();"); + + # Force the segment containing that anchor to archive so the standby + # can see it via restore_command. Switch TWICE: first switch closes + # the segment with the snapshot record; second switch gives + # snapbuild the forward WAL it needs to decide the slot is + # consistent. Without the second switch, + # DecodingContextFindStartpoint blocks on 'waiting for WAL to + # become available at seg N+1' — flaky slot creation. + my $phase1_seg = $node_primary->safe_psql('postgres', + "SELECT pg_walfile_name(pg_current_wal_lsn())"); + $node_primary->safe_psql('postgres', "SELECT pg_switch_wal();"); + $node_primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot();"); + $node_primary->safe_psql('postgres', "SELECT pg_switch_wal();"); + $node_primary->poll_query_until('postgres', qq[ + SELECT last_archived_wal IS NOT NULL + AND last_archived_wal >= '$phase1_seg' + FROM pg_stat_archiver + ]) or die "Timed out waiting for phase-1 segment $phase1_seg to archive"; + + return ($node_primary, $backup_name); +} + +# Bring up an archive-only standby from $backup_name on $node_primary +# with recovery_pause_on_logical_slot_conflict set to $guc_value. Waits +# for replay to catch up, then returns the node. +sub create_archive_standby +{ + my ($node_primary, $backup_name, $name, $guc_value) = @_; + + my $standby = PostgreSQL::Test::Cluster->new($name); + $standby->init_from_backup($node_primary, $backup_name, + has_streaming => 0, has_restoring => 1); + $standby->append_conf('postgresql.conf', qq[ +hot_standby = on +recovery_pause_on_logical_slot_conflict = $guc_value +wal_level = logical +max_standby_archive_delay = -1 +max_standby_streaming_delay = -1 +]); + $standby->start; + $standby->poll_query_until('postgres', + "SELECT pg_last_wal_replay_lsn() IS NOT NULL", 't'); + + return $standby; +} + +# Churn the primary's catalog enough to emit catalog-prune WAL records, +# then force and wait for those records to reach the archive. +sub run_catalog_churn +{ + my ($node_primary) = @_; + + # Transient tables exercise pg_class / pg_attribute / pg_type / pg_depend. + $node_primary->safe_psql('postgres', qq[ + INSERT INTO events (payload) + SELECT 'row-' || g FROM generate_series(1, 3000) g; + ]); + for (my $i = 0; $i < 20; $i++) { + $node_primary->safe_psql('postgres', + "CREATE TABLE churn_$i (id int, payload text); DROP TABLE churn_$i;"); + } + # Two ANALYZE calls make first-generation pg_statistic rows dead by + # overwriting them; VACUUM then emits Heap2/PRUNE_ON_ACCESS. + $node_primary->safe_psql('postgres', qq[ + ANALYZE events; + ANALYZE events; + VACUUM pg_class; + VACUUM pg_attribute; + VACUUM pg_type; + VACUUM pg_depend; + VACUUM pg_statistic; + ]); + + my $phase2_seg = $node_primary->safe_psql('postgres', + "SELECT pg_walfile_name(pg_current_wal_lsn())"); + $node_primary->safe_psql('postgres', "SELECT pg_switch_wal();"); + $node_primary->poll_query_until('postgres', qq[ + SELECT last_archived_wal IS NOT NULL + AND last_archived_wal >= '$phase2_seg' + FROM pg_stat_archiver + ]) or die "Timed out waiting for phase-2 segment $phase2_seg to archive"; + + return; +} + +# Orchestrator loop for the GUC-on standby: when replay pauses, drain +# the slot via pg_logical_slot_get_changes and call +# pg_wal_replay_resume(). Exits when replay stops advancing or when +# $deadline_seconds have passed. Returns ($pauses_seen, $total_drained) +# and includes a final drain of anything left on the slot. +sub drain_and_resume_loop +{ + my ($standby, $slot_name, $deadline_seconds) = @_; + + my $total_drained = 0; + my $pauses_seen = 0; + my $last_replay = ''; + my $stall_ticks = 0; + my $deadline = time() + $deadline_seconds; + + while (time() < $deadline) { + my $state = $standby->safe_psql('postgres', + "SELECT pg_get_wal_replay_pause_state()"); + my $replay = $standby->safe_psql('postgres', + "SELECT pg_last_wal_replay_lsn()"); + + if ($state eq 'paused' || $state eq 'pause requested') { + my $got = $standby->safe_psql('postgres', + "SELECT COUNT(*) FROM pg_logical_slot_get_changes('$slot_name', NULL, NULL)"); + $total_drained += $got; + $pauses_seen++; + $standby->safe_psql('postgres', "SELECT pg_wal_replay_resume()"); + $stall_ticks = 0; + } elsif ($replay eq $last_replay) { + $stall_ticks++; + last if $stall_ticks > 10; + } else { + $stall_ticks = 0; + } + + $last_replay = $replay; + usleep(500_000); + } + + my $final = $standby->safe_psql('postgres', + "SELECT COUNT(*) FROM pg_logical_slot_get_changes('$slot_name', NULL, NULL)"); + $total_drained += $final; + + return ($pauses_seen, $total_drained); +} + +# Poll until $standby reports replay as paused, up to ~30 seconds. +# Returns 1 on success, 0 on timeout. +sub wait_for_replay_paused +{ + my ($standby) = @_; + + for (my $i = 0; $i < 60; $i++) { + my $s = $standby->safe_psql('postgres', + "SELECT pg_get_wal_replay_pause_state()"); + return 1 if $s eq 'paused'; + usleep(500_000); + } + return 0; +} + +# Models an operator who issued an explicit pg_wal_replay_pause() that +# must survive the GUC's auto-resume. On entry replay is parked at a +# pre-conflict LSN with the operator pause already in effect. Each tick we +# nudge replay forward (pg_wal_replay_resume()) and then immediately +# re-assert the operator pause (pg_wal_replay_pause()), so that when the +# startup process reaches the catalog-prune record the operator pause is +# already pending — i.e. GetRecoveryPauseState() != RECOVERY_NOT_PAUSED +# at the moment MaybePauseOnLogicalSlotConflict() captures it. We then +# drain the slot so the GUC's auto-resume re-scan finds nothing blocking. +# With the fix the operator's pause is preserved; without it the +# unconditional SetRecoveryPause(false) would clear it. +# +# Returns the total number of changes drained. +sub drain_holding_user_pause +{ + my ($standby, $slot_name, $deadline_seconds) = @_; + + my $total_drained = 0; + my $deadline = time() + $deadline_seconds; + + while (time() < $deadline) { + # Drain whatever the slot currently holds. + my $got = $standby->safe_psql('postgres', + "SELECT COUNT(*) FROM pg_logical_slot_get_changes('$slot_name', NULL, NULL)"); + $total_drained += $got; + + # Stop once the slot is fully drained and replay has advanced past + # the conflict (nothing left to decode and no longer pause-looping + # on the GUC). A short tail of zero-change drains confirms we are + # done. + last if $got == 0 && $total_drained > 0; + + # Nudge replay forward, then immediately re-pause so the operator + # pause is pending again when the next conflict record is applied. + $standby->safe_psql('postgres', "SELECT pg_wal_replay_resume()"); + $standby->safe_psql('postgres', "SELECT pg_wal_replay_pause()"); + + usleep(500_000); + } + + return $total_drained; +} + +# --------------------------------------------------------------------- +# Main script +# --------------------------------------------------------------------- + +# 1. GUC visibility. +my ($node_primary, $backup_name) = setup_primary_with_clean_archive(); + +my $guc = $node_primary->safe_psql('postgres', + "SELECT COUNT(*) FROM pg_settings WHERE name = 'recovery_pause_on_logical_slot_conflict'"); +is($guc, '1', 'recovery_pause_on_logical_slot_conflict GUC is registered'); + +# 2. Phase 1: bring up the standbys (GUC-on, GUC-off, and a second +# GUC-on "user-pause" standby) while the archive still contains only the +# quiet-moment snapshot — no prune records yet. Slot creation reaches +# SNAPBUILD_CONSISTENT quickly on all of them. Later, when Phase 2 ships +# the prune records, the standbys diverge: the GUC-on ones pause and +# drain; the GUC-off one invalidates. The user-pause standby additionally +# checks that an operator's explicit pause survives the GUC auto-resume. +my $node_standby = create_archive_standby($node_primary, $backup_name, + 'standby', 'on'); +my $node_standby_off = create_archive_standby($node_primary, $backup_name, + 'standby_off', 'off'); +my $node_standby_up = create_archive_standby($node_primary, $backup_name, + 'standby_userpause', 'on'); + +$node_standby->safe_psql('postgres', qq[ + SELECT pg_create_logical_replication_slot('t_slot', 'test_decoding'); +]); +$node_standby_off->safe_psql('postgres', qq[ + SELECT pg_create_logical_replication_slot('t_slot_off', 'test_decoding'); +]); +$node_standby_up->safe_psql('postgres', qq[ + SELECT pg_create_logical_replication_slot('up_slot', 'test_decoding'); +]); + +my $slot_ready = $node_standby->safe_psql('postgres', qq[ + SELECT wal_status FROM pg_replication_slots WHERE slot_name = 't_slot' +]); +is($slot_ready, 'reserved', "slot created cleanly in Phase 1 (state: $slot_ready)"); + +my $off_slot_ready = $node_standby_off->safe_psql('postgres', qq[ + SELECT wal_status FROM pg_replication_slots WHERE slot_name = 't_slot_off' +]); +is($off_slot_ready, 'reserved', + "baseline slot created cleanly in Phase 1 (state: $off_slot_ready)"); + +my $up_slot_ready = $node_standby_up->safe_psql('postgres', qq[ + SELECT wal_status FROM pg_replication_slots WHERE slot_name = 'up_slot' +]); +is($up_slot_ready, 'reserved', + "user-pause slot created cleanly in Phase 1 (state: $up_slot_ready)"); + +# Operator pauses recovery on the user-pause standby NOW, while the +# archive still only holds the clean Phase-1 snapshot and the catalog- +# prune conflict has not been replayed yet. This parks replay at a +# pre-conflict LSN with an explicit operator pause in effect — the exact +# precondition for the user-pause-clobber bug. +$node_standby_up->safe_psql('postgres', "SELECT pg_wal_replay_pause()"); +ok(wait_for_replay_paused($node_standby_up), + "user-pause standby parks on operator pg_wal_replay_pause() before conflict"); + +# 3. Phase 2: catalog churn on primary, then wait for archive. +run_catalog_churn($node_primary); + +# 4. Orchestrator loop on the GUC-on standby. +my ($pauses_seen, $total_drained) = + drain_and_resume_loop($node_standby, 't_slot', 60); + +my $slot_state = $node_standby->safe_psql('postgres', qq[ + SELECT wal_status || '|' || COALESCE(invalidation_reason, '') + FROM pg_replication_slots WHERE slot_name = 't_slot'; +]); +like($slot_state, qr/^reserved\|/, + "slot survived catalog prune with GUC on (state: $slot_state)"); + +cmp_ok($pauses_seen, '>=', 1, + "at least one pause event was handled ($pauses_seen seen)"); + +cmp_ok($total_drained, '>=', 2000, + "at least 2000 decoded events ($total_drained got)"); + +# 5. Baseline assertion: the GUC-off standby, faced with the exact same +# Phase-2 archive, should invalidate its slot. This confirms the test +# setup actually triggers the conflict AND that GUC-off behavior is +# unchanged from upstream — if this ever starts passing with state +# "reserved", either the test stopped reproducing the trigger or the +# GUC-off path accidentally benefits from our patch. +my $off_state = 'reserved'; +for (my $i = 0; $i < 60; $i++) { + $off_state = $node_standby_off->safe_psql('postgres', qq[ + SELECT wal_status FROM pg_replication_slots WHERE slot_name = 't_slot_off'; + ]); + last if $off_state eq 'lost'; + usleep(500_000); +} + +is($off_state, 'lost', + "baseline (GUC off): slot invalidates as expected under catalog prune"); + +# 6. Promote-during-pause: bring up a third standby, get it paused by +# the GUC, then call pg_promote() and assert promotion actually +# completes (rather than stalling until someone also runs +# pg_wal_replay_resume). Guards the CheckForStandbyTrigger() escape +# path in the wait loop. +my $node_standby_p = create_archive_standby($node_primary, $backup_name, + 'standby_promote', 'on'); +$node_standby_p->safe_psql('postgres', + "SELECT pg_create_logical_replication_slot('promote_slot', 'test_decoding')"); + +# Phase-2 archive is already shipped so a pause will happen within a +# few seconds. +my $paused = wait_for_replay_paused($node_standby_p); +ok($paused, "promote-test standby reached paused state before promotion"); + +# Call pg_promote with a short wait. Without the CheckForStandbyTrigger +# escape in the wait loop, this stalls for the full wait_seconds and +# returns false; with the fix, it returns true in ~1 second. +my $t0 = time(); +my $promoted = $node_standby_p->safe_psql('postgres', + "SELECT pg_promote(wait => true, wait_seconds => 30)"); +my $elapsed = time() - $t0; +is($promoted, 't', "pg_promote returned true while standby was paused by GUC"); +cmp_ok($elapsed, '<', 10, + "pg_promote completed in under 10s (actual: ${elapsed}s)"); + +$node_standby_p->stop; + +# 7. User-pause survives auto-resume. The operator paused recovery with +# pg_wal_replay_pause() before the conflict record was replayed (done in +# section 2). drain_holding_user_pause nudges replay into the conflict +# while keeping that operator pause pending, then drains the slot so the +# GUC's auto-resume re-scan finds nothing blocking. The fix in +# MaybePauseOnLogicalSlotConflict() must then leave the operator's pause +# in place rather than clearing it with an unconditional +# SetRecoveryPause(false), so: +# - with the fix: replay stays 'paused' after the conflict resolves; +# - without the fix: auto-resume clears the pause and replay proceeds. +my $up_drained = drain_holding_user_pause($node_standby_up, 'up_slot', 60); + +cmp_ok($up_drained, '>=', 2000, + "user-pause standby drained the slot under operator pause ($up_drained got)"); + +# The slot must have survived (drained, not invalidated) just like the +# plain GUC-on standby. +my $up_slot_state = $node_standby_up->safe_psql('postgres', qq[ + SELECT wal_status || '|' || COALESCE(invalidation_reason, '') + FROM pg_replication_slots WHERE slot_name = 'up_slot'; +]); +like($up_slot_state, qr/^reserved\|/, + "user-pause slot survived catalog prune (state: $up_slot_state)"); + +# The crux: recovery is STILL paused because the operator's pause was not +# cleared by the GUC's auto-resume. +my $up_pause_state = $node_standby_up->safe_psql('postgres', + "SELECT pg_get_wal_replay_pause_state()"); +is($up_pause_state, 'paused', + "operator pause survived GUC auto-resume (state: $up_pause_state)"); + +# Now the operator resumes and replay must proceed past the pause. +my $up_lsn_before = $node_standby_up->safe_psql('postgres', + "SELECT pg_last_wal_replay_lsn()"); +$node_standby_up->safe_psql('postgres', "SELECT pg_wal_replay_resume()"); +$node_standby_up->poll_query_until('postgres', + "SELECT pg_get_wal_replay_pause_state() = 'not paused'") + or die "replay did not leave paused state after operator resume"; +ok($node_standby_up->poll_query_until('postgres', + "SELECT pg_last_wal_replay_lsn() >= '$up_lsn_before'::pg_lsn"), + "replay proceeds after operator pg_wal_replay_resume()"); + +$node_standby_up->stop; +$node_standby_off->stop; +$node_standby->stop; +$node_primary->stop; + +done_testing();